diff options
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/completion_queue.h | 165 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 38 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 159 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/client_unary_call.h | 76 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/completion_queue.h | 202 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/method_handler_impl.h | 211 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/rpc_method.h | 73 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/rpc_service_method.h | 92 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/server_interface.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 416 | ||||
-rw-r--r-- | include/grpc++/impl/method_handler_impl.h | 168 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_method.h | 36 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_service_method.h | 55 | ||||
-rw-r--r-- | include/grpc++/support/async_unary_call.h | 119 | ||||
-rw-r--r-- | include/grpc++/support/sync_stream.h | 379 |
15 files changed, 1239 insertions, 954 deletions
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 911d7919e1..f53f5ee56c 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -31,172 +31,9 @@ * */ -/// A completion queue implements a concurrent producer-consumer queue, with two -/// main methods, \a Next and \a AsyncNext. #ifndef GRPCXX_COMPLETION_QUEUE_H #define GRPCXX_COMPLETION_QUEUE_H -#include <grpc++/impl/codegen/grpc_library.h> -#include <grpc++/impl/codegen/status.h> -#include <grpc++/impl/codegen/time.h> - -struct grpc_completion_queue; - -namespace grpc { - -template <class R> -class ClientReader; -template <class W> -class ClientWriter; -template <class W, class R> -class ClientReaderWriter; -template <class R> -class ServerReader; -template <class W> -class ServerWriter; -template <class W, class R> -class ServerReaderWriter; -template <class ServiceType, class RequestType, class ResponseType> -class RpcMethodHandler; -template <class ServiceType, class RequestType, class ResponseType> -class ClientStreamingHandler; -template <class ServiceType, class RequestType, class ResponseType> -class ServerStreamingHandler; -template <class ServiceType, class RequestType, class ResponseType> -class BidiStreamingHandler; -class UnknownMethodHandler; - -class Channel; -class ChannelInterface; -class ClientContext; -class CompletionQueueTag; -class CompletionQueue; -class RpcMethod; -class Server; -class ServerBuilder; -class ServerContext; - -/// A thin wrapper around \a grpc_completion_queue (see / \a -/// src/core/surface/completion_queue.h). -class CompletionQueue : private GrpcLibrary { - public: - /// Default constructor. Implicitly creates a \a grpc_completion_queue - /// instance. - CompletionQueue(); - - /// Wrap \a take, taking ownership of the instance. - /// - /// \param take The completion queue instance to wrap. Ownership is taken. - explicit CompletionQueue(grpc_completion_queue* take); - - /// Destructor. Destroys the owned wrapped completion queue / instance. - ~CompletionQueue(); - - /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. - enum NextStatus { - SHUTDOWN, ///< The completion queue has been shutdown. - GOT_EVENT, ///< Got a new event; \a tag will be filled in with its - ///< associated value; \a ok indicating its success. - TIMEOUT ///< deadline was reached. - }; - - /// Read from the queue, blocking up to \a deadline (or the queue's shutdown). - /// Both \a tag and \a ok are updated upon success (if an event is available - /// within the \a deadline). A \a tag points to an arbitrary location usually - /// employed to uniquely identify an event. - /// - /// \param tag[out] Upon sucess, updated to point to the event's tag. - /// \param ok[out] Upon sucess, true if read a regular event, false otherwise. - /// \param deadline[in] How long to block in wait for an event. - /// - /// \return The type of event read. - template <typename T> - NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) { - TimePoint<T> deadline_tp(deadline); - return AsyncNextInternal(tag, ok, deadline_tp.raw_time()); - } - - /// Read from the queue, blocking until an event is available or the queue is - /// shutting down. - /// - /// \param tag[out] Updated to point to the read event's tag. - /// \param ok[out] true if read a regular event, false otherwise. - /// - /// \return true if read a regular event, false if the queue is shutting down. - bool Next(void** tag, bool* ok) { - return (AsyncNextInternal(tag, ok, gpr_inf_future(GPR_CLOCK_REALTIME)) != - SHUTDOWN); - } - - /// Request the shutdown of the queue. - /// - /// \warning This method must be called at some point. Once invoked, \a Next - /// will start to return false and \a AsyncNext will return \a - /// NextStatus::SHUTDOWN. Only once either one of these methods does that - /// (that is, once the queue has been \em drained) can an instance of this - /// class be destroyed. - void Shutdown(); - - /// Returns a \em raw pointer to the underlying \a grpc_completion_queue - /// instance. - /// - /// \warning Remember that the returned instance is owned. No transfer of - /// owership is performed. - grpc_completion_queue* cq() { return cq_; } - - private: - // Friend synchronous wrappers so that they can access Pluck(), which is - // a semi-private API geared towards the synchronous implementation. - template <class R> - friend class ::grpc::ClientReader; - template <class W> - friend class ::grpc::ClientWriter; - template <class W, class R> - friend class ::grpc::ClientReaderWriter; - template <class R> - friend class ::grpc::ServerReader; - template <class W> - friend class ::grpc::ServerWriter; - template <class W, class R> - friend class ::grpc::ServerReaderWriter; - template <class ServiceType, class RequestType, class ResponseType> - friend class RpcMethodHandler; - template <class ServiceType, class RequestType, class ResponseType> - friend class ClientStreamingHandler; - template <class ServiceType, class RequestType, class ResponseType> - friend class ServerStreamingHandler; - template <class ServiceType, class RequestType, class ResponseType> - friend class BidiStreamingHandler; - friend class UnknownMethodHandler; - friend class ::grpc::Server; - friend class ::grpc::ServerContext; - template <class InputMessage, class OutputMessage> - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); - - NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); - - /// Wraps \a grpc_completion_queue_pluck. - /// \warning Must not be mixed with calls to \a Next. - bool Pluck(CompletionQueueTag* tag); - - /// Performs a single polling pluck on \a tag. - void TryPluck(CompletionQueueTag* tag); - - grpc_completion_queue* cq_; // owned -}; - -/// A specific type of completion queue used by the processing of notifications -/// by servers. Instantiated by \a ServerBuilder. -class ServerCompletionQueue : public CompletionQueue { - private: - friend class ServerBuilder; - ServerCompletionQueue() {} -}; - -} // namespace grpc +#include <grpc++/impl/codegen/completion_queue.h> #endif // GRPCXX_COMPLETION_QUEUE_H diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 9017bf5e35..abe321eacc 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -34,42 +34,6 @@ #ifndef GRPCXX_IMPL_CLIENT_UNARY_CALL_H #define GRPCXX_IMPL_CLIENT_UNARY_CALL_H -#include <grpc++/impl/codegen/call.h> -#include <grpc++/impl/codegen/channel_interface.h> -#include <grpc++/impl/codegen/config.h> -#include <grpc++/impl/codegen/status.h> - -namespace grpc { - -class Channel; -class ClientContext; -class CompletionQueue; -class RpcMethod; - -// Wrapper that performs a blocking unary call -template <class InputMessage, class OutputMessage> -Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const InputMessage& request, - OutputMessage* result) { - CompletionQueue cq; - Call call(channel->CreateCall(method, context, &cq)); - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, - CallOpClientSendClose, CallOpClientRecvStatus> ops; - Status status = ops.SendMessage(request); - if (!status.ok()) { - return status; - } - ops.SendInitialMetadata(context->send_initial_metadata_); - ops.RecvInitialMetadata(context); - ops.RecvMessage(result); - ops.ClientSendClose(); - ops.ClientRecvStatus(context, &status); - call.PerformOps(&ops); - GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok()); - return status; -} - -} // namespace grpc +#include <grpc++/impl/codegen/client_unary_call.h> #endif // GRPCXX_IMPL_CLIENT_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h new file mode 100644 index 0000000000..39e18415ac --- /dev/null +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -0,0 +1,159 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H +#define GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H + +#include <grpc++/impl/codegen/call.h> +#include <grpc++/impl/codegen/channel_interface.h> +#include <grpc++/impl/codegen/client_context.h> +#include <grpc++/impl/codegen/server_context.h> +#include <grpc++/impl/codegen/service_type.h> +#include <grpc++/impl/codegen/status.h> +#include <grpc/impl/codegen/log.h> + +namespace grpc { + +class CompletionQueue; + +template <class R> +class ClientAsyncResponseReaderInterface { + public: + virtual ~ClientAsyncResponseReaderInterface() {} + virtual void ReadInitialMetadata(void* tag) = 0; + virtual void Finish(R* msg, Status* status, void* tag) = 0; +}; + +template <class R> +class ClientAsyncResponseReader GRPC_FINAL + : public ClientAsyncResponseReaderInterface<R> { + public: + template <class W> + ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, + const RpcMethod& method, ClientContext* context, + const W& request) + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_buf_.SendInitialMetadata(context->send_initial_metadata_); + // TODO(ctiller): don't assert + GPR_ASSERT(init_buf_.SendMessage(request).ok()); + init_buf_.ClientSendClose(); + call_.PerformOps(&init_buf_); + } + + void ReadInitialMetadata(void* tag) { + GPR_ASSERT(!context_->initial_metadata_received_); + + meta_buf_.set_output_tag(tag); + meta_buf_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_buf_); + } + + void Finish(R* msg, Status* status, void* tag) { + finish_buf_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + finish_buf_.RecvInitialMetadata(context_); + } + finish_buf_.RecvMessage(msg); + finish_buf_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_buf_); + } + + private: + ClientContext* context_; + Call call_; + SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + init_buf_; + CallOpSet<CallOpRecvInitialMetadata> meta_buf_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, + CallOpClientRecvStatus> + finish_buf_; +}; + +template <class W> +class ServerAsyncResponseWriter GRPC_FINAL + : public ServerAsyncStreamingInterface { + public: + explicit ServerAsyncResponseWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) GRPC_OVERRIDE { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.set_output_tag(tag); + meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_buf_); + } + + void Finish(const W& msg, const Status& status, void* tag) { + finish_buf_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (status.ok()) { + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, + finish_buf_.SendMessage(msg)); + } else { + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + } + call_.PerformOps(&finish_buf_); + } + + void FinishWithError(const Status& status, void* tag) { + GPR_ASSERT(!status.ok()); + finish_buf_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + private: + void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + finish_buf_; +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h new file mode 100644 index 0000000000..817a98ac1f --- /dev/null +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H +#define GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H + +#include <grpc++/impl/codegen/call.h> +#include <grpc++/impl/codegen/channel_interface.h> +#include <grpc++/impl/codegen/config.h> +#include <grpc++/impl/codegen/status.h> + +namespace grpc { + +class Channel; +class ClientContext; +class CompletionQueue; +class RpcMethod; + +// Wrapper that performs a blocking unary call +template <class InputMessage, class OutputMessage> +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result) { + CompletionQueue cq; + Call call(channel->CreateCall(method, context, &cq)); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, + CallOpClientSendClose, CallOpClientRecvStatus> + ops; + Status status = ops.SendMessage(request); + if (!status.ok()) { + return status; + } + ops.SendInitialMetadata(context->send_initial_metadata_); + ops.RecvInitialMetadata(context); + ops.RecvMessage(result); + ops.ClientSendClose(); + ops.ClientRecvStatus(context, &status); + call.PerformOps(&ops); + GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok()); + return status; +} + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h new file mode 100644 index 0000000000..102831e1c9 --- /dev/null +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -0,0 +1,202 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/// A completion queue implements a concurrent producer-consumer queue, with two +/// main methods, \a Next and \a AsyncNext. +#ifndef GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H +#define GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H + +#include <grpc++/impl/codegen/grpc_library.h> +#include <grpc++/impl/codegen/status.h> +#include <grpc++/impl/codegen/time.h> + +struct grpc_completion_queue; + +namespace grpc { + +template <class R> +class ClientReader; +template <class W> +class ClientWriter; +template <class W, class R> +class ClientReaderWriter; +template <class R> +class ServerReader; +template <class W> +class ServerWriter; +template <class W, class R> +class ServerReaderWriter; +template <class ServiceType, class RequestType, class ResponseType> +class RpcMethodHandler; +template <class ServiceType, class RequestType, class ResponseType> +class ClientStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class ServerStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class BidiStreamingHandler; +class UnknownMethodHandler; + +class Channel; +class ChannelInterface; +class ClientContext; +class CompletionQueueTag; +class CompletionQueue; +class RpcMethod; +class Server; +class ServerBuilder; +class ServerContext; + +/// A thin wrapper around \a grpc_completion_queue (see / \a +/// src/core/surface/completion_queue.h). +class CompletionQueue : private GrpcLibrary { + public: + /// Default constructor. Implicitly creates a \a grpc_completion_queue + /// instance. + CompletionQueue(); + + /// Wrap \a take, taking ownership of the instance. + /// + /// \param take The completion queue instance to wrap. Ownership is taken. + explicit CompletionQueue(grpc_completion_queue* take); + + /// Destructor. Destroys the owned wrapped completion queue / instance. + ~CompletionQueue(); + + /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. + enum NextStatus { + SHUTDOWN, ///< The completion queue has been shutdown. + GOT_EVENT, ///< Got a new event; \a tag will be filled in with its + ///< associated value; \a ok indicating its success. + TIMEOUT ///< deadline was reached. + }; + + /// Read from the queue, blocking up to \a deadline (or the queue's shutdown). + /// Both \a tag and \a ok are updated upon success (if an event is available + /// within the \a deadline). A \a tag points to an arbitrary location usually + /// employed to uniquely identify an event. + /// + /// \param tag[out] Upon sucess, updated to point to the event's tag. + /// \param ok[out] Upon sucess, true if read a regular event, false otherwise. + /// \param deadline[in] How long to block in wait for an event. + /// + /// \return The type of event read. + template <typename T> + NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) { + TimePoint<T> deadline_tp(deadline); + return AsyncNextInternal(tag, ok, deadline_tp.raw_time()); + } + + /// Read from the queue, blocking until an event is available or the queue is + /// shutting down. + /// + /// \param tag[out] Updated to point to the read event's tag. + /// \param ok[out] true if read a regular event, false otherwise. + /// + /// \return true if read a regular event, false if the queue is shutting down. + bool Next(void** tag, bool* ok) { + return (AsyncNextInternal(tag, ok, gpr_inf_future(GPR_CLOCK_REALTIME)) != + SHUTDOWN); + } + + /// Request the shutdown of the queue. + /// + /// \warning This method must be called at some point. Once invoked, \a Next + /// will start to return false and \a AsyncNext will return \a + /// NextStatus::SHUTDOWN. Only once either one of these methods does that + /// (that is, once the queue has been \em drained) can an instance of this + /// class be destroyed. + void Shutdown(); + + /// Returns a \em raw pointer to the underlying \a grpc_completion_queue + /// instance. + /// + /// \warning Remember that the returned instance is owned. No transfer of + /// owership is performed. + grpc_completion_queue* cq() { return cq_; } + + private: + // Friend synchronous wrappers so that they can access Pluck(), which is + // a semi-private API geared towards the synchronous implementation. + template <class R> + friend class ::grpc::ClientReader; + template <class W> + friend class ::grpc::ClientWriter; + template <class W, class R> + friend class ::grpc::ClientReaderWriter; + template <class R> + friend class ::grpc::ServerReader; + template <class W> + friend class ::grpc::ServerWriter; + template <class W, class R> + friend class ::grpc::ServerReaderWriter; + template <class ServiceType, class RequestType, class ResponseType> + friend class RpcMethodHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class ClientStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class ServerStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class BidiStreamingHandler; + friend class UnknownMethodHandler; + friend class ::grpc::Server; + friend class ::grpc::ServerContext; + template <class InputMessage, class OutputMessage> + friend Status BlockingUnaryCall(ChannelInterface* channel, + const RpcMethod& method, + ClientContext* context, + const InputMessage& request, + OutputMessage* result); + + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); + + /// Wraps \a grpc_completion_queue_pluck. + /// \warning Must not be mixed with calls to \a Next. + bool Pluck(CompletionQueueTag* tag); + + /// Performs a single polling pluck on \a tag. + void TryPluck(CompletionQueueTag* tag); + + grpc_completion_queue* cq_; // owned +}; + +/// A specific type of completion queue used by the processing of notifications +/// by servers. Instantiated by \a ServerBuilder. +class ServerCompletionQueue : public CompletionQueue { + private: + friend class ServerBuilder; + ServerCompletionQueue() {} +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h new file mode 100644 index 0000000000..ad65ce9484 --- /dev/null +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -0,0 +1,211 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H +#define GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H + +#include <grpc++/impl/codegen/rpc_service_method.h> +#include <grpc++/impl/codegen/sync_stream.h> + +namespace grpc { + +// A wrapper class of an application provided rpc method handler. +template <class ServiceType, class RequestType, class ResponseType> +class RpcMethodHandler : public MethodHandler { + public: + RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*, + const RequestType*, ResponseType*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits<RequestType>::Deserialize( + param.request, &req, param.max_message_size); + ResponseType rsp; + if (status.ok()) { + status = func_(service_, param.server_context, &req, &rsp); + } + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.ok()) { + status = ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + // Application provided rpc handler function. + std::function<Status(ServiceType*, ServerContext*, const RequestType*, + ResponseType*)> + func_; + // The class the above handler function lives in. + ServiceType* service_; +}; + +// A wrapper class of an application provided client streaming handler. +template <class ServiceType, class RequestType, class ResponseType> +class ClientStreamingHandler : public MethodHandler { + public: + ClientStreamingHandler( + std::function<Status(ServiceType*, ServerContext*, + ServerReader<RequestType>*, ResponseType*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + ServerReader<RequestType> reader(param.call, param.server_context); + ResponseType rsp; + Status status = func_(service_, param.server_context, &reader, &rsp); + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.ok()) { + status = ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*, + ResponseType*)> + func_; + ServiceType* service_; +}; + +// A wrapper class of an application provided server streaming handler. +template <class ServiceType, class RequestType, class ResponseType> +class ServerStreamingHandler : public MethodHandler { + public: + ServerStreamingHandler( + std::function<Status(ServiceType*, ServerContext*, const RequestType*, + ServerWriter<ResponseType>*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits<RequestType>::Deserialize( + param.request, &req, param.max_message_size); + + if (status.ok()) { + ServerWriter<ResponseType> writer(param.call, param.server_context); + status = func_(service_, param.server_context, &req, &writer); + } + + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + std::function<Status(ServiceType*, ServerContext*, const RequestType*, + ServerWriter<ResponseType>*)> + func_; + ServiceType* service_; +}; + +// A wrapper class of an application provided bidi-streaming handler. +template <class ServiceType, class RequestType, class ResponseType> +class BidiStreamingHandler : public MethodHandler { + public: + BidiStreamingHandler( + std::function<Status(ServiceType*, ServerContext*, + ServerReaderWriter<ResponseType, RequestType>*)> + func, + ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + ServerReaderWriter<ResponseType, RequestType> stream(param.call, + param.server_context); + Status status = func_(service_, param.server_context, &stream); + + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + + private: + std::function<Status(ServiceType*, ServerContext*, + ServerReaderWriter<ResponseType, RequestType>*)> + func_; + ServiceType* service_; +}; + +// Handle unknown method by returning UNIMPLEMENTED error. +class UnknownMethodHandler : public MethodHandler { + public: + template <class T> + static void FillOps(ServerContext* context, T* ops) { + Status status(StatusCode::UNIMPLEMENTED, ""); + if (!context->sent_initial_metadata_) { + ops->SendInitialMetadata(context->initial_metadata_); + context->sent_initial_metadata_ = true; + } + ops->ServerSendStatus(context->trailing_metadata_, status); + } + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + FillOps(param.server_context, &ops); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h new file mode 100644 index 0000000000..85d5c1cfe2 --- /dev/null +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -0,0 +1,73 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_RPC_METHOD_H +#define GRPCXX_IMPL_CODEGEN_RPC_METHOD_H + +#include <memory> + +#include <grpc++/impl/codegen/channel_interface.h> + +namespace grpc { + +class RpcMethod { + public: + enum RpcType { + NORMAL_RPC = 0, + CLIENT_STREAMING, // request streaming + SERVER_STREAMING, // response streaming + BIDI_STREAMING + }; + + RpcMethod(const char* name, RpcType type) + : name_(name), method_type_(type), channel_tag_(NULL) {} + + RpcMethod(const char* name, RpcType type, + const std::shared_ptr<ChannelInterface>& channel) + : name_(name), + method_type_(type), + channel_tag_(channel->RegisterMethod(name)) {} + + const char* name() const { return name_; } + RpcType method_type() const { return method_type_; } + void* channel_tag() const { return channel_tag_; } + + private: + const char* const name_; + const RpcType method_type_; + void* const channel_tag_; +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h new file mode 100644 index 0000000000..519d942fc4 --- /dev/null +++ b/include/grpc++/impl/codegen/rpc_service_method.h @@ -0,0 +1,92 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_RPC_SERVICE_METHOD_H +#define GRPCXX_IMPL_CODEGEN_RPC_SERVICE_METHOD_H + +#include <climits> +#include <functional> +#include <map> +#include <memory> +#include <vector> + +#include <grpc++/impl/codegen/config.h> +#include <grpc++/impl/codegen/rpc_method.h> +#include <grpc++/impl/codegen/status.h> + +namespace grpc { +class ServerContext; +class StreamContextInterface; + +// Base class for running an RPC handler. +class MethodHandler { + public: + virtual ~MethodHandler() {} + struct HandlerParameter { + HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req, + int max_size) + : call(c), + server_context(context), + request(req), + max_message_size(max_size) {} + Call* call; + ServerContext* server_context; + // Handler required to grpc_byte_buffer_destroy this + grpc_byte_buffer* request; + int max_message_size; + }; + virtual void RunHandler(const HandlerParameter& param) = 0; +}; + +// Server side rpc method class +class RpcServiceMethod : public RpcMethod { + public: + // Takes ownership of the handler + RpcServiceMethod(const char* name, RpcMethod::RpcType type, + MethodHandler* handler) + : RpcMethod(name, type), server_tag_(nullptr), handler_(handler) {} + + void set_server_tag(void* tag) { server_tag_ = tag; } + void* server_tag() const { return server_tag_; } + // if MethodHandler is nullptr, then this is an async method + MethodHandler* handler() const { return handler_.get(); } + void ResetHandler() { handler_.reset(); } + + private: + void* server_tag_; + std::unique_ptr<MethodHandler> handler_; +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_RPC_SERVICE_METHOD_H diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index b0bc10dc6d..96934edff3 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -34,8 +34,9 @@ #ifndef GRPCXX_IMPL_CODEGEN_SERVER_INTERFACE_H #define GRPCXX_IMPL_CODEGEN_SERVER_INTERFACE_H -#include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/codegen/call_hook.h> +#include <grpc++/impl/codegen/completion_queue_tag.h> +#include <grpc++/impl/codegen/rpc_service_method.h> namespace grpc { @@ -43,7 +44,6 @@ class AsyncGenericService; class AsynchronousService; class GenericServerContext; class RpcService; -class RpcServiceMethod; class ServerAsyncStreamingInterface; class ServerCompletionQueue; class ServerContext; diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h new file mode 100644 index 0000000000..b8cd44fb09 --- /dev/null +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -0,0 +1,416 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H +#define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H + +#include <grpc++/impl/codegen/call.h> +#include <grpc++/impl/codegen/channel_interface.h> +#include <grpc++/impl/codegen/client_context.h> +#include <grpc++/impl/codegen/completion_queue.h> +#include <grpc++/impl/codegen/server_context.h> +#include <grpc++/impl/codegen/service_type.h> +#include <grpc++/impl/codegen/status.h> +#include <grpc/impl/codegen/log.h> + +namespace grpc { + +/// Common interface for all synchronous client side streaming. +class ClientStreamingInterface { + public: + virtual ~ClientStreamingInterface() {} + + /// Wait until the stream finishes, and return the final status. When the + /// client side declares it has no more message to send, either implicitly or + /// by calling \a WritesDone(), it needs to make sure there is no more message + /// to be received from the server, either implicitly or by getting a false + /// from a \a Read(). + /// + /// This function will return either: + /// - when all incoming messages have been read and the server has returned + /// status. + /// - OR when the server has returned a non-OK status. + virtual Status Finish() = 0; +}; + +/// An interface that yields a sequence of messages of type \a R. +template <class R> +class ReaderInterface { + public: + virtual ~ReaderInterface() {} + + /// Blocking read a message and parse to \a msg. Returns \a true on success. + /// + /// \param[out] msg The read message. + /// + /// \return \a false when there will be no more incoming messages, either + /// because the other side has called \a WritesDone() or the stream has failed + /// (or been cancelled). + virtual bool Read(R* msg) = 0; +}; + +/// An interface that can be fed a sequence of messages of type \a W. +template <class W> +class WriterInterface { + public: + virtual ~WriterInterface() {} + + /// Blocking write \a msg to the stream with options. + /// + /// \param msg The message to be written to the stream. + /// \param options Options affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + virtual bool Write(const W& msg, const WriteOptions& options) = 0; + + /// Blocking write \a msg to the stream with default options. + /// + /// \param msg The message to be written to the stream. + /// + /// \return \a true on success, \a false when the stream has been closed. + inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } +}; + +/// Client-side interface for streaming reads of message of type \a R. +template <class R> +class ClientReaderInterface : public ClientStreamingInterface, + public ReaderInterface<R> { + public: + /// Blocking wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; +}; + +template <class R> +class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { + public: + /// Blocking create a stream and write the first request out. + template <class W> + ClientReader(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + // TODO(ctiller): don't assert + GPR_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + void WaitForInitialMetadata() GRPC_OVERRIDE { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpSet<CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); /// status ignored + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + Status Finish() GRPC_OVERRIDE { + CallOpSet<CallOpClientRecvStatus> ops; + Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + ClientContext* context_; + CompletionQueue cq_; + Call call_; +}; + +/// Client-side interface for streaming writes of message of type \a W. +template <class W> +class ClientWriterInterface : public ClientStreamingInterface, + public WriterInterface<W> { + public: + /// Half close writing from the client. + /// Block until writes are completed. + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +template <class W> +class ClientWriter : public ClientWriterInterface<W> { + public: + /// Blocking create a stream. + template <class R> + ClientWriter(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, R* response) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet<CallOpSendMessage> ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() GRPC_OVERRIDE { + CallOpSet<CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + /// Read the final response and wait for the final status. + Status Finish() GRPC_OVERRIDE { + Status status; + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); + return status; + } + + private: + ClientContext* context_; + CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; + CompletionQueue cq_; + Call call_; +}; + +/// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientReaderWriterInterface : public ClientStreamingInterface, + public WriterInterface<W>, + public ReaderInterface<R> { + public: + /// Blocking wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; + + /// Block until writes are completed. + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +template <class W, class R> +class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { + public: + /// Blocking create a stream. + ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + void WaitForInitialMetadata() GRPC_OVERRIDE { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpSet<CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet<CallOpSendMessage> ops; + if (!ops.SendMessage(msg, options).ok()) return false; + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() GRPC_OVERRIDE { + CallOpSet<CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + Status Finish() GRPC_OVERRIDE { + CallOpSet<CallOpClientRecvStatus> ops; + Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + ClientContext* context_; + CompletionQueue cq_; + Call call_; +}; + +template <class R> +class ServerReader GRPC_FINAL : public ReaderInterface<R> { + public: + ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +template <class W> +class ServerWriter GRPC_FINAL : public WriterInterface<W> { + public: + ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +/// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, + public ReaderInterface<R> { + public: + ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H diff --git a/include/grpc++/impl/method_handler_impl.h b/include/grpc++/impl/method_handler_impl.h index 2997cb0e62..305fd2865e 100644 --- a/include/grpc++/impl/method_handler_impl.h +++ b/include/grpc++/impl/method_handler_impl.h @@ -34,170 +34,6 @@ #ifndef GRPCXX_IMPL_METHOD_HANDLER_IMPL_H #define GRPCXX_IMPL_METHOD_HANDLER_IMPL_H -#include <grpc++/impl/rpc_service_method.h> -#include <grpc++/support/sync_stream.h> +#include <grpc++/impl/codegen/method_handler_impl.h> -namespace grpc { - -// A wrapper class of an application provided rpc method handler. -template <class ServiceType, class RequestType, class ResponseType> -class RpcMethodHandler : public MethodHandler { - public: - RpcMethodHandler( - std::function<Status(ServiceType*, ServerContext*, const RequestType*, - ResponseType*)> func, - ServiceType* service) - : func_(func), service_(service) {} - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - RequestType req; - Status status = SerializationTraits<RequestType>::Deserialize( - param.request, &req, param.max_message_size); - ResponseType rsp; - if (status.ok()) { - status = func_(service_, param.server_context, &req, &rsp); - } - - GPR_ASSERT(!param.server_context->sent_initial_metadata_); - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> ops; - ops.SendInitialMetadata(param.server_context->initial_metadata_); - if (status.ok()) { - status = ops.SendMessage(rsp); - } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - - private: - // Application provided rpc handler function. - std::function<Status(ServiceType*, ServerContext*, const RequestType*, - ResponseType*)> func_; - // The class the above handler function lives in. - ServiceType* service_; -}; - -// A wrapper class of an application provided client streaming handler. -template <class ServiceType, class RequestType, class ResponseType> -class ClientStreamingHandler : public MethodHandler { - public: - ClientStreamingHandler( - std::function<Status(ServiceType*, ServerContext*, - ServerReader<RequestType>*, ResponseType*)> func, - ServiceType* service) - : func_(func), service_(service) {} - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerReader<RequestType> reader(param.call, param.server_context); - ResponseType rsp; - Status status = func_(service_, param.server_context, &reader, &rsp); - - GPR_ASSERT(!param.server_context->sent_initial_metadata_); - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> ops; - ops.SendInitialMetadata(param.server_context->initial_metadata_); - if (status.ok()) { - status = ops.SendMessage(rsp); - } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - - private: - std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*, - ResponseType*)> func_; - ServiceType* service_; -}; - -// A wrapper class of an application provided server streaming handler. -template <class ServiceType, class RequestType, class ResponseType> -class ServerStreamingHandler : public MethodHandler { - public: - ServerStreamingHandler( - std::function<Status(ServiceType*, ServerContext*, const RequestType*, - ServerWriter<ResponseType>*)> func, - ServiceType* service) - : func_(func), service_(service) {} - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - RequestType req; - Status status = SerializationTraits<RequestType>::Deserialize( - param.request, &req, param.max_message_size); - - if (status.ok()) { - ServerWriter<ResponseType> writer(param.call, param.server_context); - status = func_(service_, param.server_context, &req, &writer); - } - - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; - if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); - } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - - private: - std::function<Status(ServiceType*, ServerContext*, const RequestType*, - ServerWriter<ResponseType>*)> func_; - ServiceType* service_; -}; - -// A wrapper class of an application provided bidi-streaming handler. -template <class ServiceType, class RequestType, class ResponseType> -class BidiStreamingHandler : public MethodHandler { - public: - BidiStreamingHandler( - std::function<Status(ServiceType*, ServerContext*, - ServerReaderWriter<ResponseType, RequestType>*)> - func, - ServiceType* service) - : func_(func), service_(service) {} - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerReaderWriter<ResponseType, RequestType> stream(param.call, - param.server_context); - Status status = func_(service_, param.server_context, &stream); - - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; - if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); - } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - - private: - std::function<Status(ServiceType*, ServerContext*, - ServerReaderWriter<ResponseType, RequestType>*)> func_; - ServiceType* service_; -}; - -// Handle unknown method by returning UNIMPLEMENTED error. -class UnknownMethodHandler : public MethodHandler { - public: - template <class T> - static void FillOps(ServerContext* context, T* ops) { - Status status(StatusCode::UNIMPLEMENTED, ""); - if (!context->sent_initial_metadata_) { - ops->SendInitialMetadata(context->initial_metadata_); - context->sent_initial_metadata_ = true; - } - ops->ServerSendStatus(context->trailing_metadata_, status); - } - - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; - FillOps(param.server_context, &ops); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } -}; - -} // namespace grpc - -#endif // GRPCXX_IMPL_METHOD_HANDLER_IMPL_H
\ No newline at end of file +#endif // GRPCXX_IMPL_METHOD_HANDLER_IMPL_H diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h index 387891727d..8b5d1a3282 100644 --- a/include/grpc++/impl/rpc_method.h +++ b/include/grpc++/impl/rpc_method.h @@ -34,40 +34,6 @@ #ifndef GRPCXX_IMPL_RPC_METHOD_H #define GRPCXX_IMPL_RPC_METHOD_H -#include <memory> - -#include <grpc++/impl/codegen/channel_interface.h> - -namespace grpc { - -class RpcMethod { - public: - enum RpcType { - NORMAL_RPC = 0, - CLIENT_STREAMING, // request streaming - SERVER_STREAMING, // response streaming - BIDI_STREAMING - }; - - RpcMethod(const char* name, RpcType type) - : name_(name), method_type_(type), channel_tag_(NULL) {} - - RpcMethod(const char* name, RpcType type, - const std::shared_ptr<ChannelInterface>& channel) - : name_(name), - method_type_(type), - channel_tag_(channel->RegisterMethod(name)) {} - - const char* name() const { return name_; } - RpcType method_type() const { return method_type_; } - void* channel_tag() const { return channel_tag_; } - - private: - const char* const name_; - const RpcType method_type_; - void* const channel_tag_; -}; - -} // namespace grpc +#include <grpc++/impl/codegen/rpc_method.h> #endif // GRPCXX_IMPL_RPC_METHOD_H diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index a0bae80dab..101e6a636b 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -34,59 +34,6 @@ #ifndef GRPCXX_IMPL_RPC_SERVICE_METHOD_H #define GRPCXX_IMPL_RPC_SERVICE_METHOD_H -#include <climits> -#include <functional> -#include <map> -#include <memory> -#include <vector> - -#include <grpc++/impl/rpc_method.h> -#include <grpc++/support/config.h> -#include <grpc++/support/status.h> - -namespace grpc { -class ServerContext; -class StreamContextInterface; - -// Base class for running an RPC handler. -class MethodHandler { - public: - virtual ~MethodHandler() {} - struct HandlerParameter { - HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req, - int max_size) - : call(c), - server_context(context), - request(req), - max_message_size(max_size) {} - Call* call; - ServerContext* server_context; - // Handler required to grpc_byte_buffer_destroy this - grpc_byte_buffer* request; - int max_message_size; - }; - virtual void RunHandler(const HandlerParameter& param) = 0; -}; - -// Server side rpc method class -class RpcServiceMethod : public RpcMethod { - public: - // Takes ownership of the handler - RpcServiceMethod(const char* name, RpcMethod::RpcType type, - MethodHandler* handler) - : RpcMethod(name, type), server_tag_(nullptr), handler_(handler) {} - - void set_server_tag(void* tag) { server_tag_ = tag; } - void* server_tag() const { return server_tag_; } - // if MethodHandler is nullptr, then this is an async method - MethodHandler* handler() const { return handler_.get(); } - void ResetHandler() { handler_.reset(); } - - private: - void* server_tag_; - std::unique_ptr<MethodHandler> handler_; -}; - -} // namespace grpc +#include <grpc++/impl/codegen/rpc_service_method.h> #endif // GRPCXX_IMPL_RPC_SERVICE_METHOD_H diff --git a/include/grpc++/support/async_unary_call.h b/include/grpc++/support/async_unary_call.h index 3a601e063d..6d74328be5 100644 --- a/include/grpc++/support/async_unary_call.h +++ b/include/grpc++/support/async_unary_call.h @@ -34,123 +34,6 @@ #ifndef GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H #define GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H -#include <grpc/impl/codegen/log.h> -#include <grpc++/impl/codegen/channel_interface.h> -#include <grpc++/impl/codegen/client_context.h> -#include <grpc++/impl/codegen/server_context.h> -#include <grpc++/impl/codegen/call.h> -#include <grpc++/impl/codegen/service_type.h> -#include <grpc++/impl/codegen/status.h> - -namespace grpc { - -class CompletionQueue; - -template <class R> -class ClientAsyncResponseReaderInterface { - public: - virtual ~ClientAsyncResponseReaderInterface() {} - virtual void ReadInitialMetadata(void* tag) = 0; - virtual void Finish(R* msg, Status* status, void* tag) = 0; -}; - -template <class R> -class ClientAsyncResponseReader GRPC_FINAL - : public ClientAsyncResponseReaderInterface<R> { - public: - template <class W> - ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.SendInitialMetadata(context->send_initial_metadata_); - // TODO(ctiller): don't assert - GPR_ASSERT(init_buf_.SendMessage(request).ok()); - init_buf_.ClientSendClose(); - call_.PerformOps(&init_buf_); - } - - void ReadInitialMetadata(void* tag) { - GPR_ASSERT(!context_->initial_metadata_received_); - - meta_buf_.set_output_tag(tag); - meta_buf_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); - } - - void Finish(R* msg, Status* status, void* tag) { - finish_buf_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_buf_.RecvInitialMetadata(context_); - } - finish_buf_.RecvMessage(msg); - finish_buf_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); - } - - private: - ClientContext* context_; - Call call_; - SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> init_buf_; - CallOpSet<CallOpRecvInitialMetadata> meta_buf_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, - CallOpClientRecvStatus> finish_buf_; -}; - -template <class W> -class ServerAsyncResponseWriter GRPC_FINAL - : public ServerAsyncStreamingInterface { - public: - explicit ServerAsyncResponseWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - meta_buf_.set_output_tag(tag); - meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); - } - - void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (status.ok()) { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, - finish_buf_.SendMessage(msg)); - } else { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); - } - call_.PerformOps(&finish_buf_); - } - - void FinishWithError(const Status& status, void* tag) { - GPR_ASSERT(!status.ok()); - finish_buf_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_buf_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_buf_; -}; - -} // namespace grpc +#include <grpc++/impl/codegen/async_unary_call.h> #endif // GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H diff --git a/include/grpc++/support/sync_stream.h b/include/grpc++/support/sync_stream.h index 3557ba5156..2ea2ac5443 100644 --- a/include/grpc++/support/sync_stream.h +++ b/include/grpc++/support/sync_stream.h @@ -34,383 +34,6 @@ #ifndef GRPCXX_SUPPORT_SYNC_STREAM_H #define GRPCXX_SUPPORT_SYNC_STREAM_H -#include <grpc++/channel.h> -#include <grpc++/client_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/impl/call.h> -#include <grpc++/impl/codegen/channel_interface.h> -#include <grpc++/impl/service_type.h> -#include <grpc++/server_context.h> -#include <grpc++/support/status.h> -#include <grpc/support/log.h> - -namespace grpc { - -/// Common interface for all synchronous client side streaming. -class ClientStreamingInterface { - public: - virtual ~ClientStreamingInterface() {} - - /// Wait until the stream finishes, and return the final status. When the - /// client side declares it has no more message to send, either implicitly or - /// by calling \a WritesDone(), it needs to make sure there is no more message - /// to be received from the server, either implicitly or by getting a false - /// from a \a Read(). - /// - /// This function will return either: - /// - when all incoming messages have been read and the server has returned - /// status. - /// - OR when the server has returned a non-OK status. - virtual Status Finish() = 0; -}; - -/// An interface that yields a sequence of messages of type \a R. -template <class R> -class ReaderInterface { - public: - virtual ~ReaderInterface() {} - - /// Blocking read a message and parse to \a msg. Returns \a true on success. - /// - /// \param[out] msg The read message. - /// - /// \return \a false when there will be no more incoming messages, either - /// because the other side has called \a WritesDone() or the stream has failed - /// (or been cancelled). - virtual bool Read(R* msg) = 0; -}; - -/// An interface that can be fed a sequence of messages of type \a W. -template <class W> -class WriterInterface { - public: - virtual ~WriterInterface() {} - - /// Blocking write \a msg to the stream with options. - /// - /// \param msg The message to be written to the stream. - /// \param options Options affecting the write operation. - /// - /// \return \a true on success, \a false when the stream has been closed. - virtual bool Write(const W& msg, const WriteOptions& options) = 0; - - /// Blocking write \a msg to the stream with default options. - /// - /// \param msg The message to be written to the stream. - /// - /// \return \a true on success, \a false when the stream has been closed. - inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } -}; - -/// Client-side interface for streaming reads of message of type \a R. -template <class R> -class ClientReaderInterface : public ClientStreamingInterface, - public ReaderInterface<R> { - public: - /// Blocking wait for initial metadata from server. The received metadata - /// can only be accessed after this call returns. Should only be called before - /// the first read. Calling this method is optional, and if it is not called - /// the metadata will be available in ClientContext after the first read. - virtual void WaitForInitialMetadata() = 0; -}; - -template <class R> -class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { - public: - /// Blocking create a stream and write the first request out. - template <class W> - ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - // TODO(ctiller): don't assert - GPR_ASSERT(ops.SendMessage(request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - void WaitForInitialMetadata() GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); - - CallOpSet<CallOpRecvInitialMetadata> ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); /// status ignored - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ops.RecvMessage(msg); - call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; - } - - Status Finish() GRPC_OVERRIDE { - CallOpSet<CallOpClientRecvStatus> ops; - Status status; - ops.ClientRecvStatus(context_, &status); - call_.PerformOps(&ops); - GPR_ASSERT(cq_.Pluck(&ops)); - return status; - } - - private: - ClientContext* context_; - CompletionQueue cq_; - Call call_; -}; - -/// Client-side interface for streaming writes of message of type \a W. -template <class W> -class ClientWriterInterface : public ClientStreamingInterface, - public WriterInterface<W> { - public: - /// Half close writing from the client. - /// Block until writes are completed. - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; -}; - -template <class W> -class ClientWriter : public ClientWriterInterface<W> { - public: - /// Blocking create a stream. - template <class R> - ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, R* response) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - bool WritesDone() GRPC_OVERRIDE { - CallOpSet<CallOpClientSendClose> ops; - ops.ClientSendClose(); - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - /// Read the final response and wait for the final status. - Status Finish() GRPC_OVERRIDE { - Status status; - finish_ops_.ClientRecvStatus(context_, &status); - call_.PerformOps(&finish_ops_); - GPR_ASSERT(cq_.Pluck(&finish_ops_)); - return status; - } - - private: - ClientContext* context_; - CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; - CompletionQueue cq_; - Call call_; -}; - -/// Client-side interface for bi-directional streaming. -template <class W, class R> -class ClientReaderWriterInterface : public ClientStreamingInterface, - public WriterInterface<W>, - public ReaderInterface<R> { - public: - /// Blocking wait for initial metadata from server. The received metadata - /// can only be accessed after this call returns. Should only be called before - /// the first read. Calling this method is optional, and if it is not called - /// the metadata will be available in ClientContext after the first read. - virtual void WaitForInitialMetadata() = 0; - - /// Block until writes are completed. - /// - /// \return Whether the writes were successful. - virtual bool WritesDone() = 0; -}; - -template <class W, class R> -class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { - public: - /// Blocking create a stream. - ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - void WaitForInitialMetadata() GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); - - CallOpSet<CallOpRecvInitialMetadata> ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); // status ignored - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ops.RecvMessage(msg); - call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; - } - - using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) return false; - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - bool WritesDone() GRPC_OVERRIDE { - CallOpSet<CallOpClientSendClose> ops; - ops.ClientSendClose(); - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - Status Finish() GRPC_OVERRIDE { - CallOpSet<CallOpClientRecvStatus> ops; - Status status; - ops.ClientRecvStatus(context_, &status); - call_.PerformOps(&ops); - GPR_ASSERT(cq_.Pluck(&ops)); - return status; - } - - private: - ClientContext* context_; - CompletionQueue cq_; - Call call_; -}; - -template <class R> -class ServerReader GRPC_FINAL : public ReaderInterface<R> { - public: - ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet<CallOpRecvMessage<R>> ops; - ops.RecvMessage(msg); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - -template <class W> -class ServerWriter GRPC_FINAL : public WriterInterface<W> { - public: - ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - -/// Server-side interface for bi-directional streaming. -template <class W, class R> -class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, - public ReaderInterface<R> { - public: - ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet<CallOpRecvMessage<R>> ops; - ops.RecvMessage(msg); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; - } - - using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - -} // namespace grpc +#include <grpc++/impl/codegen/sync_stream.h> #endif // GRPCXX_SUPPORT_SYNC_STREAM_H |