diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/call.h (renamed from include/grpc++/async_server_context.h) | 84 | ||||
-rw-r--r-- | include/grpc++/channel_interface.h | 24 | ||||
-rw-r--r-- | include/grpc++/completion_queue.h | 69 | ||||
-rw-r--r-- | include/grpc++/config.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 67 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_method.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_service_method.h | 27 | ||||
-rw-r--r-- | include/grpc++/impl/service_type.h (renamed from include/grpc++/async_server.h) | 41 | ||||
-rw-r--r-- | include/grpc++/server.h | 13 | ||||
-rw-r--r-- | include/grpc++/server_builder.h | 9 | ||||
-rw-r--r-- | include/grpc++/server_context.h | 8 | ||||
-rw-r--r-- | include/grpc++/stream.h | 408 | ||||
-rw-r--r-- | include/grpc/support/cpu.h (renamed from include/grpc++/stream_context_interface.h) | 43 |
13 files changed, 556 insertions, 244 deletions
diff --git a/include/grpc++/async_server_context.h b/include/grpc++/call.h index c038286ac1..de789febe6 100644 --- a/include/grpc++/async_server_context.h +++ b/include/grpc++/call.h @@ -31,65 +31,73 @@ * */ -#ifndef __GRPCPP_ASYNC_SERVER_CONTEXT_H__ -#define __GRPCPP_ASYNC_SERVER_CONTEXT_H__ +#ifndef __GRPCPP_CALL_H__ +#define __GRPCPP_CALL_H__ -#include <chrono> +#include <grpc++/status.h> +#include <grpc++/completion_queue.h> -#include <grpc++/config.h> - -struct grpc_byte_buffer; -struct grpc_call; -struct grpc_completion_queue; +#include <memory> +#include <vector> namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google -using std::chrono::system_clock; +struct grpc_call; +struct grpc_op; namespace grpc { -class Status; -// TODO(rocking): wrap grpc c structures. -class AsyncServerContext { +class ChannelInterface; + +class CallOpBuffer final : public CompletionQueueTag { public: - AsyncServerContext(grpc_call* call, const grpc::string& method, - const grpc::string& host, - system_clock::time_point absolute_deadline); - ~AsyncServerContext(); + CallOpBuffer() : return_tag_(this) {} - // Accept this rpc, bind it to a completion queue. - void Accept(grpc_completion_queue* cq); + void Reset(void *next_return_tag); - // Read and write calls, all async. Return true for success. - bool StartRead(google::protobuf::Message* request); - bool StartWrite(const google::protobuf::Message& response, int flags); - bool StartWriteStatus(const Status& status); + void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata); + void AddSendMessage(const google::protobuf::Message &message); + void AddRecvMessage(google::protobuf::Message *message); + void AddClientSendClose(); + void AddClientRecvStatus(Status *status); - bool ParseRead(grpc_byte_buffer* read_buffer); + // INTERNAL API: - grpc::string method() const { return method_; } - grpc::string host() const { return host_; } - system_clock::time_point absolute_deadline() { return absolute_deadline_; } + // Convert to an array of grpc_op elements + void FillOps(grpc_op *ops, size_t *nops); - grpc_call* call() { return call_; } + // Called by completion queue just prior to returning from Next() or Pluck() + void FinalizeResult(void *tag, bool *status) override; private: - AsyncServerContext(const AsyncServerContext&); - AsyncServerContext& operator=(const AsyncServerContext&); + void *return_tag_; +}; + +class CCallDeleter { + public: + void operator()(grpc_call *c); +}; + +// Straightforward wrapping of the C call object +class Call final { + public: + Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq); + + void PerformOps(CallOpBuffer *buffer); - // These properties may be moved to a ServerContext class. - const grpc::string method_; - const grpc::string host_; - system_clock::time_point absolute_deadline_; + grpc_call *call() { return call_.get(); } + CompletionQueue *cq() { return cq_; } - google::protobuf::Message* request_; // not owned - grpc_call* call_; // owned + private: + ChannelInterface *channel_; + CompletionQueue *cq_; + std::unique_ptr<grpc_call, CCallDeleter> call_; }; } // namespace grpc -#endif // __GRPCPP_ASYNC_SERVER_CONTEXT_H__ +#endif // __GRPCPP_CALL_INTERFACE_H__ diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h index 9ed35422b8..3631ea4d5d 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel_interface.h @@ -39,28 +39,26 @@ namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google -namespace grpc { +struct grpc_call; +namespace grpc { +class Call; +class CallOpBuffer; class ClientContext; +class CompletionQueue; class RpcMethod; -class StreamContextInterface; +class CallInterface; class ChannelInterface { public: virtual ~ChannelInterface() {} - virtual Status StartBlockingRpc(const RpcMethod& method, - ClientContext* context, - const google::protobuf::Message& request, - google::protobuf::Message* result) = 0; - - virtual StreamContextInterface* CreateStream( - const RpcMethod& method, ClientContext* context, - const google::protobuf::Message* request, - google::protobuf::Message* result) = 0; + virtual Call CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) = 0; + virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; }; } // namespace grpc diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 72f6253f8e..c976bd5b45 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -34,51 +34,66 @@ #ifndef __GRPCPP_COMPLETION_QUEUE_H__ #define __GRPCPP_COMPLETION_QUEUE_H__ +#include <grpc++/impl/client_unary_call.h> + struct grpc_completion_queue; namespace grpc { +template <class R> +class ClientReader; +template <class W> +class ClientWriter; +template <class R, class W> +class ClientReaderWriter; +template <class R> +class ServerReader; +template <class W> +class ServerWriter; +template <class R, class W> +class ServerReaderWriter; + +class CompletionQueue; + +class CompletionQueueTag { + public: + // Called prior to returning from Next(), return value + // is the status of the operation (return status is the default thing + // to do) + virtual void FinalizeResult(void *tag, bool *status) = 0; +}; + // grpc_completion_queue wrapper class class CompletionQueue { public: CompletionQueue(); ~CompletionQueue(); - enum CompletionType { - QUEUE_CLOSED = 0, // Shutting down. - RPC_END = 1, // An RPC finished. Either at client or server. - CLIENT_READ_OK = 2, // A client-side read has finished successfully. - CLIENT_READ_ERROR = 3, // A client-side read has finished with error. - CLIENT_WRITE_OK = 4, - CLIENT_WRITE_ERROR = 5, - SERVER_RPC_NEW = 6, // A new RPC just arrived at the server. - SERVER_READ_OK = 7, // A server-side read has finished successfully. - SERVER_READ_ERROR = 8, // A server-side read has finished with error. - SERVER_WRITE_OK = 9, - SERVER_WRITE_ERROR = 10, - // Client or server has sent half close successfully. - HALFCLOSE_OK = 11, - // New CompletionTypes may be added in the future, so user code should - // always - // handle the default case of a CompletionType that appears after such code - // was - // written. - DO_NOT_USE = 20, - }; - // Blocking read from queue. - // For QUEUE_CLOSED, *tag is not changed. - // For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext. - // For others, *tag will be the AsyncServerContext of this rpc. - CompletionType Next(void** tag); + // Returns true if an event was received, false if the queue is ready + // for destruction. + bool Next(void **tag, bool *ok); // Shutdown has to be called, and the CompletionQueue can only be - // destructed when the QUEUE_CLOSED message has been read with Next(). + // destructed when false is returned from Next(). void Shutdown(); grpc_completion_queue* cq() { return cq_; } private: + template <class R> friend class ::grpc::ClientReader; + template <class W> friend class ::grpc::ClientWriter; + template <class R, class W> friend class ::grpc::ClientReaderWriter; + template <class R> friend class ::grpc::ServerReader; + template <class W> friend class ::grpc::ServerWriter; + template <class R, class W> friend class ::grpc::ServerReaderWriter; + friend Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result); + + bool Pluck(CompletionQueueTag *tag); + grpc_completion_queue* cq_; // owned }; diff --git a/include/grpc++/config.h b/include/grpc++/config.h index 52913fbf0f..1b4b463d35 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/config.h @@ -39,6 +39,7 @@ namespace grpc { typedef std::string string; -} + +} // namespace grpc #endif // __GRPCPP_CONFIG_H__ diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h new file mode 100644 index 0000000000..091430b884 --- /dev/null +++ b/include/grpc++/impl/client_unary_call.h @@ -0,0 +1,67 @@ +/* +* +* Copyright 2014, Google Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are +* met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following disclaimer +* in the documentation and/or other materials provided with the +* distribution. +* * Neither the name of Google Inc. nor the names of its +* contributors may be used to endorse or promote products derived from +* this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +* +*/ + +#ifndef __GRPCPP_CLIENT_UNARY_CALL_H__ +#define __GRPCPP_CLIENT_UNARY_CALL_H__ + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace grpc { + +class ChannelInterface; +class ClientContext; +class CompletionQueue; +class RpcMethod; +class Status; + +// Wrapper that begins an asynchronous unary call +void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result, Status *status, + CompletionQueue *cq, void *tag); + +// Wrapper that performs a blocking unary call +Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result); + +} // namespace grpc + +#endif + diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h index 75fec356dd..bb16e64c96 100644 --- a/include/grpc++/impl/rpc_method.h +++ b/include/grpc++/impl/rpc_method.h @@ -37,8 +37,8 @@ namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google namespace grpc { diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 620de5e67f..0fb4f79b59 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -55,25 +55,18 @@ class MethodHandler { public: virtual ~MethodHandler() {} struct HandlerParameter { - HandlerParameter(ServerContext* context, + HandlerParameter(Call *c, + ServerContext* context, const google::protobuf::Message* req, google::protobuf::Message* resp) - : server_context(context), + : call(c), + server_context(context), request(req), - response(resp), - stream_context(nullptr) {} - HandlerParameter(ServerContext* context, - const google::protobuf::Message* req, - google::protobuf::Message* resp, - StreamContextInterface* stream) - : server_context(context), - request(req), - response(resp), - stream_context(stream) {} + response(resp) {} + Call* call; ServerContext* server_context; const google::protobuf::Message* request; google::protobuf::Message* response; - StreamContextInterface* stream_context; }; virtual Status RunHandler(const HandlerParameter& param) = 0; }; @@ -114,7 +107,7 @@ class ClientStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerReader<RequestType> reader(param.stream_context); + ServerReader<RequestType> reader(param.call); return func_(service_, param.server_context, &reader, dynamic_cast<ResponseType*>(param.response)); } @@ -136,7 +129,7 @@ class ServerStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerWriter<ResponseType> writer(param.stream_context); + ServerWriter<ResponseType> writer(param.call); return func_(service_, param.server_context, dynamic_cast<const RequestType*>(param.request), &writer); } @@ -159,7 +152,7 @@ class BidiStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerReaderWriter<ResponseType, RequestType> stream(param.stream_context); + ServerReaderWriter<ResponseType, RequestType> stream(param.call); return func_(service_, param.server_context, &stream); } @@ -203,7 +196,7 @@ class RpcService { public: // Takes ownership. void AddMethod(RpcServiceMethod* method) { - methods_.push_back(std::unique_ptr<RpcServiceMethod>(method)); + methods_.emplace_back(method); } RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); } diff --git a/include/grpc++/async_server.h b/include/grpc++/impl/service_type.h index fe2c5d9367..0684f322d8 100644 --- a/include/grpc++/async_server.h +++ b/include/grpc++/impl/service_type.h @@ -31,40 +31,25 @@ * */ -#ifndef __GRPCPP_ASYNC_SERVER_H__ -#define __GRPCPP_ASYNC_SERVER_H__ - -#include <mutex> - -#include <grpc++/config.h> - -struct grpc_server; +#ifndef __GRPCPP_IMPL_SERVICE_TYPE_H__ +#define __GRPCPP_IMPL_SERVICE_TYPE_H__ namespace grpc { -class CompletionQueue; - -class AsyncServer { - public: - explicit AsyncServer(CompletionQueue* cc); - ~AsyncServer(); - void AddPort(const grpc::string& addr); +class RpcService; - void Start(); - - // The user has to call this to get one new rpc on the completion - // queue. - void RequestOneRpc(); - - void Shutdown(); +class SynchronousService { + public: + virtual ~SynchronousService() {} + virtual RpcService *service() = 0; +}; - private: - bool started_; - std::mutex shutdown_mu_; - bool shutdown_; - grpc_server* server_; +class AsynchronousService { + public: + virtual ~AsynchronousService() {} + virtual RpcService *service() = 0; }; } // namespace grpc -#endif // __GRPCPP_ASYNC_SERVER_H__ +#endif // __GRPCPP_IMPL_SERVICE_TYPE_H__
\ No newline at end of file diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 5fa371ba62..670ffa7815 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -48,8 +48,8 @@ struct grpc_server; namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google namespace grpc { class AsyncServerContext; @@ -70,17 +70,16 @@ class Server { friend class ServerBuilder; // ServerBuilder use only - Server(ThreadPoolInterface* thread_pool, ServerCredentials* creds); + Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds); Server(); // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance. - void RegisterService(RpcService* service); + bool RegisterService(RpcService* service); // Add a listening port. Can be called multiple times. - void AddPort(const grpc::string& addr); + int AddPort(const grpc::string& addr); // Start the server. - void Start(); + bool Start(); - void AllowOneRpc(); void HandleQueueClosed(); void RunRpc(); void ScheduleCallback(); diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index cf27452010..8b4c81bc87 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -41,9 +41,11 @@ namespace grpc { +class AsynchronousService; class RpcService; class Server; class ServerCredentials; +class SynchronousService; class ThreadPoolInterface; class ServerBuilder { @@ -53,7 +55,9 @@ class ServerBuilder { // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance returned by // BuildAndStart(). - void RegisterService(RpcService* service); + void RegisterService(SynchronousService* service); + + void RegisterAsyncService(AsynchronousService *service); // Add a listening port. Can be called multiple times. void AddPort(const grpc::string& addr); @@ -71,9 +75,10 @@ class ServerBuilder { private: std::vector<RpcService*> services_; + std::vector<AsynchronousService*> async_services_; std::vector<grpc::string> ports_; std::shared_ptr<ServerCredentials> creds_; - ThreadPoolInterface* thread_pool_; + ThreadPoolInterface* thread_pool_ = nullptr; }; } // namespace grpc diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 47fd6cf1c8..4af9fd6aaa 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -35,6 +35,9 @@ #define __GRPCPP_SERVER_CONTEXT_H_ #include <chrono> +#include <vector> + +#include "config.h" namespace grpc { @@ -43,7 +46,10 @@ class ServerContext { public: virtual ~ServerContext() {} - virtual std::chrono::system_clock::time_point absolute_deadline() const = 0; + std::chrono::system_clock::time_point absolute_deadline(); + + private: + std::vector<std::pair<grpc::string, grpc::string> > metadata_; }; } // namespace grpc diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index b8982f4d93..c30825a7a5 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -34,7 +34,9 @@ #ifndef __GRPCPP_STREAM_H__ #define __GRPCPP_STREAM_H__ -#include <grpc++/stream_context_interface.h> +#include <grpc++/call.h> +#include <grpc++/channel_interface.h> +#include <grpc++/completion_queue.h> #include <grpc++/status.h> #include <grpc/support/log.h> @@ -45,16 +47,12 @@ class ClientStreamingInterface { public: virtual ~ClientStreamingInterface() {} - // Try to cancel the stream. Wait() still needs to be called to get the final - // status. Cancelling after the stream has finished has no effects. - virtual void Cancel() = 0; - // Wait until the stream finishes, and return the final status. When the // client side declares it has no more message to send, either implicitly or // by calling WritesDone, it needs to make sure there is no more message to // be received from the server, either implicitly or by getting a false from // a Read(). Otherwise, this implicitly cancels the stream. - virtual const Status& Wait() = 0; + virtual Status Finish() = 0; }; // An interface that yields a sequence of R messages. @@ -82,147 +80,391 @@ class WriterInterface { }; template <class R> -class ClientReader : public ClientStreamingInterface, - public ReaderInterface<R> { +class ClientReader final : public ClientStreamingInterface, + public ReaderInterface<R> { public: // Blocking create a stream and write the first request out. - explicit ClientReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Write(context_->request(), true); + ClientReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request) + : call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendMessage(request); + buf.AddClientSendClose(); + call_.PerformOps(&buf); + cq_.Pluck(&buf); } - ~ClientReader() { delete context_; } + virtual bool Read(R *msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } - virtual void Cancel() { context_->Cancel(); } + private: + CompletionQueue cq_; + Call call_; +}; - virtual const Status& Wait() { return context_->Wait(); } +template <class W> +class ClientWriter final : public ClientStreamingInterface, + public WriterInterface<W> { + public: + // Blocking create a stream. + ClientWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_)) {} + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } + + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } + + // Read the final response and wait for the final status. + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddRecvMessage(response_); + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } private: - StreamContextInterface* const context_; + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; }; -template <class W> -class ClientWriter : public ClientStreamingInterface, - public WriterInterface<W> { +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientReaderWriter final : public ClientStreamingInterface, + public WriterInterface<W>, + public ReaderInterface<R> { public: // Blocking create a stream. - explicit ClientWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + ClientReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_)) {} + + virtual bool Read(R *msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } - ~ClientWriter() { delete context_; } + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } - virtual void WritesDone() { context_->Write(nullptr, true); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } - virtual void Cancel() { context_->Cancel(); } + private: + CompletionQueue cq_; + Call call_; +}; - // Read the final response and wait for the final status. - virtual const Status& Wait() { - bool success = context_->Read(context_->response()); - if (!success) { - Cancel(); - } else { - success = context_->Read(nullptr); - if (success) { - Cancel(); - } - } - return context_->Wait(); +template <class R> +class ServerReader final : public ReaderInterface<R> { + public: + explicit ServerReader(Call* call) : call_(call) {} + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } private: - StreamContextInterface* const context_; + Call* call_; }; -// Client-side interface for bi-directional streaming. +template <class W> +class ServerWriter final : public WriterInterface<W> { + public: + explicit ServerWriter(Call* call) : call_(call) {} + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } + + private: + Call* call_; +}; + +// Server-side interface for bi-directional streaming. template <class W, class R> -class ClientReaderWriter : public ClientStreamingInterface, - public WriterInterface<W>, +class ServerReaderWriter final : public WriterInterface<W>, public ReaderInterface<R> { public: - // Blocking create a stream. - explicit ClientReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + explicit ServerReaderWriter(Call* call) : call_(call) {} + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } - ~ClientReaderWriter() { delete context_; } + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } + + private: + CompletionQueue* cq_; + Call* call_; +}; + +// Async interfaces +// Common interface for all client side streaming. +class ClientAsyncStreamingInterface { + public: + virtual ~ClientAsyncStreamingInterface() {} + + virtual void Finish(Status* status, void* tag) = 0; +}; + +// An interface that yields a sequence of R messages. +template <class R> +class AsyncReaderInterface { + public: + virtual ~AsyncReaderInterface() {} + + virtual void Read(R* msg, void* tag) = 0; +}; - virtual bool Read(R* msg) { return context_->Read(msg); } +// An interface that can be fed a sequence of W messages. +template <class W> +class AsyncWriterInterface { + public: + virtual ~AsyncWriterInterface() {} - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Write(const W& msg, void* tag) = 0; +}; + +template <class R> +class ClientAsyncReader final : public ClientAsyncStreamingInterface, + public AsyncReaderInterface<R> { + public: + // Blocking create a stream and write the first request out. + ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, void* tag) + : call_(channel->CreateCall(method, context, &cq_)) { + init_buf_.Reset(tag); + init_buf_.AddSendMessage(request); + init_buf_.AddClientSendClose(); + call_.PerformOps(&init_buf_); } - virtual void WritesDone() { context_->Write(nullptr, true); } + virtual void Read(R *msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } - virtual void Cancel() { context_->Cancel(); } + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } - virtual const Status& Wait() { return context_->Wait(); } + private: + CompletionQueue cq_; + Call call_; + CallOpBuffer init_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; +}; + +template <class W> +class ClientAsyncWriter final : public ClientAsyncStreamingInterface, + public WriterInterface<W> { + public: + // Blocking create a stream. + ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); + } + + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); + } + + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddRecvMessage(response_); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } private: - StreamContextInterface* const context_; + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; -template <class R> -class ServerReader : public ReaderInterface<R> { +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + ClientAsyncReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Read(R *msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } + + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); + } + + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } private: - StreamContextInterface* const context_; // not owned + CompletionQueue cq_; + Call call_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; +// TODO(yangg) Move out of stream.h template <class W> -class ServerWriter : public WriterInterface<W> { +class ServerAsyncResponseWriter final { public: - explicit ServerWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); + explicit ServerAsyncResponseWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + private: + Call* call_; +}; + +template <class R> +class ServerAsyncReader : public AsyncReaderInterface<R> { + public: + explicit ServerAsyncReader(Call* call) : call_(call) {} + + virtual void Read(R* msg, void* tag) { + // TODO + } + + private: + Call* call_; +}; + +template <class W> +class ServerAsyncWriter : public AsyncWriterInterface<W> { + public: + explicit ServerAsyncWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; // Server-side interface for bi-directional streaming. template <class W, class R> -class ServerReaderWriter : public WriterInterface<W>, - public ReaderInterface<R> { +class ServerAsyncReaderWriter : public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - } + explicit ServerAsyncReaderWriter(Call* call) : call_(call) {} - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void Read(R* msg, void* tag) { + // TODO + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Write(const W& msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; } // namespace grpc diff --git a/include/grpc++/stream_context_interface.h b/include/grpc/support/cpu.h index a84119800b..9025f7c21f 100644 --- a/include/grpc++/stream_context_interface.h +++ b/include/grpc/support/cpu.h @@ -31,34 +31,27 @@ * */ -#ifndef __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ -#define __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ +#ifndef __GRPC_INTERNAL_SUPPORT_CPU_H__ +#define __GRPC_INTERNAL_SUPPORT_CPU_H__ -namespace google { -namespace protobuf { -class Message; -} -} +#ifdef __cplusplus +extern "C" { +#endif -namespace grpc { -class Status; +/* Interface providing CPU information for currently running system */ -// An interface to avoid dependency on internal implementation. -class StreamContextInterface { - public: - virtual ~StreamContextInterface() {} +/* Return the number of CPU cores on the current system. Will return 0 if + if information is not available. */ +unsigned gpr_cpu_num_cores(void); - virtual void Start(bool buffered) = 0; +/* Return the CPU on which the current thread is executing; N.B. This should + be considered advisory only - it is possible that the thread is switched + to a different CPU at any time. Returns a value in range + [0, gpr_cpu_num_cores() - 1] */ +unsigned gpr_cpu_current_cpu(void); - virtual bool Read(google::protobuf::Message* msg) = 0; - virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0; - virtual const Status& Wait() = 0; - virtual void Cancel() = 0; +#ifdef __cplusplus +} // extern "C" +#endif - virtual google::protobuf::Message* request() = 0; - virtual google::protobuf::Message* response() = 0; -}; - -} // namespace grpc - -#endif // __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ +#endif /* __GRPC_INTERNAL_SUPPORT_CPU_H__ */ |