aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/async_server.h70
-rw-r--r--include/grpc++/async_server_context.h95
-rw-r--r--include/grpc++/channel_interface.h26
-rw-r--r--include/grpc++/client_context.h76
-rw-r--r--include/grpc++/completion_queue.h88
-rw-r--r--include/grpc++/config.h3
-rw-r--r--include/grpc++/impl/call.h147
-rw-r--r--include/grpc++/impl/client_unary_call.h66
-rw-r--r--include/grpc++/impl/rpc_method.h4
-rw-r--r--include/grpc++/impl/rpc_service_method.h28
-rw-r--r--include/grpc++/impl/service_type.h127
-rw-r--r--include/grpc++/server.h40
-rw-r--r--include/grpc++/server_builder.h14
-rw-r--r--include/grpc++/server_context.h68
-rw-r--r--include/grpc++/stream.h719
-rw-r--r--include/grpc++/stream_context_interface.h64
16 files changed, 1237 insertions, 398 deletions
diff --git a/include/grpc++/async_server.h b/include/grpc++/async_server.h
deleted file mode 100644
index fe2c5d9367..0000000000
--- a/include/grpc++/async_server.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPCPP_ASYNC_SERVER_H__
-#define __GRPCPP_ASYNC_SERVER_H__
-
-#include <mutex>
-
-#include <grpc++/config.h>
-
-struct grpc_server;
-
-namespace grpc {
-class CompletionQueue;
-
-class AsyncServer {
- public:
- explicit AsyncServer(CompletionQueue* cc);
- ~AsyncServer();
-
- void AddPort(const grpc::string& addr);
-
- void Start();
-
- // The user has to call this to get one new rpc on the completion
- // queue.
- void RequestOneRpc();
-
- void Shutdown();
-
- private:
- bool started_;
- std::mutex shutdown_mu_;
- bool shutdown_;
- grpc_server* server_;
-};
-
-} // namespace grpc
-
-#endif // __GRPCPP_ASYNC_SERVER_H__
diff --git a/include/grpc++/async_server_context.h b/include/grpc++/async_server_context.h
deleted file mode 100644
index c038286ac1..0000000000
--- a/include/grpc++/async_server_context.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPCPP_ASYNC_SERVER_CONTEXT_H__
-#define __GRPCPP_ASYNC_SERVER_CONTEXT_H__
-
-#include <chrono>
-
-#include <grpc++/config.h>
-
-struct grpc_byte_buffer;
-struct grpc_call;
-struct grpc_completion_queue;
-
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-using std::chrono::system_clock;
-
-namespace grpc {
-class Status;
-
-// TODO(rocking): wrap grpc c structures.
-class AsyncServerContext {
- public:
- AsyncServerContext(grpc_call* call, const grpc::string& method,
- const grpc::string& host,
- system_clock::time_point absolute_deadline);
- ~AsyncServerContext();
-
- // Accept this rpc, bind it to a completion queue.
- void Accept(grpc_completion_queue* cq);
-
- // Read and write calls, all async. Return true for success.
- bool StartRead(google::protobuf::Message* request);
- bool StartWrite(const google::protobuf::Message& response, int flags);
- bool StartWriteStatus(const Status& status);
-
- bool ParseRead(grpc_byte_buffer* read_buffer);
-
- grpc::string method() const { return method_; }
- grpc::string host() const { return host_; }
- system_clock::time_point absolute_deadline() { return absolute_deadline_; }
-
- grpc_call* call() { return call_; }
-
- private:
- AsyncServerContext(const AsyncServerContext&);
- AsyncServerContext& operator=(const AsyncServerContext&);
-
- // These properties may be moved to a ServerContext class.
- const grpc::string method_;
- const grpc::string host_;
- system_clock::time_point absolute_deadline_;
-
- google::protobuf::Message* request_; // not owned
- grpc_call* call_; // owned
-};
-
-} // namespace grpc
-
-#endif // __GRPCPP_ASYNC_SERVER_CONTEXT_H__
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h
index 9ed35422b8..b0366faabb 100644
--- a/include/grpc++/channel_interface.h
+++ b/include/grpc++/channel_interface.h
@@ -35,32 +35,30 @@
#define __GRPCPP_CHANNEL_INTERFACE_H__
#include <grpc++/status.h>
+#include <grpc++/impl/call.h>
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
-namespace grpc {
+struct grpc_call;
+namespace grpc {
+class Call;
+class CallOpBuffer;
class ClientContext;
+class CompletionQueue;
class RpcMethod;
-class StreamContextInterface;
+class CallInterface;
-class ChannelInterface {
+class ChannelInterface : public CallHook {
public:
virtual ~ChannelInterface() {}
- virtual Status StartBlockingRpc(const RpcMethod& method,
- ClientContext* context,
- const google::protobuf::Message& request,
- google::protobuf::Message* result) = 0;
-
- virtual StreamContextInterface* CreateStream(
- const RpcMethod& method, ClientContext* context,
- const google::protobuf::Message* request,
- google::protobuf::Message* result) = 0;
+ virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
+ CompletionQueue *cq) = 0;
};
} // namespace grpc
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 0cf6bdc647..4594cbaeb6 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -35,8 +35,8 @@
#define __GRPCPP_CLIENT_CONTEXT_H__
#include <chrono>
+#include <map>
#include <string>
-#include <vector>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -47,8 +47,32 @@ using std::chrono::system_clock;
struct grpc_call;
struct grpc_completion_queue;
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
namespace grpc {
+class CallOpBuffer;
+class ChannelInterface;
+class CompletionQueue;
+class RpcMethod;
+class Status;
+template <class R>
+class ClientReader;
+template <class W>
+class ClientWriter;
+template <class R, class W>
+class ClientReaderWriter;
+template <class R>
+class ClientAsyncReader;
+template <class W>
+class ClientAsyncWriter;
+template <class R, class W>
+class ClientAsyncReaderWriter;
+
class ClientContext {
public:
ClientContext();
@@ -57,18 +81,54 @@ class ClientContext {
void AddMetadata(const grpc::string &meta_key,
const grpc::string &meta_value);
+ std::multimap<grpc::string, grpc::string> GetServerInitialMetadata() {
+ GPR_ASSERT(initial_metadata_received_);
+ return recv_initial_metadata_;
+ }
+
+ std::multimap<grpc::string, grpc::string> GetServerTrailingMetadata() {
+ // TODO(yangg) check finished
+ return trailing_metadata_;
+ }
+
void set_absolute_deadline(const system_clock::time_point &deadline);
system_clock::time_point absolute_deadline();
- void StartCancel();
+ void set_authority(const grpc::string& authority) {
+ authority_ = authority;
+ }
+
+ void TryCancel();
private:
// Disallow copy and assign.
ClientContext(const ClientContext &);
ClientContext &operator=(const ClientContext &);
+ friend class CallOpBuffer;
friend class Channel;
- friend class StreamContext;
+ template <class R>
+ friend class ::grpc::ClientReader;
+ template <class W>
+ friend class ::grpc::ClientWriter;
+ template <class R, class W>
+ friend class ::grpc::ClientReaderWriter;
+ template <class R>
+ friend class ::grpc::ClientAsyncReader;
+ template <class W>
+ friend class ::grpc::ClientAsyncWriter;
+ template <class R, class W>
+ friend class ::grpc::ClientAsyncReaderWriter;
+ friend Status BlockingUnaryCall(ChannelInterface *channel,
+ const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result);
+ friend void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result, Status *status,
+ CompletionQueue *cq, void *tag);
grpc_call *call() { return call_; }
void set_call(grpc_call *call) {
@@ -81,10 +141,18 @@ class ClientContext {
gpr_timespec RawDeadline() { return absolute_deadline_; }
+ grpc::string authority() {
+ return authority_;
+ }
+
+ bool initial_metadata_received_ = false;
grpc_call *call_;
grpc_completion_queue *cq_;
gpr_timespec absolute_deadline_;
- std::vector<std::pair<grpc::string, grpc::string> > metadata_;
+ grpc::string authority_;
+ std::multimap<grpc::string, grpc::string> send_initial_metadata_;
+ std::multimap<grpc::string, grpc::string> recv_initial_metadata_;
+ std::multimap<grpc::string, grpc::string> trailing_metadata_;
};
} // namespace grpc
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 72f6253f8e..c5267f8563 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -34,52 +34,82 @@
#ifndef __GRPCPP_COMPLETION_QUEUE_H__
#define __GRPCPP_COMPLETION_QUEUE_H__
+#include <grpc++/impl/client_unary_call.h>
+
struct grpc_completion_queue;
namespace grpc {
+template <class R>
+class ClientReader;
+template <class W>
+class ClientWriter;
+template <class R, class W>
+class ClientReaderWriter;
+template <class R>
+class ServerReader;
+template <class W>
+class ServerWriter;
+template <class R, class W>
+class ServerReaderWriter;
+
+class CompletionQueue;
+class Server;
+
+class CompletionQueueTag {
+ public:
+ virtual ~CompletionQueueTag() {}
+ // Called prior to returning from Next(), return value
+ // is the status of the operation (return status is the default thing
+ // to do)
+ virtual void FinalizeResult(void **tag, bool *status) = 0;
+};
+
// grpc_completion_queue wrapper class
class CompletionQueue {
public:
CompletionQueue();
+ explicit CompletionQueue(grpc_completion_queue *take);
~CompletionQueue();
- enum CompletionType {
- QUEUE_CLOSED = 0, // Shutting down.
- RPC_END = 1, // An RPC finished. Either at client or server.
- CLIENT_READ_OK = 2, // A client-side read has finished successfully.
- CLIENT_READ_ERROR = 3, // A client-side read has finished with error.
- CLIENT_WRITE_OK = 4,
- CLIENT_WRITE_ERROR = 5,
- SERVER_RPC_NEW = 6, // A new RPC just arrived at the server.
- SERVER_READ_OK = 7, // A server-side read has finished successfully.
- SERVER_READ_ERROR = 8, // A server-side read has finished with error.
- SERVER_WRITE_OK = 9,
- SERVER_WRITE_ERROR = 10,
- // Client or server has sent half close successfully.
- HALFCLOSE_OK = 11,
- // New CompletionTypes may be added in the future, so user code should
- // always
- // handle the default case of a CompletionType that appears after such code
- // was
- // written.
- DO_NOT_USE = 20,
- };
-
// Blocking read from queue.
- // For QUEUE_CLOSED, *tag is not changed.
- // For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext.
- // For others, *tag will be the AsyncServerContext of this rpc.
- CompletionType Next(void** tag);
+ // Returns true if an event was received, false if the queue is ready
+ // for destruction.
+ bool Next(void **tag, bool *ok);
// Shutdown has to be called, and the CompletionQueue can only be
- // destructed when the QUEUE_CLOSED message has been read with Next().
+ // destructed when false is returned from Next().
void Shutdown();
- grpc_completion_queue* cq() { return cq_; }
+ grpc_completion_queue *cq() { return cq_; }
private:
- grpc_completion_queue* cq_; // owned
+ // Friend synchronous wrappers so that they can access Pluck(), which is
+ // a semi-private API geared towards the synchronous implementation.
+ template <class R>
+ friend class ::grpc::ClientReader;
+ template <class W>
+ friend class ::grpc::ClientWriter;
+ template <class R, class W>
+ friend class ::grpc::ClientReaderWriter;
+ template <class R>
+ friend class ::grpc::ServerReader;
+ template <class W>
+ friend class ::grpc::ServerWriter;
+ template <class R, class W>
+ friend class ::grpc::ServerReaderWriter;
+ friend class ::grpc::Server;
+ friend Status BlockingUnaryCall(ChannelInterface *channel,
+ const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result);
+
+ // Wraps grpc_completion_queue_pluck.
+ // Cannot be mixed with calls to Next().
+ bool Pluck(CompletionQueueTag *tag);
+
+ grpc_completion_queue *cq_; // owned
};
} // namespace grpc
diff --git a/include/grpc++/config.h b/include/grpc++/config.h
index 52913fbf0f..663e40247d 100644
--- a/include/grpc++/config.h
+++ b/include/grpc++/config.h
@@ -39,6 +39,7 @@
namespace grpc {
typedef std::string string;
-}
+
+} // namespace grpc
#endif // __GRPCPP_CONFIG_H__
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
new file mode 100644
index 0000000000..64f0f890c5
--- /dev/null
+++ b/include/grpc++/impl/call.h
@@ -0,0 +1,147 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPCPP_CALL_H__
+#define __GRPCPP_CALL_H__
+
+#include <grpc/grpc.h>
+#include <grpc++/status.h>
+#include <grpc++/completion_queue.h>
+
+#include <memory>
+#include <map>
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+struct grpc_call;
+struct grpc_op;
+
+namespace grpc {
+
+class Call;
+
+class CallOpBuffer : public CompletionQueueTag {
+ public:
+ CallOpBuffer() : return_tag_(this) {}
+ ~CallOpBuffer();
+
+ void Reset(void *next_return_tag);
+
+ // Does not take ownership.
+ void AddSendInitialMetadata(
+ std::multimap<grpc::string, grpc::string> *metadata);
+ void AddSendInitialMetadata(ClientContext *ctx);
+ void AddRecvInitialMetadata(
+ std::multimap<grpc::string, grpc::string> *metadata);
+ void AddSendMessage(const google::protobuf::Message &message);
+ void AddRecvMessage(google::protobuf::Message *message);
+ void AddClientSendClose();
+ void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata,
+ Status *status);
+ void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata,
+ const Status &status);
+ void AddServerRecvClose(bool *cancelled);
+
+ // INTERNAL API:
+
+ // Convert to an array of grpc_op elements
+ void FillOps(grpc_op *ops, size_t *nops);
+
+ // Called by completion queue just prior to returning from Next() or Pluck()
+ void FinalizeResult(void **tag, bool *status) override;
+
+ bool got_message = false;
+
+ private:
+ void *return_tag_ = nullptr;
+ // Send initial metadata
+ bool send_initial_metadata_ = false;
+ size_t initial_metadata_count_ = 0;
+ grpc_metadata *initial_metadata_ = nullptr;
+ // Recv initial metadta
+ std::multimap<grpc::string, grpc::string> *recv_initial_metadata_ = nullptr;
+ grpc_metadata_array recv_initial_metadata_arr_ = {0, 0, nullptr};
+ // Send message
+ const google::protobuf::Message *send_message_ = nullptr;
+ grpc_byte_buffer *send_message_buf_ = nullptr;
+ // Recv message
+ google::protobuf::Message *recv_message_ = nullptr;
+ grpc_byte_buffer *recv_message_buf_ = nullptr;
+ // Client send close
+ bool client_send_close_ = false;
+ // Client recv status
+ std::multimap<grpc::string, grpc::string> *recv_trailing_metadata_ = nullptr;
+ Status *recv_status_ = nullptr;
+ grpc_metadata_array recv_trailing_metadata_arr_ = {0, 0, nullptr};
+ grpc_status_code status_code_ = GRPC_STATUS_OK;
+ char *status_details_ = nullptr;
+ size_t status_details_capacity_ = 0;
+ // Server send status
+ const Status *send_status_ = nullptr;
+ size_t trailing_metadata_count_ = 0;
+ grpc_metadata *trailing_metadata_ = nullptr;
+ int cancelled_buf_;
+ bool *recv_closed_ = nullptr;
+};
+
+// Channel and Server implement this to allow them to hook performing ops
+class CallHook {
+ public:
+ virtual ~CallHook() {}
+ virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
+};
+
+// Straightforward wrapping of the C call object
+class Call final {
+ public:
+ /* call is owned by the caller */
+ Call(grpc_call *call, CallHook *call_hook_, CompletionQueue *cq);
+
+ void PerformOps(CallOpBuffer *buffer);
+
+ grpc_call *call() { return call_; }
+ CompletionQueue *cq() { return cq_; }
+
+ private:
+ CallHook *call_hook_;
+ CompletionQueue *cq_;
+ grpc_call *call_;
+};
+
+} // namespace grpc
+
+#endif // __GRPCPP_CALL_INTERFACE_H__
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
new file mode 100644
index 0000000000..22a8a04c82
--- /dev/null
+++ b/include/grpc++/impl/client_unary_call.h
@@ -0,0 +1,66 @@
+/*
+*
+* Copyright 2014, Google Inc.
+* All rights reserved.
+*
+* Redistribution and use in source and binary forms, with or without
+* modification, are permitted provided that the following conditions are
+* met:
+*
+* * Redistributions of source code must retain the above copyright
+* notice, this list of conditions and the following disclaimer.
+* * Redistributions in binary form must reproduce the above
+* copyright notice, this list of conditions and the following disclaimer
+* in the documentation and/or other materials provided with the
+* distribution.
+* * Neither the name of Google Inc. nor the names of its
+* contributors may be used to endorse or promote products derived from
+* this software without specific prior written permission.
+*
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*
+*/
+
+#ifndef __GRPCPP_CLIENT_UNARY_CALL_H__
+#define __GRPCPP_CLIENT_UNARY_CALL_H__
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace grpc {
+
+class ChannelInterface;
+class ClientContext;
+class CompletionQueue;
+class RpcMethod;
+class Status;
+
+// Wrapper that begins an asynchronous unary call
+void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result, Status *status,
+ CompletionQueue *cq, void *tag);
+
+// Wrapper that performs a blocking unary call
+Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result);
+
+} // namespace grpc
+
+#endif
diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h
index 75fec356dd..bb16e64c96 100644
--- a/include/grpc++/impl/rpc_method.h
+++ b/include/grpc++/impl/rpc_method.h
@@ -37,8 +37,8 @@
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
namespace grpc {
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 620de5e67f..bf62871b7d 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -55,25 +55,14 @@ class MethodHandler {
public:
virtual ~MethodHandler() {}
struct HandlerParameter {
- HandlerParameter(ServerContext* context,
+ HandlerParameter(Call* c, ServerContext* context,
const google::protobuf::Message* req,
google::protobuf::Message* resp)
- : server_context(context),
- request(req),
- response(resp),
- stream_context(nullptr) {}
- HandlerParameter(ServerContext* context,
- const google::protobuf::Message* req,
- google::protobuf::Message* resp,
- StreamContextInterface* stream)
- : server_context(context),
- request(req),
- response(resp),
- stream_context(stream) {}
+ : call(c), server_context(context), request(req), response(resp) {}
+ Call* call;
ServerContext* server_context;
const google::protobuf::Message* request;
google::protobuf::Message* response;
- StreamContextInterface* stream_context;
};
virtual Status RunHandler(const HandlerParameter& param) = 0;
};
@@ -114,7 +103,7 @@ class ClientStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerReader<RequestType> reader(param.stream_context);
+ ServerReader<RequestType> reader(param.call, param.server_context);
return func_(service_, param.server_context, &reader,
dynamic_cast<ResponseType*>(param.response));
}
@@ -136,7 +125,7 @@ class ServerStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerWriter<ResponseType> writer(param.stream_context);
+ ServerWriter<ResponseType> writer(param.call, param.server_context);
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request), &writer);
}
@@ -159,7 +148,8 @@ class BidiStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerReaderWriter<ResponseType, RequestType> stream(param.stream_context);
+ ServerReaderWriter<ResponseType, RequestType> stream(param.call,
+ param.server_context);
return func_(service_, param.server_context, &stream);
}
@@ -202,9 +192,7 @@ class RpcServiceMethod : public RpcMethod {
class RpcService {
public:
// Takes ownership.
- void AddMethod(RpcServiceMethod* method) {
- methods_.push_back(std::unique_ptr<RpcServiceMethod>(method));
- }
+ void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); }
int GetMethodCount() const { return methods_.size(); }
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
new file mode 100644
index 0000000000..221664befe
--- /dev/null
+++ b/include/grpc++/impl/service_type.h
@@ -0,0 +1,127 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPCPP_IMPL_SERVICE_TYPE_H__
+#define __GRPCPP_IMPL_SERVICE_TYPE_H__
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace grpc {
+
+class Call;
+class RpcService;
+class Server;
+class ServerContext;
+class Status;
+
+class SynchronousService {
+ public:
+ virtual ~SynchronousService() {}
+ virtual RpcService* service() = 0;
+};
+
+class ServerAsyncStreamingInterface {
+ public:
+ virtual ~ServerAsyncStreamingInterface() {}
+
+ virtual void SendInitialMetadata(void* tag) = 0;
+
+ private:
+ friend class Server;
+ virtual void BindCall(Call* call) = 0;
+};
+
+class AsynchronousService {
+ public:
+ // this is Server, but in disguise to avoid a link dependency
+ class DispatchImpl {
+ public:
+ virtual void RequestAsyncCall(void* registered_method,
+ ServerContext* context,
+ ::google::protobuf::Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) = 0;
+ };
+
+ AsynchronousService(CompletionQueue* cq, const char** method_names,
+ size_t method_count)
+ : cq_(cq), method_names_(method_names), method_count_(method_count) {}
+
+ ~AsynchronousService() { delete[] request_args_; }
+
+ CompletionQueue* completion_queue() const { return cq_; }
+
+ protected:
+ void RequestAsyncUnary(int index, ServerContext* context,
+ ::google::protobuf::Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
+ stream, cq, tag);
+ }
+ void RequestClientStreaming(int index, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
+ stream, cq, tag);
+ }
+ void RequestServerStreaming(int index, ServerContext* context,
+ ::google::protobuf::Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
+ stream, cq, tag);
+ }
+ void RequestBidiStreaming(int index, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
+ stream, cq, tag);
+ }
+
+ private:
+ friend class Server;
+ CompletionQueue* const cq_;
+ DispatchImpl* dispatch_impl_ = nullptr;
+ const char** const method_names_;
+ size_t method_count_;
+ void** request_args_ = nullptr;
+};
+
+} // namespace grpc
+
+#endif // __GRPCPP_IMPL_SERVICE_TYPE_H__
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 5fa371ba62..410c762375 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -35,12 +35,14 @@
#define __GRPCPP_SERVER_H__
#include <condition_variable>
-#include <map>
+#include <list>
#include <memory>
#include <mutex>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
+#include <grpc++/impl/call.h>
+#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
struct grpc_server;
@@ -48,18 +50,19 @@ struct grpc_server;
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
namespace grpc {
-class AsyncServerContext;
+class AsynchronousService;
class RpcService;
class RpcServiceMethod;
class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
-class Server {
+class Server final : private CallHook,
+ private AsynchronousService::DispatchImpl {
public:
~Server();
@@ -69,22 +72,34 @@ class Server {
private:
friend class ServerBuilder;
+ class SyncRequest;
+ class AsyncRequest;
+
// ServerBuilder use only
- Server(ThreadPoolInterface* thread_pool, ServerCredentials* creds);
+ Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
+ ServerCredentials* creds);
Server();
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance.
- void RegisterService(RpcService* service);
+ bool RegisterService(RpcService* service);
+ bool RegisterAsyncService(AsynchronousService* service);
// Add a listening port. Can be called multiple times.
- void AddPort(const grpc::string& addr);
+ int AddPort(const grpc::string& addr);
// Start the server.
- void Start();
+ bool Start();
- void AllowOneRpc();
void HandleQueueClosed();
void RunRpc();
void ScheduleCallback();
+ void PerformOpsOnCall(CallOpBuffer* ops, Call* call) override;
+
+ // DispatchImpl
+ void RequestAsyncCall(void* registered_method, ServerContext* context,
+ ::google::protobuf::Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag);
+
// Completion queue.
CompletionQueue cq_;
@@ -96,12 +111,11 @@ class Server {
int num_running_cb_;
std::condition_variable callback_cv_;
+ std::list<SyncRequest> sync_methods_;
+
// Pointer to the c grpc server.
grpc_server* server_;
- // A map for all method information.
- std::map<grpc::string, RpcServiceMethod*> method_map_;
-
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.
bool thread_pool_owned_;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index cf27452010..a550a53afb 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -41,9 +41,12 @@
namespace grpc {
+class AsynchronousService;
+class CompletionQueue;
class RpcService;
class Server;
class ServerCredentials;
+class SynchronousService;
class ThreadPoolInterface;
class ServerBuilder {
@@ -53,7 +56,13 @@ class ServerBuilder {
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance returned by
// BuildAndStart().
- void RegisterService(RpcService* service);
+ void RegisterService(SynchronousService* service);
+
+ // Register an asynchronous service. New calls will be delevered to cq.
+ // This call does not take ownership of the service or completion queue.
+ // The service and completion queuemust exist for the lifetime of the Server
+ // instance returned by BuildAndStart().
+ void RegisterAsyncService(AsynchronousService* service);
// Add a listening port. Can be called multiple times.
void AddPort(const grpc::string& addr);
@@ -71,9 +80,10 @@ class ServerBuilder {
private:
std::vector<RpcService*> services_;
+ std::vector<AsynchronousService*> async_services_;
std::vector<grpc::string> ports_;
std::shared_ptr<ServerCredentials> creds_;
- ThreadPoolInterface* thread_pool_;
+ ThreadPoolInterface* thread_pool_ = nullptr;
};
} // namespace grpc
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 47fd6cf1c8..853f91f467 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -35,15 +35,77 @@
#define __GRPCPP_SERVER_CONTEXT_H_
#include <chrono>
+#include <map>
+
+#include "config.h"
+
+struct gpr_timespec;
+struct grpc_metadata;
+struct grpc_call;
namespace grpc {
+template <class W, class R>
+class ServerAsyncReader;
+template <class W>
+class ServerAsyncWriter;
+template <class W>
+class ServerAsyncResponseWriter;
+template <class R, class W>
+class ServerAsyncReaderWriter;
+template <class R>
+class ServerReader;
+template <class W>
+class ServerWriter;
+template <class R, class W>
+class ServerReaderWriter;
+
+class CallOpBuffer;
+class Server;
+
// Interface of server side rpc context.
-class ServerContext {
+class ServerContext final {
public:
- virtual ~ServerContext() {}
+ ServerContext(); // for async calls
+ ~ServerContext();
+
+ std::chrono::system_clock::time_point absolute_deadline() {
+ return deadline_;
+ }
+
+ void AddInitialMetadata(const grpc::string& key, const grpc::string& value);
+ void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
+
+ std::multimap<grpc::string, grpc::string> client_metadata() {
+ return client_metadata_;
+ }
+
+ private:
+ friend class ::grpc::Server;
+ template <class W, class R>
+ friend class ::grpc::ServerAsyncReader;
+ template <class W>
+ friend class ::grpc::ServerAsyncWriter;
+ template <class W>
+ friend class ::grpc::ServerAsyncResponseWriter;
+ template <class R, class W>
+ friend class ::grpc::ServerAsyncReaderWriter;
+ template <class R>
+ friend class ::grpc::ServerReader;
+ template <class W>
+ friend class ::grpc::ServerWriter;
+ template <class R, class W>
+ friend class ::grpc::ServerReaderWriter;
+
+ ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
+ size_t metadata_count);
- virtual std::chrono::system_clock::time_point absolute_deadline() const = 0;
+ std::chrono::system_clock::time_point deadline_;
+ grpc_call* call_ = nullptr;
+ bool sent_initial_metadata_ = false;
+ std::multimap<grpc::string, grpc::string> client_metadata_;
+ std::multimap<grpc::string, grpc::string> initial_metadata_;
+ std::multimap<grpc::string, grpc::string> trailing_metadata_;
};
} // namespace grpc
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index b8982f4d93..be5b29589f 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -34,7 +34,12 @@
#ifndef __GRPCPP_STREAM_H__
#define __GRPCPP_STREAM_H__
-#include <grpc++/stream_context_interface.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/client_context.h>
+#include <grpc++/completion_queue.h>
+#include <grpc++/server_context.h>
+#include <grpc++/impl/call.h>
+#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
@@ -45,16 +50,12 @@ class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
- // Try to cancel the stream. Wait() still needs to be called to get the final
- // status. Cancelling after the stream has finished has no effects.
- virtual void Cancel() = 0;
-
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read(). Otherwise, this implicitly cancels the stream.
- virtual const Status& Wait() = 0;
+ virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
@@ -82,147 +83,703 @@ class WriterInterface {
};
template <class R>
-class ClientReader : public ClientStreamingInterface,
- public ReaderInterface<R> {
+class ClientReader final : public ClientStreamingInterface,
+ public ReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
- explicit ClientReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Write(context_->request(), true);
+ ClientReader(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const google::protobuf::Message& request)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&context->send_initial_metadata_);
+ buf.AddSendMessage(request);
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
}
- ~ClientReader() { delete context_; }
-
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ // Blocking wait for initial metadata from server. The received metadata
+ // can only be accessed after this call returns. Should only be called before
+ // the first read. Calling this method is optional, and if it is not called
+ // the metadata will be available in ClientContext after the first read.
+ void WaitForInitialMetadata() {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpBuffer buf;
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ context_->initial_metadata_received_ = true;
+ }
- virtual void Cancel() { context_->Cancel(); }
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ if (!context_->initial_metadata_received_) {
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf) && buf.got_message;
+ }
- virtual const Status& Wait() { return context_->Wait(); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
private:
- StreamContextInterface* const context_;
+ ClientContext* context_;
+ CompletionQueue cq_;
+ Call call_;
};
template <class W>
-class ClientWriter : public ClientStreamingInterface,
- public WriterInterface<W> {
+class ClientWriter final : public ClientStreamingInterface,
+ public WriterInterface<W> {
public:
// Blocking create a stream.
- explicit ClientWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ ClientWriter(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, google::protobuf::Message* response)
+ : context_(context),
+ response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
}
- ~ClientWriter() { delete context_; }
-
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
-
- virtual void Cancel() { context_->Cancel(); }
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
// Read the final response and wait for the final status.
- virtual const Status& Wait() {
- bool success = context_->Read(context_->response());
- if (!success) {
- Cancel();
- } else {
- success = context_->Read(nullptr);
- if (success) {
- Cancel();
- }
- }
- return context_->Wait();
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddRecvMessage(response_);
+ buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message);
+ return status;
}
private:
- StreamContextInterface* const context_;
+ ClientContext* context_;
+ google::protobuf::Message* const response_;
+ CompletionQueue cq_;
+ Call call_;
};
// Client-side interface for bi-directional streaming.
template <class W, class R>
-class ClientReaderWriter : public ClientStreamingInterface,
- public WriterInterface<W>,
- public ReaderInterface<R> {
+class ClientReaderWriter final : public ClientStreamingInterface,
+ public WriterInterface<W>,
+ public ReaderInterface<R> {
public:
// Blocking create a stream.
- explicit ClientReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
}
- ~ClientReaderWriter() { delete context_; }
+ // Blocking wait for initial metadata from server. The received metadata
+ // can only be accessed after this call returns. Should only be called before
+ // the first read. Calling this method is optional, and if it is not called
+ // the metadata will be available in ClientContext after the first read.
+ void WaitForInitialMetadata() {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpBuffer buf;
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ context_->initial_metadata_received_ = true;
+ }
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ if (!context_->initial_metadata_received_) {
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf) && buf.got_message;
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
- virtual void Cancel() { context_->Cancel(); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
- virtual const Status& Wait() { return context_->Wait(); }
+ private:
+ ClientContext* context_;
+ CompletionQueue cq_;
+ Call call_;
+};
+
+template <class R>
+class ServerReader final : public ReaderInterface<R> {
+ public:
+ ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&buf);
+ call_->cq()->Pluck(&buf);
+ }
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf) && buf.got_message;
+ }
private:
- StreamContextInterface* const context_;
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+template <class W>
+class ServerWriter final : public WriterInterface<W> {
+ public:
+ ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&buf);
+ call_->cq()->Pluck(&buf);
+ }
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ if (!ctx_->sent_initial_metadata_) {
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+// Server-side interface for bi-directional streaming.
+template <class W, class R>
+class ServerReaderWriter final : public WriterInterface<W>,
+ public ReaderInterface<R> {
+ public:
+ ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
+
+ void SendInitialMetadata() {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_->PerformOps(&buf);
+ call_->cq()->Pluck(&buf);
+ }
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf) && buf.got_message;
+ }
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ if (!ctx_->sent_initial_metadata_) {
+ buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ Call* const call_;
+ ServerContext* const ctx_;
+};
+
+// Async interfaces
+// Common interface for all client side streaming.
+class ClientAsyncStreamingInterface {
+ public:
+ virtual ~ClientAsyncStreamingInterface() {}
+
+ virtual void ReadInitialMetadata(void* tag) = 0;
+
+ virtual void Finish(Status* status, void* tag) = 0;
+};
+
+// An interface that yields a sequence of R messages.
+template <class R>
+class AsyncReaderInterface {
+ public:
+ virtual ~AsyncReaderInterface() {}
+
+ virtual void Read(R* msg, void* tag) = 0;
+};
+
+// An interface that can be fed a sequence of W messages.
+template <class W>
+class AsyncWriterInterface {
+ public:
+ virtual ~AsyncWriterInterface() {}
+
+ virtual void Write(const W& msg, void* tag) = 0;
};
template <class R>
-class ServerReader : public ReaderInterface<R> {
+class ClientAsyncReader final : public ClientAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ // Create a stream and write the first request out.
+ ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
+ const RpcMethod& method, ClientContext* context,
+ const google::protobuf::Message& request, void* tag)
+ : context_(context), call_(channel->CreateCall(method, context, cq)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
+ init_buf_.AddSendMessage(request);
+ init_buf_.AddClientSendClose();
+ call_.PerformOps(&init_buf_);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&meta_buf_);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
private:
- StreamContextInterface* const context_; // not owned
+ ClientContext* context_ = nullptr;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
};
template <class W>
-class ServerWriter : public WriterInterface<W> {
+class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
+ public AsyncWriterInterface<W> {
public:
- explicit ServerWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Read(context_->request());
+ ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
+ const RpcMethod& method, ClientContext* context,
+ google::protobuf::Message* response, void* tag)
+ : context_(context),
+ response_(response),
+ call_(channel->CreateCall(method, context, cq)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&init_buf_);
+ }
+
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&meta_buf_);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ void WritesDone(void* tag) {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ finish_buf_.AddRecvMessage(response_);
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ ClientContext* context_ = nullptr;
+ google::protobuf::Message* const response_;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
+ public:
+ ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
+ const RpcMethod& method, ClientContext* context,
+ void* tag)
+ : context_(context), call_(channel->CreateCall(method, context, cq)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&init_buf_);
+ }
+
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&meta_buf_);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ void WritesDone(void* tag) {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ ClientContext* context_ = nullptr;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+// TODO(yangg) Move out of stream.h
+template <class W>
+class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
+ public:
+ explicit ServerAsyncResponseWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
+ }
+
+ void Finish(const W& msg, const Status& status, void* tag) {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (status.IsOk()) {
+ finish_buf_.AddSendMessage(msg);
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ void FinishWithError(const Status& status, void* tag) {
+ GPR_ASSERT(!status.IsOk());
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ void BindCall(Call* call) override { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+template <class W, class R>
+class ServerAsyncReader : public ServerAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
+ public:
+ explicit ServerAsyncReader(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
+ }
+
+ void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ void Finish(const W& msg, const Status& status, void* tag) {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (status.IsOk()) {
+ finish_buf_.AddSendMessage(msg);
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ void FinishWithError(const Status& status, void* tag) {
+ GPR_ASSERT(!status.IsOk());
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ private:
+ void BindCall(Call* call) override { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+template <class W>
+class ServerAsyncWriter : public ServerAsyncStreamingInterface,
+ public AsyncWriterInterface<W> {
+ public:
+ explicit ServerAsyncWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
+ }
+
+ void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ void Finish(const Status& status, void* tag) {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
}
private:
- StreamContextInterface* const context_; // not owned
+ void BindCall(Call* call) override { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer finish_buf_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ServerReaderWriter : public WriterInterface<W>,
- public ReaderInterface<R> {
+class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ explicit ServerAsyncReaderWriter(ServerContext* ctx)
+ : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+
+ void SendInitialMetadata(void* tag) override {
+ GPR_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_buf_.Reset(tag);
+ meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ call_.PerformOps(&meta_buf_);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void Read(R* msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ void Finish(const Status& status, void* tag) {
+ finish_buf_.Reset(tag);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ctx_->sent_initial_metadata_ = true;
+ }
+ bool cancelled = false;
+ finish_buf_.AddServerRecvClose(&cancelled);
+ finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_buf_);
}
private:
- StreamContextInterface* const context_; // not owned
+ void BindCall(Call* call) override { call_ = *call; }
+
+ Call call_;
+ ServerContext* ctx_;
+ CallOpBuffer meta_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer finish_buf_;
};
} // namespace grpc
diff --git a/include/grpc++/stream_context_interface.h b/include/grpc++/stream_context_interface.h
deleted file mode 100644
index a84119800b..0000000000
--- a/include/grpc++/stream_context_interface.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPCPP_STREAM_CONTEXT_INTERFACE_H__
-#define __GRPCPP_STREAM_CONTEXT_INTERFACE_H__
-
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-namespace grpc {
-class Status;
-
-// An interface to avoid dependency on internal implementation.
-class StreamContextInterface {
- public:
- virtual ~StreamContextInterface() {}
-
- virtual void Start(bool buffered) = 0;
-
- virtual bool Read(google::protobuf::Message* msg) = 0;
- virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0;
- virtual const Status& Wait() = 0;
- virtual void Cancel() = 0;
-
- virtual google::protobuf::Message* request() = 0;
- virtual google::protobuf::Message* response() = 0;
-};
-
-} // namespace grpc
-
-#endif // __GRPCPP_STREAM_CONTEXT_INTERFACE_H__