aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-01-27 19:21:12 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2016-01-27 19:21:12 -0800
commit6bd7b97dc63dff65616ec3cdd145feb9b8bb104d (patch)
treecf1c4bb68eee30014f4e7981b94be1e57610e5b5 /include/grpc++
parente1300deb87b5fca2b4361a753d0bd4d19b078ea4 (diff)
DONE!!1one
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/completion_queue.h165
-rw-r--r--include/grpc++/impl/client_unary_call.h38
-rw-r--r--include/grpc++/impl/codegen/async_unary_call.h159
-rw-r--r--include/grpc++/impl/codegen/client_unary_call.h76
-rw-r--r--include/grpc++/impl/codegen/completion_queue.h202
-rw-r--r--include/grpc++/impl/codegen/method_handler_impl.h211
-rw-r--r--include/grpc++/impl/codegen/rpc_method.h73
-rw-r--r--include/grpc++/impl/codegen/rpc_service_method.h92
-rw-r--r--include/grpc++/impl/codegen/server_interface.h4
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h416
-rw-r--r--include/grpc++/impl/method_handler_impl.h168
-rw-r--r--include/grpc++/impl/rpc_method.h36
-rw-r--r--include/grpc++/impl/rpc_service_method.h55
-rw-r--r--include/grpc++/support/async_unary_call.h119
-rw-r--r--include/grpc++/support/sync_stream.h379
15 files changed, 1239 insertions, 954 deletions
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 911d7919e1..f53f5ee56c 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -31,172 +31,9 @@
*
*/
-/// A completion queue implements a concurrent producer-consumer queue, with two
-/// main methods, \a Next and \a AsyncNext.
#ifndef GRPCXX_COMPLETION_QUEUE_H
#define GRPCXX_COMPLETION_QUEUE_H
-#include <grpc++/impl/codegen/grpc_library.h>
-#include <grpc++/impl/codegen/status.h>
-#include <grpc++/impl/codegen/time.h>
-
-struct grpc_completion_queue;
-
-namespace grpc {
-
-template <class R>
-class ClientReader;
-template <class W>
-class ClientWriter;
-template <class W, class R>
-class ClientReaderWriter;
-template <class R>
-class ServerReader;
-template <class W>
-class ServerWriter;
-template <class W, class R>
-class ServerReaderWriter;
-template <class ServiceType, class RequestType, class ResponseType>
-class RpcMethodHandler;
-template <class ServiceType, class RequestType, class ResponseType>
-class ClientStreamingHandler;
-template <class ServiceType, class RequestType, class ResponseType>
-class ServerStreamingHandler;
-template <class ServiceType, class RequestType, class ResponseType>
-class BidiStreamingHandler;
-class UnknownMethodHandler;
-
-class Channel;
-class ChannelInterface;
-class ClientContext;
-class CompletionQueueTag;
-class CompletionQueue;
-class RpcMethod;
-class Server;
-class ServerBuilder;
-class ServerContext;
-
-/// A thin wrapper around \a grpc_completion_queue (see / \a
-/// src/core/surface/completion_queue.h).
-class CompletionQueue : private GrpcLibrary {
- public:
- /// Default constructor. Implicitly creates a \a grpc_completion_queue
- /// instance.
- CompletionQueue();
-
- /// Wrap \a take, taking ownership of the instance.
- ///
- /// \param take The completion queue instance to wrap. Ownership is taken.
- explicit CompletionQueue(grpc_completion_queue* take);
-
- /// Destructor. Destroys the owned wrapped completion queue / instance.
- ~CompletionQueue();
-
- /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
- enum NextStatus {
- SHUTDOWN, ///< The completion queue has been shutdown.
- GOT_EVENT, ///< Got a new event; \a tag will be filled in with its
- ///< associated value; \a ok indicating its success.
- TIMEOUT ///< deadline was reached.
- };
-
- /// Read from the queue, blocking up to \a deadline (or the queue's shutdown).
- /// Both \a tag and \a ok are updated upon success (if an event is available
- /// within the \a deadline). A \a tag points to an arbitrary location usually
- /// employed to uniquely identify an event.
- ///
- /// \param tag[out] Upon sucess, updated to point to the event's tag.
- /// \param ok[out] Upon sucess, true if read a regular event, false otherwise.
- /// \param deadline[in] How long to block in wait for an event.
- ///
- /// \return The type of event read.
- template <typename T>
- NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
- TimePoint<T> deadline_tp(deadline);
- return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
- }
-
- /// Read from the queue, blocking until an event is available or the queue is
- /// shutting down.
- ///
- /// \param tag[out] Updated to point to the read event's tag.
- /// \param ok[out] true if read a regular event, false otherwise.
- ///
- /// \return true if read a regular event, false if the queue is shutting down.
- bool Next(void** tag, bool* ok) {
- return (AsyncNextInternal(tag, ok, gpr_inf_future(GPR_CLOCK_REALTIME)) !=
- SHUTDOWN);
- }
-
- /// Request the shutdown of the queue.
- ///
- /// \warning This method must be called at some point. Once invoked, \a Next
- /// will start to return false and \a AsyncNext will return \a
- /// NextStatus::SHUTDOWN. Only once either one of these methods does that
- /// (that is, once the queue has been \em drained) can an instance of this
- /// class be destroyed.
- void Shutdown();
-
- /// Returns a \em raw pointer to the underlying \a grpc_completion_queue
- /// instance.
- ///
- /// \warning Remember that the returned instance is owned. No transfer of
- /// owership is performed.
- grpc_completion_queue* cq() { return cq_; }
-
- private:
- // Friend synchronous wrappers so that they can access Pluck(), which is
- // a semi-private API geared towards the synchronous implementation.
- template <class R>
- friend class ::grpc::ClientReader;
- template <class W>
- friend class ::grpc::ClientWriter;
- template <class W, class R>
- friend class ::grpc::ClientReaderWriter;
- template <class R>
- friend class ::grpc::ServerReader;
- template <class W>
- friend class ::grpc::ServerWriter;
- template <class W, class R>
- friend class ::grpc::ServerReaderWriter;
- template <class ServiceType, class RequestType, class ResponseType>
- friend class RpcMethodHandler;
- template <class ServiceType, class RequestType, class ResponseType>
- friend class ClientStreamingHandler;
- template <class ServiceType, class RequestType, class ResponseType>
- friend class ServerStreamingHandler;
- template <class ServiceType, class RequestType, class ResponseType>
- friend class BidiStreamingHandler;
- friend class UnknownMethodHandler;
- friend class ::grpc::Server;
- friend class ::grpc::ServerContext;
- template <class InputMessage, class OutputMessage>
- friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
- ClientContext* context,
- const InputMessage& request,
- OutputMessage* result);
-
- NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
-
- /// Wraps \a grpc_completion_queue_pluck.
- /// \warning Must not be mixed with calls to \a Next.
- bool Pluck(CompletionQueueTag* tag);
-
- /// Performs a single polling pluck on \a tag.
- void TryPluck(CompletionQueueTag* tag);
-
- grpc_completion_queue* cq_; // owned
-};
-
-/// A specific type of completion queue used by the processing of notifications
-/// by servers. Instantiated by \a ServerBuilder.
-class ServerCompletionQueue : public CompletionQueue {
- private:
- friend class ServerBuilder;
- ServerCompletionQueue() {}
-};
-
-} // namespace grpc
+#include <grpc++/impl/codegen/completion_queue.h>
#endif // GRPCXX_COMPLETION_QUEUE_H
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
index 9017bf5e35..abe321eacc 100644
--- a/include/grpc++/impl/client_unary_call.h
+++ b/include/grpc++/impl/client_unary_call.h
@@ -34,42 +34,6 @@
#ifndef GRPCXX_IMPL_CLIENT_UNARY_CALL_H
#define GRPCXX_IMPL_CLIENT_UNARY_CALL_H
-#include <grpc++/impl/codegen/call.h>
-#include <grpc++/impl/codegen/channel_interface.h>
-#include <grpc++/impl/codegen/config.h>
-#include <grpc++/impl/codegen/status.h>
-
-namespace grpc {
-
-class Channel;
-class ClientContext;
-class CompletionQueue;
-class RpcMethod;
-
-// Wrapper that performs a blocking unary call
-template <class InputMessage, class OutputMessage>
-Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, const InputMessage& request,
- OutputMessage* result) {
- CompletionQueue cq;
- Call call(channel->CreateCall(method, context, &cq));
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
- CallOpClientSendClose, CallOpClientRecvStatus> ops;
- Status status = ops.SendMessage(request);
- if (!status.ok()) {
- return status;
- }
- ops.SendInitialMetadata(context->send_initial_metadata_);
- ops.RecvInitialMetadata(context);
- ops.RecvMessage(result);
- ops.ClientSendClose();
- ops.ClientRecvStatus(context, &status);
- call.PerformOps(&ops);
- GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok());
- return status;
-}
-
-} // namespace grpc
+#include <grpc++/impl/codegen/client_unary_call.h>
#endif // GRPCXX_IMPL_CLIENT_UNARY_CALL_H
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
new file mode 100644
index 0000000000..39e18415ac
--- /dev/null
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
+#define GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
+
+#include <grpc++/impl/codegen/call.h>
+#include <grpc++/impl/codegen/channel_interface.h>
+#include <grpc++/impl/codegen/client_context.h>
+#include <grpc++/impl/codegen/server_context.h>
+#include <grpc++/impl/codegen/service_type.h>
+#include <grpc++/impl/codegen/status.h>
+#include <grpc/impl/codegen/log.h>
+
+namespace grpc {
+
+class CompletionQueue;
+
+template <class R>
+class ClientAsyncResponseReaderInterface {
+ public:
+ virtual ~ClientAsyncResponseReaderInterface() {}
+ virtual void ReadInitialMetadata(void* tag) = 0;
+ virtual void Finish(R* msg, Status* status, void* tag) = 0;
+};
+
+template <class R>
+class ClientAsyncResponseReader GRPC_FINAL
+ : public ClientAsyncResponseReaderInterface<R> {
+ public:
+ template <class W>
+ ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
+ const RpcMethod& method, ClientContext* context,
+ const W& request)
+ : context_(context), call_(channel->CreateCall(method, context, cq)) {
+ init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(init_buf_.SendMessage(request).ok());
+ init_buf_.ClientSendClose();
+ call_.PerformOps(&init_buf_);
+ }
+
+ void ReadInitialMetadata(void* tag) {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ meta_buf_.set_output_tag(tag);
+ meta_buf_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_buf_);
+ }
+
+ void Finish(R* msg, Status* status, void* tag) {
+ finish_buf_.set_output_tag(tag);
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.RecvInitialMetadata(context_);
+ }
+ finish_buf_.RecvMessage(msg);
+ finish_buf_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ ClientContext* context_;
+ Call call_;
+ SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ init_buf_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
+ CallOpClientRecvStatus>
+ finish_buf_;
+};
+
+template <class W>
+class ServerAsyncResponseWriter GRPC_FINAL
+ : public ServerAsyncStreamingInterface {
+ public:
+ explicit ServerAsyncResponseWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.set_output_tag(tag);
+ meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
+ }
+
+ void Finish(const W& msg, const Status& status, void* tag) {
+ finish_buf_.set_output_tag(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (status.ok()) {
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
+ finish_buf_.SendMessage(msg));
+ } else {
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ }
+ call_.PerformOps(&finish_buf_);
+ }
+
+ void FinishWithError(const Status& status, void* tag) {
+ GPR_ASSERT(!status.ok());
+ finish_buf_.set_output_tag(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ finish_buf_;
+};
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h
new file mode 100644
index 0000000000..817a98ac1f
--- /dev/null
+++ b/include/grpc++/impl/codegen/client_unary_call.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H
+#define GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H
+
+#include <grpc++/impl/codegen/call.h>
+#include <grpc++/impl/codegen/channel_interface.h>
+#include <grpc++/impl/codegen/config.h>
+#include <grpc++/impl/codegen/status.h>
+
+namespace grpc {
+
+class Channel;
+class ClientContext;
+class CompletionQueue;
+class RpcMethod;
+
+// Wrapper that performs a blocking unary call
+template <class InputMessage, class OutputMessage>
+Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result) {
+ CompletionQueue cq;
+ Call call(channel->CreateCall(method, context, &cq));
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
+ CallOpClientSendClose, CallOpClientRecvStatus>
+ ops;
+ Status status = ops.SendMessage(request);
+ if (!status.ok()) {
+ return status;
+ }
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.RecvInitialMetadata(context);
+ ops.RecvMessage(result);
+ ops.ClientSendClose();
+ ops.ClientRecvStatus(context, &status);
+ call.PerformOps(&ops);
+ GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok());
+ return status;
+}
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
new file mode 100644
index 0000000000..102831e1c9
--- /dev/null
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -0,0 +1,202 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/// A completion queue implements a concurrent producer-consumer queue, with two
+/// main methods, \a Next and \a AsyncNext.
+#ifndef GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H
+#define GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H
+
+#include <grpc++/impl/codegen/grpc_library.h>
+#include <grpc++/impl/codegen/status.h>
+#include <grpc++/impl/codegen/time.h>
+
+struct grpc_completion_queue;
+
+namespace grpc {
+
+template <class R>
+class ClientReader;
+template <class W>
+class ClientWriter;
+template <class W, class R>
+class ClientReaderWriter;
+template <class R>
+class ServerReader;
+template <class W>
+class ServerWriter;
+template <class W, class R>
+class ServerReaderWriter;
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler;
+class UnknownMethodHandler;
+
+class Channel;
+class ChannelInterface;
+class ClientContext;
+class CompletionQueueTag;
+class CompletionQueue;
+class RpcMethod;
+class Server;
+class ServerBuilder;
+class ServerContext;
+
+/// A thin wrapper around \a grpc_completion_queue (see / \a
+/// src/core/surface/completion_queue.h).
+class CompletionQueue : private GrpcLibrary {
+ public:
+ /// Default constructor. Implicitly creates a \a grpc_completion_queue
+ /// instance.
+ CompletionQueue();
+
+ /// Wrap \a take, taking ownership of the instance.
+ ///
+ /// \param take The completion queue instance to wrap. Ownership is taken.
+ explicit CompletionQueue(grpc_completion_queue* take);
+
+ /// Destructor. Destroys the owned wrapped completion queue / instance.
+ ~CompletionQueue();
+
+ /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
+ enum NextStatus {
+ SHUTDOWN, ///< The completion queue has been shutdown.
+ GOT_EVENT, ///< Got a new event; \a tag will be filled in with its
+ ///< associated value; \a ok indicating its success.
+ TIMEOUT ///< deadline was reached.
+ };
+
+ /// Read from the queue, blocking up to \a deadline (or the queue's shutdown).
+ /// Both \a tag and \a ok are updated upon success (if an event is available
+ /// within the \a deadline). A \a tag points to an arbitrary location usually
+ /// employed to uniquely identify an event.
+ ///
+ /// \param tag[out] Upon sucess, updated to point to the event's tag.
+ /// \param ok[out] Upon sucess, true if read a regular event, false otherwise.
+ /// \param deadline[in] How long to block in wait for an event.
+ ///
+ /// \return The type of event read.
+ template <typename T>
+ NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
+ TimePoint<T> deadline_tp(deadline);
+ return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
+ }
+
+ /// Read from the queue, blocking until an event is available or the queue is
+ /// shutting down.
+ ///
+ /// \param tag[out] Updated to point to the read event's tag.
+ /// \param ok[out] true if read a regular event, false otherwise.
+ ///
+ /// \return true if read a regular event, false if the queue is shutting down.
+ bool Next(void** tag, bool* ok) {
+ return (AsyncNextInternal(tag, ok, gpr_inf_future(GPR_CLOCK_REALTIME)) !=
+ SHUTDOWN);
+ }
+
+ /// Request the shutdown of the queue.
+ ///
+ /// \warning This method must be called at some point. Once invoked, \a Next
+ /// will start to return false and \a AsyncNext will return \a
+ /// NextStatus::SHUTDOWN. Only once either one of these methods does that
+ /// (that is, once the queue has been \em drained) can an instance of this
+ /// class be destroyed.
+ void Shutdown();
+
+ /// Returns a \em raw pointer to the underlying \a grpc_completion_queue
+ /// instance.
+ ///
+ /// \warning Remember that the returned instance is owned. No transfer of
+ /// owership is performed.
+ grpc_completion_queue* cq() { return cq_; }
+
+ private:
+ // Friend synchronous wrappers so that they can access Pluck(), which is
+ // a semi-private API geared towards the synchronous implementation.
+ template <class R>
+ friend class ::grpc::ClientReader;
+ template <class W>
+ friend class ::grpc::ClientWriter;
+ template <class W, class R>
+ friend class ::grpc::ClientReaderWriter;
+ template <class R>
+ friend class ::grpc::ServerReader;
+ template <class W>
+ friend class ::grpc::ServerWriter;
+ template <class W, class R>
+ friend class ::grpc::ServerReaderWriter;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class RpcMethodHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ClientStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ServerStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class BidiStreamingHandler;
+ friend class UnknownMethodHandler;
+ friend class ::grpc::Server;
+ friend class ::grpc::ServerContext;
+ template <class InputMessage, class OutputMessage>
+ friend Status BlockingUnaryCall(ChannelInterface* channel,
+ const RpcMethod& method,
+ ClientContext* context,
+ const InputMessage& request,
+ OutputMessage* result);
+
+ NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
+
+ /// Wraps \a grpc_completion_queue_pluck.
+ /// \warning Must not be mixed with calls to \a Next.
+ bool Pluck(CompletionQueueTag* tag);
+
+ /// Performs a single polling pluck on \a tag.
+ void TryPluck(CompletionQueueTag* tag);
+
+ grpc_completion_queue* cq_; // owned
+};
+
+/// A specific type of completion queue used by the processing of notifications
+/// by servers. Instantiated by \a ServerBuilder.
+class ServerCompletionQueue : public CompletionQueue {
+ private:
+ friend class ServerBuilder;
+ ServerCompletionQueue() {}
+};
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H
diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h
new file mode 100644
index 0000000000..ad65ce9484
--- /dev/null
+++ b/include/grpc++/impl/codegen/method_handler_impl.h
@@ -0,0 +1,211 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
+#define GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
+
+#include <grpc++/impl/codegen/rpc_service_method.h>
+#include <grpc++/impl/codegen/sync_stream.h>
+
+namespace grpc {
+
+// A wrapper class of an application provided rpc method handler.
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler : public MethodHandler {
+ public:
+ RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
+ const RequestType*, ResponseType*)>
+ func,
+ ServiceType* service)
+ : func_(func), service_(service) {}
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(
+ param.request, &req, param.max_message_size);
+ ResponseType rsp;
+ if (status.ok()) {
+ status = func_(service_, param.server_context, &req, &rsp);
+ }
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.ok()) {
+ status = ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+
+ private:
+ // Application provided rpc handler function.
+ std::function<Status(ServiceType*, ServerContext*, const RequestType*,
+ ResponseType*)>
+ func_;
+ // The class the above handler function lives in.
+ ServiceType* service_;
+};
+
+// A wrapper class of an application provided client streaming handler.
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler : public MethodHandler {
+ public:
+ ClientStreamingHandler(
+ std::function<Status(ServiceType*, ServerContext*,
+ ServerReader<RequestType>*, ResponseType*)>
+ func,
+ ServiceType* service)
+ : func_(func), service_(service) {}
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ ServerReader<RequestType> reader(param.call, param.server_context);
+ ResponseType rsp;
+ Status status = func_(service_, param.server_context, &reader, &rsp);
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.ok()) {
+ status = ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+
+ private:
+ std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
+ ResponseType*)>
+ func_;
+ ServiceType* service_;
+};
+
+// A wrapper class of an application provided server streaming handler.
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler : public MethodHandler {
+ public:
+ ServerStreamingHandler(
+ std::function<Status(ServiceType*, ServerContext*, const RequestType*,
+ ServerWriter<ResponseType>*)>
+ func,
+ ServiceType* service)
+ : func_(func), service_(service) {}
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(
+ param.request, &req, param.max_message_size);
+
+ if (status.ok()) {
+ ServerWriter<ResponseType> writer(param.call, param.server_context);
+ status = func_(service_, param.server_context, &req, &writer);
+ }
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+
+ private:
+ std::function<Status(ServiceType*, ServerContext*, const RequestType*,
+ ServerWriter<ResponseType>*)>
+ func_;
+ ServiceType* service_;
+};
+
+// A wrapper class of an application provided bidi-streaming handler.
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler : public MethodHandler {
+ public:
+ BidiStreamingHandler(
+ std::function<Status(ServiceType*, ServerContext*,
+ ServerReaderWriter<ResponseType, RequestType>*)>
+ func,
+ ServiceType* service)
+ : func_(func), service_(service) {}
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ ServerReaderWriter<ResponseType, RequestType> stream(param.call,
+ param.server_context);
+ Status status = func_(service_, param.server_context, &stream);
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+
+ private:
+ std::function<Status(ServiceType*, ServerContext*,
+ ServerReaderWriter<ResponseType, RequestType>*)>
+ func_;
+ ServiceType* service_;
+};
+
+// Handle unknown method by returning UNIMPLEMENTED error.
+class UnknownMethodHandler : public MethodHandler {
+ public:
+ template <class T>
+ static void FillOps(ServerContext* context, T* ops) {
+ Status status(StatusCode::UNIMPLEMENTED, "");
+ if (!context->sent_initial_metadata_) {
+ ops->SendInitialMetadata(context->initial_metadata_);
+ context->sent_initial_metadata_ = true;
+ }
+ ops->ServerSendStatus(context->trailing_metadata_, status);
+ }
+
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ FillOps(param.server_context, &ops);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
+ }
+};
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h
new file mode 100644
index 0000000000..85d5c1cfe2
--- /dev/null
+++ b/include/grpc++/impl/codegen/rpc_method.h
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_CODEGEN_RPC_METHOD_H
+#define GRPCXX_IMPL_CODEGEN_RPC_METHOD_H
+
+#include <memory>
+
+#include <grpc++/impl/codegen/channel_interface.h>
+
+namespace grpc {
+
+class RpcMethod {
+ public:
+ enum RpcType {
+ NORMAL_RPC = 0,
+ CLIENT_STREAMING, // request streaming
+ SERVER_STREAMING, // response streaming
+ BIDI_STREAMING
+ };
+
+ RpcMethod(const char* name, RpcType type)
+ : name_(name), method_type_(type), channel_tag_(NULL) {}
+
+ RpcMethod(const char* name, RpcType type,
+ const std::shared_ptr<ChannelInterface>& channel)
+ : name_(name),
+ method_type_(type),
+ channel_tag_(channel->RegisterMethod(name)) {}
+
+ const char* name() const { return name_; }
+ RpcType method_type() const { return method_type_; }
+ void* channel_tag() const { return channel_tag_; }
+
+ private:
+ const char* const name_;
+ const RpcType method_type_;
+ void* const channel_tag_;
+};
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H
diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h
new file mode 100644
index 0000000000..519d942fc4
--- /dev/null
+++ b/include/grpc++/impl/codegen/rpc_service_method.h
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_CODEGEN_RPC_SERVICE_METHOD_H
+#define GRPCXX_IMPL_CODEGEN_RPC_SERVICE_METHOD_H
+
+#include <climits>
+#include <functional>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include <grpc++/impl/codegen/config.h>
+#include <grpc++/impl/codegen/rpc_method.h>
+#include <grpc++/impl/codegen/status.h>
+
+namespace grpc {
+class ServerContext;
+class StreamContextInterface;
+
+// Base class for running an RPC handler.
+class MethodHandler {
+ public:
+ virtual ~MethodHandler() {}
+ struct HandlerParameter {
+ HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req,
+ int max_size)
+ : call(c),
+ server_context(context),
+ request(req),
+ max_message_size(max_size) {}
+ Call* call;
+ ServerContext* server_context;
+ // Handler required to grpc_byte_buffer_destroy this
+ grpc_byte_buffer* request;
+ int max_message_size;
+ };
+ virtual void RunHandler(const HandlerParameter& param) = 0;
+};
+
+// Server side rpc method class
+class RpcServiceMethod : public RpcMethod {
+ public:
+ // Takes ownership of the handler
+ RpcServiceMethod(const char* name, RpcMethod::RpcType type,
+ MethodHandler* handler)
+ : RpcMethod(name, type), server_tag_(nullptr), handler_(handler) {}
+
+ void set_server_tag(void* tag) { server_tag_ = tag; }
+ void* server_tag() const { return server_tag_; }
+ // if MethodHandler is nullptr, then this is an async method
+ MethodHandler* handler() const { return handler_.get(); }
+ void ResetHandler() { handler_.reset(); }
+
+ private:
+ void* server_tag_;
+ std::unique_ptr<MethodHandler> handler_;
+};
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_CODEGEN_RPC_SERVICE_METHOD_H
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h
index b0bc10dc6d..96934edff3 100644
--- a/include/grpc++/impl/codegen/server_interface.h
+++ b/include/grpc++/impl/codegen/server_interface.h
@@ -34,8 +34,9 @@
#ifndef GRPCXX_IMPL_CODEGEN_SERVER_INTERFACE_H
#define GRPCXX_IMPL_CODEGEN_SERVER_INTERFACE_H
-#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/codegen/call_hook.h>
+#include <grpc++/impl/codegen/completion_queue_tag.h>
+#include <grpc++/impl/codegen/rpc_service_method.h>
namespace grpc {
@@ -43,7 +44,6 @@ class AsyncGenericService;
class AsynchronousService;
class GenericServerContext;
class RpcService;
-class RpcServiceMethod;
class ServerAsyncStreamingInterface;
class ServerCompletionQueue;
class ServerContext;
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
new file mode 100644
index 0000000000..b8cd44fb09
--- /dev/null
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -0,0 +1,416 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
+#define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
+
+#include <grpc++/impl/codegen/call.h>
+#include <grpc++/impl/codegen/channel_interface.h>
+#include <grpc++/impl/codegen/client_context.h>
+#include <grpc++/impl/codegen/completion_queue.h>
+#include <grpc++/impl/codegen/server_context.h>
+#include <grpc++/impl/codegen/service_type.h>
+#include <grpc++/impl/codegen/status.h>
+#include <grpc/impl/codegen/log.h>
+
+namespace grpc {
+
+/// Common interface for all synchronous client side streaming.
+class ClientStreamingInterface {
+ public:
+ virtual ~ClientStreamingInterface() {}
+
+ /// Wait until the stream finishes, and return the final status. When the
+ /// client side declares it has no more message to send, either implicitly or
+ /// by calling \a WritesDone(), it needs to make sure there is no more message
+ /// to be received from the server, either implicitly or by getting a false
+ /// from a \a Read().
+ ///
+ /// This function will return either:
+ /// - when all incoming messages have been read and the server has returned
+ /// status.
+ /// - OR when the server has returned a non-OK status.
+ virtual Status Finish() = 0;
+};
+
+/// An interface that yields a sequence of messages of type \a R.
+template <class R>
+class ReaderInterface {
+ public:
+ virtual ~ReaderInterface() {}
+
+ /// Blocking read a message and parse to \a msg. Returns \a true on success.
+ ///
+ /// \param[out] msg The read message.
+ ///
+ /// \return \a false when there will be no more incoming messages, either
+ /// because the other side has called \a WritesDone() or the stream has failed
+ /// (or been cancelled).
+ virtual bool Read(R* msg) = 0;
+};
+
+/// An interface that can be fed a sequence of messages of type \a W.
+template <class W>
+class WriterInterface {
+ public:
+ virtual ~WriterInterface() {}
+
+ /// Blocking write \a msg to the stream with options.
+ ///
+ /// \param msg The message to be written to the stream.
+ /// \param options Options affecting the write operation.
+ ///
+ /// \return \a true on success, \a false when the stream has been closed.
+ virtual bool Write(const W& msg, const WriteOptions& options) = 0;
+
+ /// Blocking write \a msg to the stream with default options.
+ ///
+ /// \param msg The message to be written to the stream.
+ ///
+ /// \return \a true on success, \a false when the stream has been closed.
+ inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
+};
+
+/// Client-side interface for streaming reads of message of type \a R.
+template <class R>
+class ClientReaderInterface : public ClientStreamingInterface,
+ public ReaderInterface<R> {
+ public:
+ /// Blocking wait for initial metadata from server. The received metadata
+ /// can only be accessed after this call returns. Should only be called before
+ /// the first read. Calling this method is optional, and if it is not called
+ /// the metadata will be available in ClientContext after the first read.
+ virtual void WaitForInitialMetadata() = 0;
+};
+
+template <class R>
+class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
+ public:
+ /// Blocking create a stream and write the first request out.
+ template <class W>
+ ClientReader(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const W& request)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(ops.SendMessage(request).ok());
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
+
+ void WaitForInitialMetadata() GRPC_OVERRIDE {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); /// status ignored
+ }
+
+ bool Read(R* msg) GRPC_OVERRIDE {
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
+ if (!context_->initial_metadata_received_) {
+ ops.RecvInitialMetadata(context_);
+ }
+ ops.RecvMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops) && ops.got_message;
+ }
+
+ Status Finish() GRPC_OVERRIDE {
+ CallOpSet<CallOpClientRecvStatus> ops;
+ Status status;
+ ops.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&ops);
+ GPR_ASSERT(cq_.Pluck(&ops));
+ return status;
+ }
+
+ private:
+ ClientContext* context_;
+ CompletionQueue cq_;
+ Call call_;
+};
+
+/// Client-side interface for streaming writes of message of type \a W.
+template <class W>
+class ClientWriterInterface : public ClientStreamingInterface,
+ public WriterInterface<W> {
+ public:
+ /// Half close writing from the client.
+ /// Block until writes are completed.
+ ///
+ /// \return Whether the writes were successful.
+ virtual bool WritesDone() = 0;
+};
+
+template <class W>
+class ClientWriter : public ClientWriterInterface<W> {
+ public:
+ /// Blocking create a stream.
+ template <class R>
+ ClientWriter(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, R* response)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ finish_ops_.RecvMessage(response);
+
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
+
+ using WriterInterface<W>::Write;
+ bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
+ CallOpSet<CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg, options).ok()) {
+ return false;
+ }
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
+ }
+
+ bool WritesDone() GRPC_OVERRIDE {
+ CallOpSet<CallOpClientSendClose> ops;
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
+ }
+
+ /// Read the final response and wait for the final status.
+ Status Finish() GRPC_OVERRIDE {
+ Status status;
+ finish_ops_.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&finish_ops_);
+ GPR_ASSERT(cq_.Pluck(&finish_ops_));
+ return status;
+ }
+
+ private:
+ ClientContext* context_;
+ CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
+ CompletionQueue cq_;
+ Call call_;
+};
+
+/// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientReaderWriterInterface : public ClientStreamingInterface,
+ public WriterInterface<W>,
+ public ReaderInterface<R> {
+ public:
+ /// Blocking wait for initial metadata from server. The received metadata
+ /// can only be accessed after this call returns. Should only be called before
+ /// the first read. Calling this method is optional, and if it is not called
+ /// the metadata will be available in ClientContext after the first read.
+ virtual void WaitForInitialMetadata() = 0;
+
+ /// Block until writes are completed.
+ ///
+ /// \return Whether the writes were successful.
+ virtual bool WritesDone() = 0;
+};
+
+template <class W, class R>
+class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
+ public:
+ /// Blocking create a stream.
+ ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
+
+ void WaitForInitialMetadata() GRPC_OVERRIDE {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); // status ignored
+ }
+
+ bool Read(R* msg) GRPC_OVERRIDE {
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
+ if (!context_->initial_metadata_received_) {
+ ops.RecvInitialMetadata(context_);
+ }
+ ops.RecvMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops) && ops.got_message;
+ }
+
+ using WriterInterface<W>::Write;
+ bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
+ CallOpSet<CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg, options).ok()) return false;
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
+ }
+
+ bool WritesDone() GRPC_OVERRIDE {
+ CallOpSet<CallOpClientSendClose> ops;
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
+ }
+
+ Status Finish() GRPC_OVERRIDE {
+ CallOpSet<CallOpClientRecvStatus> ops;
+ Status status;
+ ops.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&ops);
+ GPR_ASSERT(cq_.Pluck(&ops));
+ return status;
+ }
+
+ private:
+ ClientContext* context_;
+ CompletionQueue cq_;
+ Call call_;
+};
+
+template <class R>
+class ServerReader GRPC_FINAL : public ReaderInterface<R> {
+ public:
+ ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
+ }
+
+ bool Read(R* msg) GRPC_OVERRIDE {
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.RecvMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops) && ops.got_message;
+ }
+
+ private:
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+template <class W>
+class ServerWriter GRPC_FINAL : public WriterInterface<W> {
+ public:
+ ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
+ }
+
+ using WriterInterface<W>::Write;
+ bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg, options).ok()) {
+ return false;
+ }
+ if (!ctx_->sent_initial_metadata_) {
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops);
+ }
+
+ private:
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+/// Server-side interface for bi-directional streaming.
+template <class W, class R>
+class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
+ public ReaderInterface<R> {
+ public:
+ ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
+ }
+
+ bool Read(R* msg) GRPC_OVERRIDE {
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.RecvMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops) && ops.got_message;
+ }
+
+ using WriterInterface<W>::Write;
+ bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg, options).ok()) {
+ return false;
+ }
+ if (!ctx_->sent_initial_metadata_) {
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops);
+ }
+
+ private:
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
diff --git a/include/grpc++/impl/method_handler_impl.h b/include/grpc++/impl/method_handler_impl.h
index 2997cb0e62..305fd2865e 100644
--- a/include/grpc++/impl/method_handler_impl.h
+++ b/include/grpc++/impl/method_handler_impl.h
@@ -34,170 +34,6 @@
#ifndef GRPCXX_IMPL_METHOD_HANDLER_IMPL_H
#define GRPCXX_IMPL_METHOD_HANDLER_IMPL_H
-#include <grpc++/impl/rpc_service_method.h>
-#include <grpc++/support/sync_stream.h>
+#include <grpc++/impl/codegen/method_handler_impl.h>
-namespace grpc {
-
-// A wrapper class of an application provided rpc method handler.
-template <class ServiceType, class RequestType, class ResponseType>
-class RpcMethodHandler : public MethodHandler {
- public:
- RpcMethodHandler(
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ResponseType*)> func,
- ServiceType* service)
- : func_(func), service_(service) {}
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- RequestType req;
- Status status = SerializationTraits<RequestType>::Deserialize(
- param.request, &req, param.max_message_size);
- ResponseType rsp;
- if (status.ok()) {
- status = func_(service_, param.server_context, &req, &rsp);
- }
-
- GPR_ASSERT(!param.server_context->sent_initial_metadata_);
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus> ops;
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
- if (status.ok()) {
- status = ops.SendMessage(rsp);
- }
- ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-
- private:
- // Application provided rpc handler function.
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ResponseType*)> func_;
- // The class the above handler function lives in.
- ServiceType* service_;
-};
-
-// A wrapper class of an application provided client streaming handler.
-template <class ServiceType, class RequestType, class ResponseType>
-class ClientStreamingHandler : public MethodHandler {
- public:
- ClientStreamingHandler(
- std::function<Status(ServiceType*, ServerContext*,
- ServerReader<RequestType>*, ResponseType*)> func,
- ServiceType* service)
- : func_(func), service_(service) {}
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- ServerReader<RequestType> reader(param.call, param.server_context);
- ResponseType rsp;
- Status status = func_(service_, param.server_context, &reader, &rsp);
-
- GPR_ASSERT(!param.server_context->sent_initial_metadata_);
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus> ops;
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
- if (status.ok()) {
- status = ops.SendMessage(rsp);
- }
- ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-
- private:
- std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
- ResponseType*)> func_;
- ServiceType* service_;
-};
-
-// A wrapper class of an application provided server streaming handler.
-template <class ServiceType, class RequestType, class ResponseType>
-class ServerStreamingHandler : public MethodHandler {
- public:
- ServerStreamingHandler(
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ServerWriter<ResponseType>*)> func,
- ServiceType* service)
- : func_(func), service_(service) {}
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- RequestType req;
- Status status = SerializationTraits<RequestType>::Deserialize(
- param.request, &req, param.max_message_size);
-
- if (status.ok()) {
- ServerWriter<ResponseType> writer(param.call, param.server_context);
- status = func_(service_, param.server_context, &req, &writer);
- }
-
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
- if (!param.server_context->sent_initial_metadata_) {
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
- }
- ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-
- private:
- std::function<Status(ServiceType*, ServerContext*, const RequestType*,
- ServerWriter<ResponseType>*)> func_;
- ServiceType* service_;
-};
-
-// A wrapper class of an application provided bidi-streaming handler.
-template <class ServiceType, class RequestType, class ResponseType>
-class BidiStreamingHandler : public MethodHandler {
- public:
- BidiStreamingHandler(
- std::function<Status(ServiceType*, ServerContext*,
- ServerReaderWriter<ResponseType, RequestType>*)>
- func,
- ServiceType* service)
- : func_(func), service_(service) {}
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- ServerReaderWriter<ResponseType, RequestType> stream(param.call,
- param.server_context);
- Status status = func_(service_, param.server_context, &stream);
-
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
- if (!param.server_context->sent_initial_metadata_) {
- ops.SendInitialMetadata(param.server_context->initial_metadata_);
- }
- ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-
- private:
- std::function<Status(ServiceType*, ServerContext*,
- ServerReaderWriter<ResponseType, RequestType>*)> func_;
- ServiceType* service_;
-};
-
-// Handle unknown method by returning UNIMPLEMENTED error.
-class UnknownMethodHandler : public MethodHandler {
- public:
- template <class T>
- static void FillOps(ServerContext* context, T* ops) {
- Status status(StatusCode::UNIMPLEMENTED, "");
- if (!context->sent_initial_metadata_) {
- ops->SendInitialMetadata(context->initial_metadata_);
- context->sent_initial_metadata_ = true;
- }
- ops->ServerSendStatus(context->trailing_metadata_, status);
- }
-
- void RunHandler(const HandlerParameter& param) GRPC_FINAL {
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
- FillOps(param.server_context, &ops);
- param.call->PerformOps(&ops);
- param.call->cq()->Pluck(&ops);
- }
-};
-
-} // namespace grpc
-
-#endif // GRPCXX_IMPL_METHOD_HANDLER_IMPL_H \ No newline at end of file
+#endif // GRPCXX_IMPL_METHOD_HANDLER_IMPL_H
diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h
index 387891727d..8b5d1a3282 100644
--- a/include/grpc++/impl/rpc_method.h
+++ b/include/grpc++/impl/rpc_method.h
@@ -34,40 +34,6 @@
#ifndef GRPCXX_IMPL_RPC_METHOD_H
#define GRPCXX_IMPL_RPC_METHOD_H
-#include <memory>
-
-#include <grpc++/impl/codegen/channel_interface.h>
-
-namespace grpc {
-
-class RpcMethod {
- public:
- enum RpcType {
- NORMAL_RPC = 0,
- CLIENT_STREAMING, // request streaming
- SERVER_STREAMING, // response streaming
- BIDI_STREAMING
- };
-
- RpcMethod(const char* name, RpcType type)
- : name_(name), method_type_(type), channel_tag_(NULL) {}
-
- RpcMethod(const char* name, RpcType type,
- const std::shared_ptr<ChannelInterface>& channel)
- : name_(name),
- method_type_(type),
- channel_tag_(channel->RegisterMethod(name)) {}
-
- const char* name() const { return name_; }
- RpcType method_type() const { return method_type_; }
- void* channel_tag() const { return channel_tag_; }
-
- private:
- const char* const name_;
- const RpcType method_type_;
- void* const channel_tag_;
-};
-
-} // namespace grpc
+#include <grpc++/impl/codegen/rpc_method.h>
#endif // GRPCXX_IMPL_RPC_METHOD_H
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index a0bae80dab..101e6a636b 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -34,59 +34,6 @@
#ifndef GRPCXX_IMPL_RPC_SERVICE_METHOD_H
#define GRPCXX_IMPL_RPC_SERVICE_METHOD_H
-#include <climits>
-#include <functional>
-#include <map>
-#include <memory>
-#include <vector>
-
-#include <grpc++/impl/rpc_method.h>
-#include <grpc++/support/config.h>
-#include <grpc++/support/status.h>
-
-namespace grpc {
-class ServerContext;
-class StreamContextInterface;
-
-// Base class for running an RPC handler.
-class MethodHandler {
- public:
- virtual ~MethodHandler() {}
- struct HandlerParameter {
- HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req,
- int max_size)
- : call(c),
- server_context(context),
- request(req),
- max_message_size(max_size) {}
- Call* call;
- ServerContext* server_context;
- // Handler required to grpc_byte_buffer_destroy this
- grpc_byte_buffer* request;
- int max_message_size;
- };
- virtual void RunHandler(const HandlerParameter& param) = 0;
-};
-
-// Server side rpc method class
-class RpcServiceMethod : public RpcMethod {
- public:
- // Takes ownership of the handler
- RpcServiceMethod(const char* name, RpcMethod::RpcType type,
- MethodHandler* handler)
- : RpcMethod(name, type), server_tag_(nullptr), handler_(handler) {}
-
- void set_server_tag(void* tag) { server_tag_ = tag; }
- void* server_tag() const { return server_tag_; }
- // if MethodHandler is nullptr, then this is an async method
- MethodHandler* handler() const { return handler_.get(); }
- void ResetHandler() { handler_.reset(); }
-
- private:
- void* server_tag_;
- std::unique_ptr<MethodHandler> handler_;
-};
-
-} // namespace grpc
+#include <grpc++/impl/codegen/rpc_service_method.h>
#endif // GRPCXX_IMPL_RPC_SERVICE_METHOD_H
diff --git a/include/grpc++/support/async_unary_call.h b/include/grpc++/support/async_unary_call.h
index 3a601e063d..6d74328be5 100644
--- a/include/grpc++/support/async_unary_call.h
+++ b/include/grpc++/support/async_unary_call.h
@@ -34,123 +34,6 @@
#ifndef GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H
#define GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H
-#include <grpc/impl/codegen/log.h>
-#include <grpc++/impl/codegen/channel_interface.h>
-#include <grpc++/impl/codegen/client_context.h>
-#include <grpc++/impl/codegen/server_context.h>
-#include <grpc++/impl/codegen/call.h>
-#include <grpc++/impl/codegen/service_type.h>
-#include <grpc++/impl/codegen/status.h>
-
-namespace grpc {
-
-class CompletionQueue;
-
-template <class R>
-class ClientAsyncResponseReaderInterface {
- public:
- virtual ~ClientAsyncResponseReaderInterface() {}
- virtual void ReadInitialMetadata(void* tag) = 0;
- virtual void Finish(R* msg, Status* status, void* tag) = 0;
-};
-
-template <class R>
-class ClientAsyncResponseReader GRPC_FINAL
- : public ClientAsyncResponseReaderInterface<R> {
- public:
- template <class W>
- ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
- const RpcMethod& method, ClientContext* context,
- const W& request)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.SendInitialMetadata(context->send_initial_metadata_);
- // TODO(ctiller): don't assert
- GPR_ASSERT(init_buf_.SendMessage(request).ok());
- init_buf_.ClientSendClose();
- call_.PerformOps(&init_buf_);
- }
-
- void ReadInitialMetadata(void* tag) {
- GPR_ASSERT(!context_->initial_metadata_received_);
-
- meta_buf_.set_output_tag(tag);
- meta_buf_.RecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
- }
-
- void Finish(R* msg, Status* status, void* tag) {
- finish_buf_.set_output_tag(tag);
- if (!context_->initial_metadata_received_) {
- finish_buf_.RecvInitialMetadata(context_);
- }
- finish_buf_.RecvMessage(msg);
- finish_buf_.ClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
- }
-
- private:
- ClientContext* context_;
- Call call_;
- SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose> init_buf_;
- CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
- CallOpClientRecvStatus> finish_buf_;
-};
-
-template <class W>
-class ServerAsyncResponseWriter GRPC_FINAL
- : public ServerAsyncStreamingInterface {
- public:
- explicit ServerAsyncResponseWriter(ServerContext* ctx)
- : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
-
- void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
- GPR_ASSERT(!ctx_->sent_initial_metadata_);
-
- meta_buf_.set_output_tag(tag);
- meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
- ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
- }
-
- void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
- ctx_->sent_initial_metadata_ = true;
- }
- // The response is dropped if the status is not OK.
- if (status.ok()) {
- finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
- finish_buf_.SendMessage(msg));
- } else {
- finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
- }
- call_.PerformOps(&finish_buf_);
- }
-
- void FinishWithError(const Status& status, void* tag) {
- GPR_ASSERT(!status.ok());
- finish_buf_.set_output_tag(tag);
- if (!ctx_->sent_initial_metadata_) {
- finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
- ctx_->sent_initial_metadata_ = true;
- }
- finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
- }
-
- private:
- void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
-
- Call call_;
- ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_buf_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus> finish_buf_;
-};
-
-} // namespace grpc
+#include <grpc++/impl/codegen/async_unary_call.h>
#endif // GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H
diff --git a/include/grpc++/support/sync_stream.h b/include/grpc++/support/sync_stream.h
index 3557ba5156..2ea2ac5443 100644
--- a/include/grpc++/support/sync_stream.h
+++ b/include/grpc++/support/sync_stream.h
@@ -34,383 +34,6 @@
#ifndef GRPCXX_SUPPORT_SYNC_STREAM_H
#define GRPCXX_SUPPORT_SYNC_STREAM_H
-#include <grpc++/channel.h>
-#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/impl/call.h>
-#include <grpc++/impl/codegen/channel_interface.h>
-#include <grpc++/impl/service_type.h>
-#include <grpc++/server_context.h>
-#include <grpc++/support/status.h>
-#include <grpc/support/log.h>
-
-namespace grpc {
-
-/// Common interface for all synchronous client side streaming.
-class ClientStreamingInterface {
- public:
- virtual ~ClientStreamingInterface() {}
-
- /// Wait until the stream finishes, and return the final status. When the
- /// client side declares it has no more message to send, either implicitly or
- /// by calling \a WritesDone(), it needs to make sure there is no more message
- /// to be received from the server, either implicitly or by getting a false
- /// from a \a Read().
- ///
- /// This function will return either:
- /// - when all incoming messages have been read and the server has returned
- /// status.
- /// - OR when the server has returned a non-OK status.
- virtual Status Finish() = 0;
-};
-
-/// An interface that yields a sequence of messages of type \a R.
-template <class R>
-class ReaderInterface {
- public:
- virtual ~ReaderInterface() {}
-
- /// Blocking read a message and parse to \a msg. Returns \a true on success.
- ///
- /// \param[out] msg The read message.
- ///
- /// \return \a false when there will be no more incoming messages, either
- /// because the other side has called \a WritesDone() or the stream has failed
- /// (or been cancelled).
- virtual bool Read(R* msg) = 0;
-};
-
-/// An interface that can be fed a sequence of messages of type \a W.
-template <class W>
-class WriterInterface {
- public:
- virtual ~WriterInterface() {}
-
- /// Blocking write \a msg to the stream with options.
- ///
- /// \param msg The message to be written to the stream.
- /// \param options Options affecting the write operation.
- ///
- /// \return \a true on success, \a false when the stream has been closed.
- virtual bool Write(const W& msg, const WriteOptions& options) = 0;
-
- /// Blocking write \a msg to the stream with default options.
- ///
- /// \param msg The message to be written to the stream.
- ///
- /// \return \a true on success, \a false when the stream has been closed.
- inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
-};
-
-/// Client-side interface for streaming reads of message of type \a R.
-template <class R>
-class ClientReaderInterface : public ClientStreamingInterface,
- public ReaderInterface<R> {
- public:
- /// Blocking wait for initial metadata from server. The received metadata
- /// can only be accessed after this call returns. Should only be called before
- /// the first read. Calling this method is optional, and if it is not called
- /// the metadata will be available in ClientContext after the first read.
- virtual void WaitForInitialMetadata() = 0;
-};
-
-template <class R>
-class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
- public:
- /// Blocking create a stream and write the first request out.
- template <class W>
- ClientReader(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, const W& request)
- : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_);
- // TODO(ctiller): don't assert
- GPR_ASSERT(ops.SendMessage(request).ok());
- ops.ClientSendClose();
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
- }
-
- void WaitForInitialMetadata() GRPC_OVERRIDE {
- GPR_ASSERT(!context_->initial_metadata_received_);
-
- CallOpSet<CallOpRecvInitialMetadata> ops;
- ops.RecvInitialMetadata(context_);
- call_.PerformOps(&ops);
- cq_.Pluck(&ops); /// status ignored
- }
-
- bool Read(R* msg) GRPC_OVERRIDE {
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
- if (!context_->initial_metadata_received_) {
- ops.RecvInitialMetadata(context_);
- }
- ops.RecvMessage(msg);
- call_.PerformOps(&ops);
- return cq_.Pluck(&ops) && ops.got_message;
- }
-
- Status Finish() GRPC_OVERRIDE {
- CallOpSet<CallOpClientRecvStatus> ops;
- Status status;
- ops.ClientRecvStatus(context_, &status);
- call_.PerformOps(&ops);
- GPR_ASSERT(cq_.Pluck(&ops));
- return status;
- }
-
- private:
- ClientContext* context_;
- CompletionQueue cq_;
- Call call_;
-};
-
-/// Client-side interface for streaming writes of message of type \a W.
-template <class W>
-class ClientWriterInterface : public ClientStreamingInterface,
- public WriterInterface<W> {
- public:
- /// Half close writing from the client.
- /// Block until writes are completed.
- ///
- /// \return Whether the writes were successful.
- virtual bool WritesDone() = 0;
-};
-
-template <class W>
-class ClientWriter : public ClientWriterInterface<W> {
- public:
- /// Blocking create a stream.
- template <class R>
- ClientWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, R* response)
- : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- finish_ops_.RecvMessage(response);
-
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_);
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
- }
-
- using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
- CallOpSet<CallOpSendMessage> ops;
- if (!ops.SendMessage(msg, options).ok()) {
- return false;
- }
- call_.PerformOps(&ops);
- return cq_.Pluck(&ops);
- }
-
- bool WritesDone() GRPC_OVERRIDE {
- CallOpSet<CallOpClientSendClose> ops;
- ops.ClientSendClose();
- call_.PerformOps(&ops);
- return cq_.Pluck(&ops);
- }
-
- /// Read the final response and wait for the final status.
- Status Finish() GRPC_OVERRIDE {
- Status status;
- finish_ops_.ClientRecvStatus(context_, &status);
- call_.PerformOps(&finish_ops_);
- GPR_ASSERT(cq_.Pluck(&finish_ops_));
- return status;
- }
-
- private:
- ClientContext* context_;
- CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
- CompletionQueue cq_;
- Call call_;
-};
-
-/// Client-side interface for bi-directional streaming.
-template <class W, class R>
-class ClientReaderWriterInterface : public ClientStreamingInterface,
- public WriterInterface<W>,
- public ReaderInterface<R> {
- public:
- /// Blocking wait for initial metadata from server. The received metadata
- /// can only be accessed after this call returns. Should only be called before
- /// the first read. Calling this method is optional, and if it is not called
- /// the metadata will be available in ClientContext after the first read.
- virtual void WaitForInitialMetadata() = 0;
-
- /// Block until writes are completed.
- ///
- /// \return Whether the writes were successful.
- virtual bool WritesDone() = 0;
-};
-
-template <class W, class R>
-class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
- public:
- /// Blocking create a stream.
- ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context)
- : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_);
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
- }
-
- void WaitForInitialMetadata() GRPC_OVERRIDE {
- GPR_ASSERT(!context_->initial_metadata_received_);
-
- CallOpSet<CallOpRecvInitialMetadata> ops;
- ops.RecvInitialMetadata(context_);
- call_.PerformOps(&ops);
- cq_.Pluck(&ops); // status ignored
- }
-
- bool Read(R* msg) GRPC_OVERRIDE {
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
- if (!context_->initial_metadata_received_) {
- ops.RecvInitialMetadata(context_);
- }
- ops.RecvMessage(msg);
- call_.PerformOps(&ops);
- return cq_.Pluck(&ops) && ops.got_message;
- }
-
- using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
- CallOpSet<CallOpSendMessage> ops;
- if (!ops.SendMessage(msg, options).ok()) return false;
- call_.PerformOps(&ops);
- return cq_.Pluck(&ops);
- }
-
- bool WritesDone() GRPC_OVERRIDE {
- CallOpSet<CallOpClientSendClose> ops;
- ops.ClientSendClose();
- call_.PerformOps(&ops);
- return cq_.Pluck(&ops);
- }
-
- Status Finish() GRPC_OVERRIDE {
- CallOpSet<CallOpClientRecvStatus> ops;
- Status status;
- ops.ClientRecvStatus(context_, &status);
- call_.PerformOps(&ops);
- GPR_ASSERT(cq_.Pluck(&ops));
- return status;
- }
-
- private:
- ClientContext* context_;
- CompletionQueue cq_;
- Call call_;
-};
-
-template <class R>
-class ServerReader GRPC_FINAL : public ReaderInterface<R> {
- public:
- ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
-
- void SendInitialMetadata() {
- GPR_ASSERT(!ctx_->sent_initial_metadata_);
-
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(ctx_->initial_metadata_);
- ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&ops);
- call_->cq()->Pluck(&ops);
- }
-
- bool Read(R* msg) GRPC_OVERRIDE {
- CallOpSet<CallOpRecvMessage<R>> ops;
- ops.RecvMessage(msg);
- call_->PerformOps(&ops);
- return call_->cq()->Pluck(&ops) && ops.got_message;
- }
-
- private:
- Call* const call_;
- ServerContext* const ctx_;
-};
-
-template <class W>
-class ServerWriter GRPC_FINAL : public WriterInterface<W> {
- public:
- ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
-
- void SendInitialMetadata() {
- GPR_ASSERT(!ctx_->sent_initial_metadata_);
-
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(ctx_->initial_metadata_);
- ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&ops);
- call_->cq()->Pluck(&ops);
- }
-
- using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
- if (!ops.SendMessage(msg, options).ok()) {
- return false;
- }
- if (!ctx_->sent_initial_metadata_) {
- ops.SendInitialMetadata(ctx_->initial_metadata_);
- ctx_->sent_initial_metadata_ = true;
- }
- call_->PerformOps(&ops);
- return call_->cq()->Pluck(&ops);
- }
-
- private:
- Call* const call_;
- ServerContext* const ctx_;
-};
-
-/// Server-side interface for bi-directional streaming.
-template <class W, class R>
-class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
- public ReaderInterface<R> {
- public:
- ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
-
- void SendInitialMetadata() {
- GPR_ASSERT(!ctx_->sent_initial_metadata_);
-
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(ctx_->initial_metadata_);
- ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&ops);
- call_->cq()->Pluck(&ops);
- }
-
- bool Read(R* msg) GRPC_OVERRIDE {
- CallOpSet<CallOpRecvMessage<R>> ops;
- ops.RecvMessage(msg);
- call_->PerformOps(&ops);
- return call_->cq()->Pluck(&ops) && ops.got_message;
- }
-
- using WriterInterface<W>::Write;
- bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
- if (!ops.SendMessage(msg, options).ok()) {
- return false;
- }
- if (!ctx_->sent_initial_metadata_) {
- ops.SendInitialMetadata(ctx_->initial_metadata_);
- ctx_->sent_initial_metadata_ = true;
- }
- call_->PerformOps(&ops);
- return call_->cq()->Pluck(&ops);
- }
-
- private:
- Call* const call_;
- ServerContext* const ctx_;
-};
-
-} // namespace grpc
+#include <grpc++/impl/codegen/sync_stream.h>
#endif // GRPCXX_SUPPORT_SYNC_STREAM_H