diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/async_server_context.h | 95 | ||||
-rw-r--r-- | include/grpc++/channel_interface.h | 26 | ||||
-rw-r--r-- | include/grpc++/client_context.h | 76 | ||||
-rw-r--r-- | include/grpc++/completion_queue.h | 88 | ||||
-rw-r--r-- | include/grpc++/config.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 147 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 66 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_method.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_service_method.h | 28 | ||||
-rw-r--r-- | include/grpc++/impl/service_type.h | 127 | ||||
-rw-r--r-- | include/grpc++/server.h | 40 | ||||
-rw-r--r-- | include/grpc++/server_builder.h | 14 | ||||
-rw-r--r-- | include/grpc++/server_context.h | 68 | ||||
-rw-r--r-- | include/grpc++/stream.h | 719 | ||||
-rw-r--r-- | include/grpc++/stream_context_interface.h | 64 | ||||
l--------- | include/grpc/grpc | 1 | ||||
-rw-r--r-- | include/grpc/grpc.h | 61 | ||||
-rw-r--r-- | include/grpc/support/cpu.h (renamed from include/grpc++/async_server.h) | 49 |
18 files changed, 1302 insertions, 374 deletions
diff --git a/include/grpc++/async_server_context.h b/include/grpc++/async_server_context.h deleted file mode 100644 index c038286ac1..0000000000 --- a/include/grpc++/async_server_context.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPCPP_ASYNC_SERVER_CONTEXT_H__ -#define __GRPCPP_ASYNC_SERVER_CONTEXT_H__ - -#include <chrono> - -#include <grpc++/config.h> - -struct grpc_byte_buffer; -struct grpc_call; -struct grpc_completion_queue; - -namespace google { -namespace protobuf { -class Message; -} -} - -using std::chrono::system_clock; - -namespace grpc { -class Status; - -// TODO(rocking): wrap grpc c structures. -class AsyncServerContext { - public: - AsyncServerContext(grpc_call* call, const grpc::string& method, - const grpc::string& host, - system_clock::time_point absolute_deadline); - ~AsyncServerContext(); - - // Accept this rpc, bind it to a completion queue. - void Accept(grpc_completion_queue* cq); - - // Read and write calls, all async. Return true for success. - bool StartRead(google::protobuf::Message* request); - bool StartWrite(const google::protobuf::Message& response, int flags); - bool StartWriteStatus(const Status& status); - - bool ParseRead(grpc_byte_buffer* read_buffer); - - grpc::string method() const { return method_; } - grpc::string host() const { return host_; } - system_clock::time_point absolute_deadline() { return absolute_deadline_; } - - grpc_call* call() { return call_; } - - private: - AsyncServerContext(const AsyncServerContext&); - AsyncServerContext& operator=(const AsyncServerContext&); - - // These properties may be moved to a ServerContext class. - const grpc::string method_; - const grpc::string host_; - system_clock::time_point absolute_deadline_; - - google::protobuf::Message* request_; // not owned - grpc_call* call_; // owned -}; - -} // namespace grpc - -#endif // __GRPCPP_ASYNC_SERVER_CONTEXT_H__ diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h index 9ed35422b8..b0366faabb 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel_interface.h @@ -35,32 +35,30 @@ #define __GRPCPP_CHANNEL_INTERFACE_H__ #include <grpc++/status.h> +#include <grpc++/impl/call.h> namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google -namespace grpc { +struct grpc_call; +namespace grpc { +class Call; +class CallOpBuffer; class ClientContext; +class CompletionQueue; class RpcMethod; -class StreamContextInterface; +class CallInterface; -class ChannelInterface { +class ChannelInterface : public CallHook { public: virtual ~ChannelInterface() {} - virtual Status StartBlockingRpc(const RpcMethod& method, - ClientContext* context, - const google::protobuf::Message& request, - google::protobuf::Message* result) = 0; - - virtual StreamContextInterface* CreateStream( - const RpcMethod& method, ClientContext* context, - const google::protobuf::Message* request, - google::protobuf::Message* result) = 0; + virtual Call CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) = 0; }; } // namespace grpc diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 0cf6bdc647..4594cbaeb6 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -35,8 +35,8 @@ #define __GRPCPP_CLIENT_CONTEXT_H__ #include <chrono> +#include <map> #include <string> -#include <vector> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -47,8 +47,32 @@ using std::chrono::system_clock; struct grpc_call; struct grpc_completion_queue; +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + namespace grpc { +class CallOpBuffer; +class ChannelInterface; +class CompletionQueue; +class RpcMethod; +class Status; +template <class R> +class ClientReader; +template <class W> +class ClientWriter; +template <class R, class W> +class ClientReaderWriter; +template <class R> +class ClientAsyncReader; +template <class W> +class ClientAsyncWriter; +template <class R, class W> +class ClientAsyncReaderWriter; + class ClientContext { public: ClientContext(); @@ -57,18 +81,54 @@ class ClientContext { void AddMetadata(const grpc::string &meta_key, const grpc::string &meta_value); + std::multimap<grpc::string, grpc::string> GetServerInitialMetadata() { + GPR_ASSERT(initial_metadata_received_); + return recv_initial_metadata_; + } + + std::multimap<grpc::string, grpc::string> GetServerTrailingMetadata() { + // TODO(yangg) check finished + return trailing_metadata_; + } + void set_absolute_deadline(const system_clock::time_point &deadline); system_clock::time_point absolute_deadline(); - void StartCancel(); + void set_authority(const grpc::string& authority) { + authority_ = authority; + } + + void TryCancel(); private: // Disallow copy and assign. ClientContext(const ClientContext &); ClientContext &operator=(const ClientContext &); + friend class CallOpBuffer; friend class Channel; - friend class StreamContext; + template <class R> + friend class ::grpc::ClientReader; + template <class W> + friend class ::grpc::ClientWriter; + template <class R, class W> + friend class ::grpc::ClientReaderWriter; + template <class R> + friend class ::grpc::ClientAsyncReader; + template <class W> + friend class ::grpc::ClientAsyncWriter; + template <class R, class W> + friend class ::grpc::ClientAsyncReaderWriter; + friend Status BlockingUnaryCall(ChannelInterface *channel, + const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result); + friend void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result, Status *status, + CompletionQueue *cq, void *tag); grpc_call *call() { return call_; } void set_call(grpc_call *call) { @@ -81,10 +141,18 @@ class ClientContext { gpr_timespec RawDeadline() { return absolute_deadline_; } + grpc::string authority() { + return authority_; + } + + bool initial_metadata_received_ = false; grpc_call *call_; grpc_completion_queue *cq_; gpr_timespec absolute_deadline_; - std::vector<std::pair<grpc::string, grpc::string> > metadata_; + grpc::string authority_; + std::multimap<grpc::string, grpc::string> send_initial_metadata_; + std::multimap<grpc::string, grpc::string> recv_initial_metadata_; + std::multimap<grpc::string, grpc::string> trailing_metadata_; }; } // namespace grpc diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 72f6253f8e..c5267f8563 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -34,52 +34,82 @@ #ifndef __GRPCPP_COMPLETION_QUEUE_H__ #define __GRPCPP_COMPLETION_QUEUE_H__ +#include <grpc++/impl/client_unary_call.h> + struct grpc_completion_queue; namespace grpc { +template <class R> +class ClientReader; +template <class W> +class ClientWriter; +template <class R, class W> +class ClientReaderWriter; +template <class R> +class ServerReader; +template <class W> +class ServerWriter; +template <class R, class W> +class ServerReaderWriter; + +class CompletionQueue; +class Server; + +class CompletionQueueTag { + public: + virtual ~CompletionQueueTag() {} + // Called prior to returning from Next(), return value + // is the status of the operation (return status is the default thing + // to do) + virtual void FinalizeResult(void **tag, bool *status) = 0; +}; + // grpc_completion_queue wrapper class class CompletionQueue { public: CompletionQueue(); + explicit CompletionQueue(grpc_completion_queue *take); ~CompletionQueue(); - enum CompletionType { - QUEUE_CLOSED = 0, // Shutting down. - RPC_END = 1, // An RPC finished. Either at client or server. - CLIENT_READ_OK = 2, // A client-side read has finished successfully. - CLIENT_READ_ERROR = 3, // A client-side read has finished with error. - CLIENT_WRITE_OK = 4, - CLIENT_WRITE_ERROR = 5, - SERVER_RPC_NEW = 6, // A new RPC just arrived at the server. - SERVER_READ_OK = 7, // A server-side read has finished successfully. - SERVER_READ_ERROR = 8, // A server-side read has finished with error. - SERVER_WRITE_OK = 9, - SERVER_WRITE_ERROR = 10, - // Client or server has sent half close successfully. - HALFCLOSE_OK = 11, - // New CompletionTypes may be added in the future, so user code should - // always - // handle the default case of a CompletionType that appears after such code - // was - // written. - DO_NOT_USE = 20, - }; - // Blocking read from queue. - // For QUEUE_CLOSED, *tag is not changed. - // For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext. - // For others, *tag will be the AsyncServerContext of this rpc. - CompletionType Next(void** tag); + // Returns true if an event was received, false if the queue is ready + // for destruction. + bool Next(void **tag, bool *ok); // Shutdown has to be called, and the CompletionQueue can only be - // destructed when the QUEUE_CLOSED message has been read with Next(). + // destructed when false is returned from Next(). void Shutdown(); - grpc_completion_queue* cq() { return cq_; } + grpc_completion_queue *cq() { return cq_; } private: - grpc_completion_queue* cq_; // owned + // Friend synchronous wrappers so that they can access Pluck(), which is + // a semi-private API geared towards the synchronous implementation. + template <class R> + friend class ::grpc::ClientReader; + template <class W> + friend class ::grpc::ClientWriter; + template <class R, class W> + friend class ::grpc::ClientReaderWriter; + template <class R> + friend class ::grpc::ServerReader; + template <class W> + friend class ::grpc::ServerWriter; + template <class R, class W> + friend class ::grpc::ServerReaderWriter; + friend class ::grpc::Server; + friend Status BlockingUnaryCall(ChannelInterface *channel, + const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result); + + // Wraps grpc_completion_queue_pluck. + // Cannot be mixed with calls to Next(). + bool Pluck(CompletionQueueTag *tag); + + grpc_completion_queue *cq_; // owned }; } // namespace grpc diff --git a/include/grpc++/config.h b/include/grpc++/config.h index 52913fbf0f..663e40247d 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/config.h @@ -39,6 +39,7 @@ namespace grpc { typedef std::string string; -} + +} // namespace grpc #endif // __GRPCPP_CONFIG_H__ diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h new file mode 100644 index 0000000000..64f0f890c5 --- /dev/null +++ b/include/grpc++/impl/call.h @@ -0,0 +1,147 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_CALL_H__ +#define __GRPCPP_CALL_H__ + +#include <grpc/grpc.h> +#include <grpc++/status.h> +#include <grpc++/completion_queue.h> + +#include <memory> +#include <map> + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +struct grpc_call; +struct grpc_op; + +namespace grpc { + +class Call; + +class CallOpBuffer : public CompletionQueueTag { + public: + CallOpBuffer() : return_tag_(this) {} + ~CallOpBuffer(); + + void Reset(void *next_return_tag); + + // Does not take ownership. + void AddSendInitialMetadata( + std::multimap<grpc::string, grpc::string> *metadata); + void AddSendInitialMetadata(ClientContext *ctx); + void AddRecvInitialMetadata( + std::multimap<grpc::string, grpc::string> *metadata); + void AddSendMessage(const google::protobuf::Message &message); + void AddRecvMessage(google::protobuf::Message *message); + void AddClientSendClose(); + void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata, + Status *status); + void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, + const Status &status); + void AddServerRecvClose(bool *cancelled); + + // INTERNAL API: + + // Convert to an array of grpc_op elements + void FillOps(grpc_op *ops, size_t *nops); + + // Called by completion queue just prior to returning from Next() or Pluck() + void FinalizeResult(void **tag, bool *status) override; + + bool got_message = false; + + private: + void *return_tag_ = nullptr; + // Send initial metadata + bool send_initial_metadata_ = false; + size_t initial_metadata_count_ = 0; + grpc_metadata *initial_metadata_ = nullptr; + // Recv initial metadta + std::multimap<grpc::string, grpc::string> *recv_initial_metadata_ = nullptr; + grpc_metadata_array recv_initial_metadata_arr_ = {0, 0, nullptr}; + // Send message + const google::protobuf::Message *send_message_ = nullptr; + grpc_byte_buffer *send_message_buf_ = nullptr; + // Recv message + google::protobuf::Message *recv_message_ = nullptr; + grpc_byte_buffer *recv_message_buf_ = nullptr; + // Client send close + bool client_send_close_ = false; + // Client recv status + std::multimap<grpc::string, grpc::string> *recv_trailing_metadata_ = nullptr; + Status *recv_status_ = nullptr; + grpc_metadata_array recv_trailing_metadata_arr_ = {0, 0, nullptr}; + grpc_status_code status_code_ = GRPC_STATUS_OK; + char *status_details_ = nullptr; + size_t status_details_capacity_ = 0; + // Server send status + const Status *send_status_ = nullptr; + size_t trailing_metadata_count_ = 0; + grpc_metadata *trailing_metadata_ = nullptr; + int cancelled_buf_; + bool *recv_closed_ = nullptr; +}; + +// Channel and Server implement this to allow them to hook performing ops +class CallHook { + public: + virtual ~CallHook() {} + virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; +}; + +// Straightforward wrapping of the C call object +class Call final { + public: + /* call is owned by the caller */ + Call(grpc_call *call, CallHook *call_hook_, CompletionQueue *cq); + + void PerformOps(CallOpBuffer *buffer); + + grpc_call *call() { return call_; } + CompletionQueue *cq() { return cq_; } + + private: + CallHook *call_hook_; + CompletionQueue *cq_; + grpc_call *call_; +}; + +} // namespace grpc + +#endif // __GRPCPP_CALL_INTERFACE_H__ diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h new file mode 100644 index 0000000000..22a8a04c82 --- /dev/null +++ b/include/grpc++/impl/client_unary_call.h @@ -0,0 +1,66 @@ +/* +* +* Copyright 2014, Google Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are +* met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following disclaimer +* in the documentation and/or other materials provided with the +* distribution. +* * Neither the name of Google Inc. nor the names of its +* contributors may be used to endorse or promote products derived from +* this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +* +*/ + +#ifndef __GRPCPP_CLIENT_UNARY_CALL_H__ +#define __GRPCPP_CLIENT_UNARY_CALL_H__ + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace grpc { + +class ChannelInterface; +class ClientContext; +class CompletionQueue; +class RpcMethod; +class Status; + +// Wrapper that begins an asynchronous unary call +void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result, Status *status, + CompletionQueue *cq, void *tag); + +// Wrapper that performs a blocking unary call +Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result); + +} // namespace grpc + +#endif diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h index 75fec356dd..bb16e64c96 100644 --- a/include/grpc++/impl/rpc_method.h +++ b/include/grpc++/impl/rpc_method.h @@ -37,8 +37,8 @@ namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google namespace grpc { diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 620de5e67f..bf62871b7d 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -55,25 +55,14 @@ class MethodHandler { public: virtual ~MethodHandler() {} struct HandlerParameter { - HandlerParameter(ServerContext* context, + HandlerParameter(Call* c, ServerContext* context, const google::protobuf::Message* req, google::protobuf::Message* resp) - : server_context(context), - request(req), - response(resp), - stream_context(nullptr) {} - HandlerParameter(ServerContext* context, - const google::protobuf::Message* req, - google::protobuf::Message* resp, - StreamContextInterface* stream) - : server_context(context), - request(req), - response(resp), - stream_context(stream) {} + : call(c), server_context(context), request(req), response(resp) {} + Call* call; ServerContext* server_context; const google::protobuf::Message* request; google::protobuf::Message* response; - StreamContextInterface* stream_context; }; virtual Status RunHandler(const HandlerParameter& param) = 0; }; @@ -114,7 +103,7 @@ class ClientStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerReader<RequestType> reader(param.stream_context); + ServerReader<RequestType> reader(param.call, param.server_context); return func_(service_, param.server_context, &reader, dynamic_cast<ResponseType*>(param.response)); } @@ -136,7 +125,7 @@ class ServerStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerWriter<ResponseType> writer(param.stream_context); + ServerWriter<ResponseType> writer(param.call, param.server_context); return func_(service_, param.server_context, dynamic_cast<const RequestType*>(param.request), &writer); } @@ -159,7 +148,8 @@ class BidiStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerReaderWriter<ResponseType, RequestType> stream(param.stream_context); + ServerReaderWriter<ResponseType, RequestType> stream(param.call, + param.server_context); return func_(service_, param.server_context, &stream); } @@ -202,9 +192,7 @@ class RpcServiceMethod : public RpcMethod { class RpcService { public: // Takes ownership. - void AddMethod(RpcServiceMethod* method) { - methods_.push_back(std::unique_ptr<RpcServiceMethod>(method)); - } + void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); } RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); } int GetMethodCount() const { return methods_.size(); } diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h new file mode 100644 index 0000000000..221664befe --- /dev/null +++ b/include/grpc++/impl/service_type.h @@ -0,0 +1,127 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_IMPL_SERVICE_TYPE_H__ +#define __GRPCPP_IMPL_SERVICE_TYPE_H__ + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace grpc { + +class Call; +class RpcService; +class Server; +class ServerContext; +class Status; + +class SynchronousService { + public: + virtual ~SynchronousService() {} + virtual RpcService* service() = 0; +}; + +class ServerAsyncStreamingInterface { + public: + virtual ~ServerAsyncStreamingInterface() {} + + virtual void SendInitialMetadata(void* tag) = 0; + + private: + friend class Server; + virtual void BindCall(Call* call) = 0; +}; + +class AsynchronousService { + public: + // this is Server, but in disguise to avoid a link dependency + class DispatchImpl { + public: + virtual void RequestAsyncCall(void* registered_method, + ServerContext* context, + ::google::protobuf::Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) = 0; + }; + + AsynchronousService(CompletionQueue* cq, const char** method_names, + size_t method_count) + : cq_(cq), method_names_(method_names), method_count_(method_count) {} + + ~AsynchronousService() { delete[] request_args_; } + + CompletionQueue* completion_queue() const { return cq_; } + + protected: + void RequestAsyncUnary(int index, ServerContext* context, + ::google::protobuf::Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, + stream, cq, tag); + } + void RequestClientStreaming(int index, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, + stream, cq, tag); + } + void RequestServerStreaming(int index, ServerContext* context, + ::google::protobuf::Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, + stream, cq, tag); + } + void RequestBidiStreaming(int index, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, + stream, cq, tag); + } + + private: + friend class Server; + CompletionQueue* const cq_; + DispatchImpl* dispatch_impl_ = nullptr; + const char** const method_names_; + size_t method_count_; + void** request_args_ = nullptr; +}; + +} // namespace grpc + +#endif // __GRPCPP_IMPL_SERVICE_TYPE_H__ diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 5fa371ba62..410c762375 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -35,12 +35,14 @@ #define __GRPCPP_SERVER_H__ #include <condition_variable> -#include <map> +#include <list> #include <memory> #include <mutex> #include <grpc++/completion_queue.h> #include <grpc++/config.h> +#include <grpc++/impl/call.h> +#include <grpc++/impl/service_type.h> #include <grpc++/status.h> struct grpc_server; @@ -48,18 +50,19 @@ struct grpc_server; namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google namespace grpc { -class AsyncServerContext; +class AsynchronousService; class RpcService; class RpcServiceMethod; class ServerCredentials; class ThreadPoolInterface; // Currently it only supports handling rpcs in a single thread. -class Server { +class Server final : private CallHook, + private AsynchronousService::DispatchImpl { public: ~Server(); @@ -69,22 +72,34 @@ class Server { private: friend class ServerBuilder; + class SyncRequest; + class AsyncRequest; + // ServerBuilder use only - Server(ThreadPoolInterface* thread_pool, ServerCredentials* creds); + Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, + ServerCredentials* creds); Server(); // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance. - void RegisterService(RpcService* service); + bool RegisterService(RpcService* service); + bool RegisterAsyncService(AsynchronousService* service); // Add a listening port. Can be called multiple times. - void AddPort(const grpc::string& addr); + int AddPort(const grpc::string& addr); // Start the server. - void Start(); + bool Start(); - void AllowOneRpc(); void HandleQueueClosed(); void RunRpc(); void ScheduleCallback(); + void PerformOpsOnCall(CallOpBuffer* ops, Call* call) override; + + // DispatchImpl + void RequestAsyncCall(void* registered_method, ServerContext* context, + ::google::protobuf::Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag); + // Completion queue. CompletionQueue cq_; @@ -96,12 +111,11 @@ class Server { int num_running_cb_; std::condition_variable callback_cv_; + std::list<SyncRequest> sync_methods_; + // Pointer to the c grpc server. grpc_server* server_; - // A map for all method information. - std::map<grpc::string, RpcServiceMethod*> method_map_; - ThreadPoolInterface* thread_pool_; // Whether the thread pool is created and owned by the server. bool thread_pool_owned_; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index cf27452010..a550a53afb 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -41,9 +41,12 @@ namespace grpc { +class AsynchronousService; +class CompletionQueue; class RpcService; class Server; class ServerCredentials; +class SynchronousService; class ThreadPoolInterface; class ServerBuilder { @@ -53,7 +56,13 @@ class ServerBuilder { // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance returned by // BuildAndStart(). - void RegisterService(RpcService* service); + void RegisterService(SynchronousService* service); + + // Register an asynchronous service. New calls will be delevered to cq. + // This call does not take ownership of the service or completion queue. + // The service and completion queuemust exist for the lifetime of the Server + // instance returned by BuildAndStart(). + void RegisterAsyncService(AsynchronousService* service); // Add a listening port. Can be called multiple times. void AddPort(const grpc::string& addr); @@ -71,9 +80,10 @@ class ServerBuilder { private: std::vector<RpcService*> services_; + std::vector<AsynchronousService*> async_services_; std::vector<grpc::string> ports_; std::shared_ptr<ServerCredentials> creds_; - ThreadPoolInterface* thread_pool_; + ThreadPoolInterface* thread_pool_ = nullptr; }; } // namespace grpc diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 47fd6cf1c8..853f91f467 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -35,15 +35,77 @@ #define __GRPCPP_SERVER_CONTEXT_H_ #include <chrono> +#include <map> + +#include "config.h" + +struct gpr_timespec; +struct grpc_metadata; +struct grpc_call; namespace grpc { +template <class W, class R> +class ServerAsyncReader; +template <class W> +class ServerAsyncWriter; +template <class W> +class ServerAsyncResponseWriter; +template <class R, class W> +class ServerAsyncReaderWriter; +template <class R> +class ServerReader; +template <class W> +class ServerWriter; +template <class R, class W> +class ServerReaderWriter; + +class CallOpBuffer; +class Server; + // Interface of server side rpc context. -class ServerContext { +class ServerContext final { public: - virtual ~ServerContext() {} + ServerContext(); // for async calls + ~ServerContext(); + + std::chrono::system_clock::time_point absolute_deadline() { + return deadline_; + } + + void AddInitialMetadata(const grpc::string& key, const grpc::string& value); + void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); + + std::multimap<grpc::string, grpc::string> client_metadata() { + return client_metadata_; + } + + private: + friend class ::grpc::Server; + template <class W, class R> + friend class ::grpc::ServerAsyncReader; + template <class W> + friend class ::grpc::ServerAsyncWriter; + template <class W> + friend class ::grpc::ServerAsyncResponseWriter; + template <class R, class W> + friend class ::grpc::ServerAsyncReaderWriter; + template <class R> + friend class ::grpc::ServerReader; + template <class W> + friend class ::grpc::ServerWriter; + template <class R, class W> + friend class ::grpc::ServerReaderWriter; + + ServerContext(gpr_timespec deadline, grpc_metadata* metadata, + size_t metadata_count); - virtual std::chrono::system_clock::time_point absolute_deadline() const = 0; + std::chrono::system_clock::time_point deadline_; + grpc_call* call_ = nullptr; + bool sent_initial_metadata_ = false; + std::multimap<grpc::string, grpc::string> client_metadata_; + std::multimap<grpc::string, grpc::string> initial_metadata_; + std::multimap<grpc::string, grpc::string> trailing_metadata_; }; } // namespace grpc diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index b8982f4d93..be5b29589f 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -34,7 +34,12 @@ #ifndef __GRPCPP_STREAM_H__ #define __GRPCPP_STREAM_H__ -#include <grpc++/stream_context_interface.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/completion_queue.h> +#include <grpc++/server_context.h> +#include <grpc++/impl/call.h> +#include <grpc++/impl/service_type.h> #include <grpc++/status.h> #include <grpc/support/log.h> @@ -45,16 +50,12 @@ class ClientStreamingInterface { public: virtual ~ClientStreamingInterface() {} - // Try to cancel the stream. Wait() still needs to be called to get the final - // status. Cancelling after the stream has finished has no effects. - virtual void Cancel() = 0; - // Wait until the stream finishes, and return the final status. When the // client side declares it has no more message to send, either implicitly or // by calling WritesDone, it needs to make sure there is no more message to // be received from the server, either implicitly or by getting a false from // a Read(). Otherwise, this implicitly cancels the stream. - virtual const Status& Wait() = 0; + virtual Status Finish() = 0; }; // An interface that yields a sequence of R messages. @@ -82,147 +83,703 @@ class WriterInterface { }; template <class R> -class ClientReader : public ClientStreamingInterface, - public ReaderInterface<R> { +class ClientReader final : public ClientStreamingInterface, + public ReaderInterface<R> { public: // Blocking create a stream and write the first request out. - explicit ClientReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Write(context_->request(), true); + ClientReader(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const google::protobuf::Message& request) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendInitialMetadata(&context->send_initial_metadata_); + buf.AddSendMessage(request); + buf.AddClientSendClose(); + call_.PerformOps(&buf); + cq_.Pluck(&buf); } - ~ClientReader() { delete context_; } - - virtual bool Read(R* msg) { return context_->Read(msg); } + // Blocking wait for initial metadata from server. The received metadata + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + context_->initial_metadata_received_ = true; + } - virtual void Cancel() { context_->Cancel(); } + virtual bool Read(R* msg) override { + CallOpBuffer buf; + if (!context_->initial_metadata_received_) { + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + buf.AddRecvMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf) && buf.got_message; + } - virtual const Status& Wait() { return context_->Wait(); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&context_->trailing_metadata_, &status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } private: - StreamContextInterface* const context_; + ClientContext* context_; + CompletionQueue cq_; + Call call_; }; template <class W> -class ClientWriter : public ClientStreamingInterface, - public WriterInterface<W> { +class ClientWriter final : public ClientStreamingInterface, + public WriterInterface<W> { public: // Blocking create a stream. - explicit ClientWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + ClientWriter(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, google::protobuf::Message* response) + : context_(context), + response_(response), + call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&buf); + cq_.Pluck(&buf); } - ~ClientWriter() { delete context_; } - - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } - virtual void WritesDone() { context_->Write(nullptr, true); } - - virtual void Cancel() { context_->Cancel(); } + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } // Read the final response and wait for the final status. - virtual const Status& Wait() { - bool success = context_->Read(context_->response()); - if (!success) { - Cancel(); - } else { - success = context_->Read(nullptr); - if (success) { - Cancel(); - } - } - return context_->Wait(); + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddRecvMessage(response_); + buf.AddClientRecvStatus(&context_->trailing_metadata_, &status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message); + return status; } private: - StreamContextInterface* const context_; + ClientContext* context_; + google::protobuf::Message* const response_; + CompletionQueue cq_; + Call call_; }; // Client-side interface for bi-directional streaming. template <class W, class R> -class ClientReaderWriter : public ClientStreamingInterface, - public WriterInterface<W>, - public ReaderInterface<R> { +class ClientReaderWriter final : public ClientStreamingInterface, + public WriterInterface<W>, + public ReaderInterface<R> { public: // Blocking create a stream. - explicit ClientReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); } - ~ClientReaderWriter() { delete context_; } + // Blocking wait for initial metadata from server. The received metadata + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + context_->initial_metadata_received_ = true; + } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual bool Read(R* msg) override { + CallOpBuffer buf; + if (!context_->initial_metadata_received_) { + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + buf.AddRecvMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf) && buf.got_message; + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } - virtual void WritesDone() { context_->Write(nullptr, true); } + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } - virtual void Cancel() { context_->Cancel(); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&context_->trailing_metadata_, &status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } - virtual const Status& Wait() { return context_->Wait(); } + private: + ClientContext* context_; + CompletionQueue cq_; + Call call_; +}; + +template <class R> +class ServerReader final : public ReaderInterface<R> { + public: + ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); + } + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf) && buf.got_message; + } private: - StreamContextInterface* const context_; + Call* const call_; + ServerContext* const ctx_; +}; + +template <class W> +class ServerWriter final : public WriterInterface<W> { + public: + ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); + } + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + if (!ctx_->sent_initial_metadata_) { + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + buf.AddSendMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriter final : public WriterInterface<W>, + public ReaderInterface<R> { + public: + ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); + } + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf) && buf.got_message; + } + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + if (!ctx_->sent_initial_metadata_) { + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + buf.AddSendMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +// Async interfaces +// Common interface for all client side streaming. +class ClientAsyncStreamingInterface { + public: + virtual ~ClientAsyncStreamingInterface() {} + + virtual void ReadInitialMetadata(void* tag) = 0; + + virtual void Finish(Status* status, void* tag) = 0; +}; + +// An interface that yields a sequence of R messages. +template <class R> +class AsyncReaderInterface { + public: + virtual ~AsyncReaderInterface() {} + + virtual void Read(R* msg, void* tag) = 0; +}; + +// An interface that can be fed a sequence of W messages. +template <class W> +class AsyncWriterInterface { + public: + virtual ~AsyncWriterInterface() {} + + virtual void Write(const W& msg, void* tag) = 0; }; template <class R> -class ServerReader : public ReaderInterface<R> { +class ClientAsyncReader final : public ClientAsyncStreamingInterface, + public AsyncReaderInterface<R> { public: - explicit ServerReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + // Create a stream and write the first request out. + ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, + const RpcMethod& method, ClientContext* context, + const google::protobuf::Message& request, void* tag) + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); + init_buf_.AddSendMessage(request); + init_buf_.AddClientSendClose(); + call_.PerformOps(&init_buf_); } - virtual bool Read(R* msg) { return context_->Read(msg); } + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); + + meta_buf_.Reset(tag); + meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&meta_buf_); + context_->initial_metadata_received_ = true; + } + + void Read(R* msg, void* tag) override { + read_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } + + void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } private: - StreamContextInterface* const context_; // not owned + ClientContext* context_ = nullptr; + Call call_; + CallOpBuffer init_buf_; + CallOpBuffer meta_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; }; template <class W> -class ServerWriter : public WriterInterface<W> { +class ClientAsyncWriter final : public ClientAsyncStreamingInterface, + public AsyncWriterInterface<W> { public: - explicit ServerWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); + ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, + const RpcMethod& method, ClientContext* context, + google::protobuf::Message* response, void* tag) + : context_(context), + response_(response), + call_(channel->CreateCall(method, context, cq)) { + init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&init_buf_); + } + + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); + + meta_buf_.Reset(tag); + meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&meta_buf_); + context_->initial_metadata_received_ = true; + } + + void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + void WritesDone(void* tag) { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); + } + + void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + finish_buf_.AddRecvMessage(response_); + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + private: + ClientContext* context_ = nullptr; + google::protobuf::Message* const response_; + Call call_; + CallOpBuffer init_buf_; + CallOpBuffer meta_buf_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; +}; + +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { + public: + ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, + const RpcMethod& method, ClientContext* context, + void* tag) + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&init_buf_); + } + + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); + + meta_buf_.Reset(tag); + meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&meta_buf_); + context_->initial_metadata_received_ = true; + } + + void Read(R* msg, void* tag) override { + read_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } + + void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); + } + + void WritesDone(void* tag) { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); + } + + void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + private: + ClientContext* context_ = nullptr; + Call call_; + CallOpBuffer init_buf_; + CallOpBuffer meta_buf_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; +}; + +// TODO(yangg) Move out of stream.h +template <class W> +class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { + public: + explicit ServerAsyncResponseWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_buf_); + } + + void Finish(const W& msg, const Status& status, void* tag) { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (status.IsOk()) { + finish_buf_.AddSendMessage(msg); + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + void FinishWithError(const Status& status, void* tag) { + GPR_ASSERT(!status.IsOk()); + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + private: + void BindCall(Call* call) override { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer finish_buf_; +}; + +template <class W, class R> +class ServerAsyncReader : public ServerAsyncStreamingInterface, + public AsyncReaderInterface<R> { + public: + explicit ServerAsyncReader(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_buf_); + } + + void Read(R* msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } + + void Finish(const W& msg, const Status& status, void* tag) { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (status.IsOk()) { + finish_buf_.AddSendMessage(msg); + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + void FinishWithError(const Status& status, void* tag) { + GPR_ASSERT(!status.IsOk()); + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + private: + void BindCall(Call* call) override { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; +}; + +template <class W> +class ServerAsyncWriter : public ServerAsyncStreamingInterface, + public AsyncWriterInterface<W> { + public: + explicit ServerAsyncWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_buf_); + } + + void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); + } + + void Finish(const Status& status, void* tag) { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); } private: - StreamContextInterface* const context_; // not owned + void BindCall(Call* call) override { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer write_buf_; + CallOpBuffer finish_buf_; }; // Server-side interface for bi-directional streaming. template <class W, class R> -class ServerReaderWriter : public WriterInterface<W>, - public ReaderInterface<R> { +class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + explicit ServerAsyncReaderWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_buf_); } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void Read(R* msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); + } + + void Finish(const Status& status, void* tag) { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); } private: - StreamContextInterface* const context_; // not owned + void BindCall(Call* call) override { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer finish_buf_; }; } // namespace grpc diff --git a/include/grpc++/stream_context_interface.h b/include/grpc++/stream_context_interface.h deleted file mode 100644 index a84119800b..0000000000 --- a/include/grpc++/stream_context_interface.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ -#define __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ - -namespace google { -namespace protobuf { -class Message; -} -} - -namespace grpc { -class Status; - -// An interface to avoid dependency on internal implementation. -class StreamContextInterface { - public: - virtual ~StreamContextInterface() {} - - virtual void Start(bool buffered) = 0; - - virtual bool Read(google::protobuf::Message* msg) = 0; - virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0; - virtual const Status& Wait() = 0; - virtual void Cancel() = 0; - - virtual google::protobuf::Message* request() = 0; - virtual google::protobuf::Message* response() = 0; -}; - -} // namespace grpc - -#endif // __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ diff --git a/include/grpc/grpc b/include/grpc/grpc new file mode 120000 index 0000000000..fc80ad1c86 --- /dev/null +++ b/include/grpc/grpc @@ -0,0 +1 @@ +/home/craig/grpc-ct/include/grpc
\ No newline at end of file diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 904853d984..621740e038 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -255,15 +255,18 @@ void grpc_call_details_init(grpc_call_details *details); void grpc_call_details_destroy(grpc_call_details *details); typedef enum { - /* Send initial metadata: one and only one instance MUST be sent for each call, + /* Send initial metadata: one and only one instance MUST be sent for each + call, unless the call was cancelled - in which case this can be skipped */ GRPC_OP_SEND_INITIAL_METADATA = 0, /* Send a message: 0 or more of these operations can occur for each call */ GRPC_OP_SEND_MESSAGE, - /* Send a close from the server: one and only one instance MUST be sent from the client, + /* Send a close from the server: one and only one instance MUST be sent from + the client, unless the call was cancelled - in which case this can be skipped */ GRPC_OP_SEND_CLOSE_FROM_CLIENT, - /* Send status from the server: one and only one instance MUST be sent from the server + /* Send status from the server: one and only one instance MUST be sent from + the server unless the call was cancelled - in which case this can be skipped */ GRPC_OP_SEND_STATUS_FROM_SERVER, /* Receive initial metadata: one and only one MUST be made on the client, must @@ -271,13 +274,16 @@ typedef enum { GRPC_OP_RECV_INITIAL_METADATA, /* Receive a message: 0 or more of these operations can occur for each call */ GRPC_OP_RECV_MESSAGE, - /* Receive status on the client: one and only one must be made on the client */ + /* Receive status on the client: one and only one must be made on the client + */ GRPC_OP_RECV_STATUS_ON_CLIENT, - /* Receive status on the server: one and only one must be made on the server */ + /* Receive status on the server: one and only one must be made on the server + */ GRPC_OP_RECV_CLOSE_ON_SERVER } grpc_op_type; -/* Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has +/* Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT + which has no arguments) */ typedef struct grpc_op { grpc_op_type op; @@ -301,29 +307,33 @@ typedef struct grpc_op { grpc_metadata_array *recv_initial_metadata; grpc_byte_buffer **recv_message; struct { - /* ownership of the array is with the caller, but ownership of the elements + /* ownership of the array is with the caller, but ownership of the + elements stays with the call object (ie key, value members are owned by the call object, trailing_metadata->array is owned by the caller). After the operation completes, call grpc_metadata_array_destroy on this value, or reuse it in a future op. */ grpc_metadata_array *trailing_metadata; grpc_status_code *status; - /* status_details is a buffer owned by the application before the op completes - and after the op has completed. During the operation status_details may be - reallocated to a size larger than *status_details_capacity, in which case + /* status_details is a buffer owned by the application before the op + completes + and after the op has completed. During the operation status_details may + be + reallocated to a size larger than *status_details_capacity, in which + case *status_details_capacity will be updated with the new array capacity. Pre-allocating space: size_t my_capacity = 8; char *my_details = gpr_malloc(my_capacity); x.status_details = &my_details; - x.status_details_capacity = &my_capacity; + x.status_details_capacity = &my_capacity; Not pre-allocating space: size_t my_capacity = 0; char *my_details = NULL; x.status_details = &my_details; - x.status_details_capacity = &my_capacity; + x.status_details_capacity = &my_capacity; After the call: gpr_free(my_details); */ @@ -331,7 +341,8 @@ typedef struct grpc_op { size_t *status_details_capacity; } recv_status_on_client; struct { - /* out argument, set to 1 if the call failed in any way (seen as a cancellation + /* out argument, set to 1 if the call failed in any way (seen as a + cancellation on the server), or 0 if the call succeeded */ int *cancelled; } recv_close_on_server; @@ -393,7 +404,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, gpr_timespec deadline); /* Start a batch of operations defined in the array ops; when complete, post a - completion of type 'tag' to the completion queue bound to the call. + completion of type 'tag' to the completion queue bound to the call. The order of ops specified in the batch has no significance. Only one operation of each type can be active at once in any given batch. */ @@ -540,10 +551,30 @@ void grpc_call_destroy(grpc_call *call); grpc_call_error grpc_server_request_call_old(grpc_server *server, void *tag_new); +/* Request notification of a new call */ grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, - grpc_completion_queue *completion_queue, void *tag_new); + grpc_completion_queue *cq_bound_to_call, + void *tag_new); + +/* Registers a method in the server. + Methods to this (host, method) pair will not be reported by + grpc_server_request_call, but instead be reported by + grpc_server_request_registered_call when passed the appropriate + registered_method (as returned by this function). + Must be called before grpc_server_start. + Returns NULL on failure. */ +void *grpc_server_register_method(grpc_server *server, const char *method, + const char *host, + grpc_completion_queue *new_call_cq); + +/* Request notification of a new pre-registered call */ +grpc_call_error grpc_server_request_registered_call( + grpc_server *server, void *registered_method, grpc_call **call, + gpr_timespec *deadline, grpc_metadata_array *request_metadata, + grpc_byte_buffer **optional_payload, + grpc_completion_queue *cq_bound_to_call, void *tag_new); /* Create a server */ grpc_server *grpc_server_create(grpc_completion_queue *cq, diff --git a/include/grpc++/async_server.h b/include/grpc/support/cpu.h index fe2c5d9367..9025f7c21f 100644 --- a/include/grpc++/async_server.h +++ b/include/grpc/support/cpu.h @@ -31,40 +31,27 @@ * */ -#ifndef __GRPCPP_ASYNC_SERVER_H__ -#define __GRPCPP_ASYNC_SERVER_H__ +#ifndef __GRPC_INTERNAL_SUPPORT_CPU_H__ +#define __GRPC_INTERNAL_SUPPORT_CPU_H__ -#include <mutex> +#ifdef __cplusplus +extern "C" { +#endif -#include <grpc++/config.h> +/* Interface providing CPU information for currently running system */ -struct grpc_server; +/* Return the number of CPU cores on the current system. Will return 0 if + if information is not available. */ +unsigned gpr_cpu_num_cores(void); -namespace grpc { -class CompletionQueue; +/* Return the CPU on which the current thread is executing; N.B. This should + be considered advisory only - it is possible that the thread is switched + to a different CPU at any time. Returns a value in range + [0, gpr_cpu_num_cores() - 1] */ +unsigned gpr_cpu_current_cpu(void); -class AsyncServer { - public: - explicit AsyncServer(CompletionQueue* cc); - ~AsyncServer(); +#ifdef __cplusplus +} // extern "C" +#endif - void AddPort(const grpc::string& addr); - - void Start(); - - // The user has to call this to get one new rpc on the completion - // queue. - void RequestOneRpc(); - - void Shutdown(); - - private: - bool started_; - std::mutex shutdown_mu_; - bool shutdown_; - grpc_server* server_; -}; - -} // namespace grpc - -#endif // __GRPCPP_ASYNC_SERVER_H__ +#endif /* __GRPC_INTERNAL_SUPPORT_CPU_H__ */ |