aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/call.h (renamed from include/grpc++/async_server_context.h)84
-rw-r--r--include/grpc++/channel_interface.h24
-rw-r--r--include/grpc++/completion_queue.h69
-rw-r--r--include/grpc++/config.h3
-rw-r--r--include/grpc++/impl/client_unary_call.h67
-rw-r--r--include/grpc++/impl/rpc_method.h4
-rw-r--r--include/grpc++/impl/rpc_service_method.h27
-rw-r--r--include/grpc++/impl/service_type.h (renamed from include/grpc++/async_server.h)41
-rw-r--r--include/grpc++/server.h13
-rw-r--r--include/grpc++/server_builder.h9
-rw-r--r--include/grpc++/server_context.h8
-rw-r--r--include/grpc++/stream.h408
-rw-r--r--include/grpc/support/cpu.h (renamed from include/grpc++/stream_context_interface.h)43
13 files changed, 556 insertions, 244 deletions
diff --git a/include/grpc++/async_server_context.h b/include/grpc++/call.h
index c038286ac1..de789febe6 100644
--- a/include/grpc++/async_server_context.h
+++ b/include/grpc++/call.h
@@ -31,65 +31,73 @@
*
*/
-#ifndef __GRPCPP_ASYNC_SERVER_CONTEXT_H__
-#define __GRPCPP_ASYNC_SERVER_CONTEXT_H__
+#ifndef __GRPCPP_CALL_H__
+#define __GRPCPP_CALL_H__
-#include <chrono>
+#include <grpc++/status.h>
+#include <grpc++/completion_queue.h>
-#include <grpc++/config.h>
-
-struct grpc_byte_buffer;
-struct grpc_call;
-struct grpc_completion_queue;
+#include <memory>
+#include <vector>
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
-using std::chrono::system_clock;
+struct grpc_call;
+struct grpc_op;
namespace grpc {
-class Status;
-// TODO(rocking): wrap grpc c structures.
-class AsyncServerContext {
+class ChannelInterface;
+
+class CallOpBuffer final : public CompletionQueueTag {
public:
- AsyncServerContext(grpc_call* call, const grpc::string& method,
- const grpc::string& host,
- system_clock::time_point absolute_deadline);
- ~AsyncServerContext();
+ CallOpBuffer() : return_tag_(this) {}
- // Accept this rpc, bind it to a completion queue.
- void Accept(grpc_completion_queue* cq);
+ void Reset(void *next_return_tag);
- // Read and write calls, all async. Return true for success.
- bool StartRead(google::protobuf::Message* request);
- bool StartWrite(const google::protobuf::Message& response, int flags);
- bool StartWriteStatus(const Status& status);
+ void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata);
+ void AddSendMessage(const google::protobuf::Message &message);
+ void AddRecvMessage(google::protobuf::Message *message);
+ void AddClientSendClose();
+ void AddClientRecvStatus(Status *status);
- bool ParseRead(grpc_byte_buffer* read_buffer);
+ // INTERNAL API:
- grpc::string method() const { return method_; }
- grpc::string host() const { return host_; }
- system_clock::time_point absolute_deadline() { return absolute_deadline_; }
+ // Convert to an array of grpc_op elements
+ void FillOps(grpc_op *ops, size_t *nops);
- grpc_call* call() { return call_; }
+ // Called by completion queue just prior to returning from Next() or Pluck()
+ void FinalizeResult(void *tag, bool *status) override;
private:
- AsyncServerContext(const AsyncServerContext&);
- AsyncServerContext& operator=(const AsyncServerContext&);
+ void *return_tag_;
+};
+
+class CCallDeleter {
+ public:
+ void operator()(grpc_call *c);
+};
+
+// Straightforward wrapping of the C call object
+class Call final {
+ public:
+ Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq);
+
+ void PerformOps(CallOpBuffer *buffer);
- // These properties may be moved to a ServerContext class.
- const grpc::string method_;
- const grpc::string host_;
- system_clock::time_point absolute_deadline_;
+ grpc_call *call() { return call_.get(); }
+ CompletionQueue *cq() { return cq_; }
- google::protobuf::Message* request_; // not owned
- grpc_call* call_; // owned
+ private:
+ ChannelInterface *channel_;
+ CompletionQueue *cq_;
+ std::unique_ptr<grpc_call, CCallDeleter> call_;
};
} // namespace grpc
-#endif // __GRPCPP_ASYNC_SERVER_CONTEXT_H__
+#endif // __GRPCPP_CALL_INTERFACE_H__
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h
index 9ed35422b8..3631ea4d5d 100644
--- a/include/grpc++/channel_interface.h
+++ b/include/grpc++/channel_interface.h
@@ -39,28 +39,26 @@
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
-namespace grpc {
+struct grpc_call;
+namespace grpc {
+class Call;
+class CallOpBuffer;
class ClientContext;
+class CompletionQueue;
class RpcMethod;
-class StreamContextInterface;
+class CallInterface;
class ChannelInterface {
public:
virtual ~ChannelInterface() {}
- virtual Status StartBlockingRpc(const RpcMethod& method,
- ClientContext* context,
- const google::protobuf::Message& request,
- google::protobuf::Message* result) = 0;
-
- virtual StreamContextInterface* CreateStream(
- const RpcMethod& method, ClientContext* context,
- const google::protobuf::Message* request,
- google::protobuf::Message* result) = 0;
+ virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
+ CompletionQueue *cq) = 0;
+ virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
};
} // namespace grpc
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 72f6253f8e..c976bd5b45 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -34,51 +34,66 @@
#ifndef __GRPCPP_COMPLETION_QUEUE_H__
#define __GRPCPP_COMPLETION_QUEUE_H__
+#include <grpc++/impl/client_unary_call.h>
+
struct grpc_completion_queue;
namespace grpc {
+template <class R>
+class ClientReader;
+template <class W>
+class ClientWriter;
+template <class R, class W>
+class ClientReaderWriter;
+template <class R>
+class ServerReader;
+template <class W>
+class ServerWriter;
+template <class R, class W>
+class ServerReaderWriter;
+
+class CompletionQueue;
+
+class CompletionQueueTag {
+ public:
+ // Called prior to returning from Next(), return value
+ // is the status of the operation (return status is the default thing
+ // to do)
+ virtual void FinalizeResult(void *tag, bool *status) = 0;
+};
+
// grpc_completion_queue wrapper class
class CompletionQueue {
public:
CompletionQueue();
~CompletionQueue();
- enum CompletionType {
- QUEUE_CLOSED = 0, // Shutting down.
- RPC_END = 1, // An RPC finished. Either at client or server.
- CLIENT_READ_OK = 2, // A client-side read has finished successfully.
- CLIENT_READ_ERROR = 3, // A client-side read has finished with error.
- CLIENT_WRITE_OK = 4,
- CLIENT_WRITE_ERROR = 5,
- SERVER_RPC_NEW = 6, // A new RPC just arrived at the server.
- SERVER_READ_OK = 7, // A server-side read has finished successfully.
- SERVER_READ_ERROR = 8, // A server-side read has finished with error.
- SERVER_WRITE_OK = 9,
- SERVER_WRITE_ERROR = 10,
- // Client or server has sent half close successfully.
- HALFCLOSE_OK = 11,
- // New CompletionTypes may be added in the future, so user code should
- // always
- // handle the default case of a CompletionType that appears after such code
- // was
- // written.
- DO_NOT_USE = 20,
- };
-
// Blocking read from queue.
- // For QUEUE_CLOSED, *tag is not changed.
- // For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext.
- // For others, *tag will be the AsyncServerContext of this rpc.
- CompletionType Next(void** tag);
+ // Returns true if an event was received, false if the queue is ready
+ // for destruction.
+ bool Next(void **tag, bool *ok);
// Shutdown has to be called, and the CompletionQueue can only be
- // destructed when the QUEUE_CLOSED message has been read with Next().
+ // destructed when false is returned from Next().
void Shutdown();
grpc_completion_queue* cq() { return cq_; }
private:
+ template <class R> friend class ::grpc::ClientReader;
+ template <class W> friend class ::grpc::ClientWriter;
+ template <class R, class W> friend class ::grpc::ClientReaderWriter;
+ template <class R> friend class ::grpc::ServerReader;
+ template <class W> friend class ::grpc::ServerWriter;
+ template <class R, class W> friend class ::grpc::ServerReaderWriter;
+ friend Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result);
+
+ bool Pluck(CompletionQueueTag *tag);
+
grpc_completion_queue* cq_; // owned
};
diff --git a/include/grpc++/config.h b/include/grpc++/config.h
index 52913fbf0f..1b4b463d35 100644
--- a/include/grpc++/config.h
+++ b/include/grpc++/config.h
@@ -39,6 +39,7 @@
namespace grpc {
typedef std::string string;
-}
+
+} // namespace grpc
#endif // __GRPCPP_CONFIG_H__
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
new file mode 100644
index 0000000000..091430b884
--- /dev/null
+++ b/include/grpc++/impl/client_unary_call.h
@@ -0,0 +1,67 @@
+/*
+*
+* Copyright 2014, Google Inc.
+* All rights reserved.
+*
+* Redistribution and use in source and binary forms, with or without
+* modification, are permitted provided that the following conditions are
+* met:
+*
+* * Redistributions of source code must retain the above copyright
+* notice, this list of conditions and the following disclaimer.
+* * Redistributions in binary form must reproduce the above
+* copyright notice, this list of conditions and the following disclaimer
+* in the documentation and/or other materials provided with the
+* distribution.
+* * Neither the name of Google Inc. nor the names of its
+* contributors may be used to endorse or promote products derived from
+* this software without specific prior written permission.
+*
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*
+*/
+
+#ifndef __GRPCPP_CLIENT_UNARY_CALL_H__
+#define __GRPCPP_CLIENT_UNARY_CALL_H__
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace grpc {
+
+class ChannelInterface;
+class ClientContext;
+class CompletionQueue;
+class RpcMethod;
+class Status;
+
+// Wrapper that begins an asynchronous unary call
+void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result, Status *status,
+ CompletionQueue *cq, void *tag);
+
+// Wrapper that performs a blocking unary call
+Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result);
+
+} // namespace grpc
+
+#endif
+
diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h
index 75fec356dd..bb16e64c96 100644
--- a/include/grpc++/impl/rpc_method.h
+++ b/include/grpc++/impl/rpc_method.h
@@ -37,8 +37,8 @@
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
namespace grpc {
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 620de5e67f..0fb4f79b59 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -55,25 +55,18 @@ class MethodHandler {
public:
virtual ~MethodHandler() {}
struct HandlerParameter {
- HandlerParameter(ServerContext* context,
+ HandlerParameter(Call *c,
+ ServerContext* context,
const google::protobuf::Message* req,
google::protobuf::Message* resp)
- : server_context(context),
+ : call(c),
+ server_context(context),
request(req),
- response(resp),
- stream_context(nullptr) {}
- HandlerParameter(ServerContext* context,
- const google::protobuf::Message* req,
- google::protobuf::Message* resp,
- StreamContextInterface* stream)
- : server_context(context),
- request(req),
- response(resp),
- stream_context(stream) {}
+ response(resp) {}
+ Call* call;
ServerContext* server_context;
const google::protobuf::Message* request;
google::protobuf::Message* response;
- StreamContextInterface* stream_context;
};
virtual Status RunHandler(const HandlerParameter& param) = 0;
};
@@ -114,7 +107,7 @@ class ClientStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerReader<RequestType> reader(param.stream_context);
+ ServerReader<RequestType> reader(param.call);
return func_(service_, param.server_context, &reader,
dynamic_cast<ResponseType*>(param.response));
}
@@ -136,7 +129,7 @@ class ServerStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerWriter<ResponseType> writer(param.stream_context);
+ ServerWriter<ResponseType> writer(param.call);
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request), &writer);
}
@@ -159,7 +152,7 @@ class BidiStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerReaderWriter<ResponseType, RequestType> stream(param.stream_context);
+ ServerReaderWriter<ResponseType, RequestType> stream(param.call);
return func_(service_, param.server_context, &stream);
}
@@ -203,7 +196,7 @@ class RpcService {
public:
// Takes ownership.
void AddMethod(RpcServiceMethod* method) {
- methods_.push_back(std::unique_ptr<RpcServiceMethod>(method));
+ methods_.emplace_back(method);
}
RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); }
diff --git a/include/grpc++/async_server.h b/include/grpc++/impl/service_type.h
index fe2c5d9367..0684f322d8 100644
--- a/include/grpc++/async_server.h
+++ b/include/grpc++/impl/service_type.h
@@ -31,40 +31,25 @@
*
*/
-#ifndef __GRPCPP_ASYNC_SERVER_H__
-#define __GRPCPP_ASYNC_SERVER_H__
-
-#include <mutex>
-
-#include <grpc++/config.h>
-
-struct grpc_server;
+#ifndef __GRPCPP_IMPL_SERVICE_TYPE_H__
+#define __GRPCPP_IMPL_SERVICE_TYPE_H__
namespace grpc {
-class CompletionQueue;
-
-class AsyncServer {
- public:
- explicit AsyncServer(CompletionQueue* cc);
- ~AsyncServer();
- void AddPort(const grpc::string& addr);
+class RpcService;
- void Start();
-
- // The user has to call this to get one new rpc on the completion
- // queue.
- void RequestOneRpc();
-
- void Shutdown();
+class SynchronousService {
+ public:
+ virtual ~SynchronousService() {}
+ virtual RpcService *service() = 0;
+};
- private:
- bool started_;
- std::mutex shutdown_mu_;
- bool shutdown_;
- grpc_server* server_;
+class AsynchronousService {
+ public:
+ virtual ~AsynchronousService() {}
+ virtual RpcService *service() = 0;
};
} // namespace grpc
-#endif // __GRPCPP_ASYNC_SERVER_H__
+#endif // __GRPCPP_IMPL_SERVICE_TYPE_H__ \ No newline at end of file
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 5fa371ba62..670ffa7815 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -48,8 +48,8 @@ struct grpc_server;
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
namespace grpc {
class AsyncServerContext;
@@ -70,17 +70,16 @@ class Server {
friend class ServerBuilder;
// ServerBuilder use only
- Server(ThreadPoolInterface* thread_pool, ServerCredentials* creds);
+ Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds);
Server();
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance.
- void RegisterService(RpcService* service);
+ bool RegisterService(RpcService* service);
// Add a listening port. Can be called multiple times.
- void AddPort(const grpc::string& addr);
+ int AddPort(const grpc::string& addr);
// Start the server.
- void Start();
+ bool Start();
- void AllowOneRpc();
void HandleQueueClosed();
void RunRpc();
void ScheduleCallback();
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index cf27452010..8b4c81bc87 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -41,9 +41,11 @@
namespace grpc {
+class AsynchronousService;
class RpcService;
class Server;
class ServerCredentials;
+class SynchronousService;
class ThreadPoolInterface;
class ServerBuilder {
@@ -53,7 +55,9 @@ class ServerBuilder {
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance returned by
// BuildAndStart().
- void RegisterService(RpcService* service);
+ void RegisterService(SynchronousService* service);
+
+ void RegisterAsyncService(AsynchronousService *service);
// Add a listening port. Can be called multiple times.
void AddPort(const grpc::string& addr);
@@ -71,9 +75,10 @@ class ServerBuilder {
private:
std::vector<RpcService*> services_;
+ std::vector<AsynchronousService*> async_services_;
std::vector<grpc::string> ports_;
std::shared_ptr<ServerCredentials> creds_;
- ThreadPoolInterface* thread_pool_;
+ ThreadPoolInterface* thread_pool_ = nullptr;
};
} // namespace grpc
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 47fd6cf1c8..4af9fd6aaa 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -35,6 +35,9 @@
#define __GRPCPP_SERVER_CONTEXT_H_
#include <chrono>
+#include <vector>
+
+#include "config.h"
namespace grpc {
@@ -43,7 +46,10 @@ class ServerContext {
public:
virtual ~ServerContext() {}
- virtual std::chrono::system_clock::time_point absolute_deadline() const = 0;
+ std::chrono::system_clock::time_point absolute_deadline();
+
+ private:
+ std::vector<std::pair<grpc::string, grpc::string> > metadata_;
};
} // namespace grpc
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index b8982f4d93..c30825a7a5 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -34,7 +34,9 @@
#ifndef __GRPCPP_STREAM_H__
#define __GRPCPP_STREAM_H__
-#include <grpc++/stream_context_interface.h>
+#include <grpc++/call.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/completion_queue.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
@@ -45,16 +47,12 @@ class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
- // Try to cancel the stream. Wait() still needs to be called to get the final
- // status. Cancelling after the stream has finished has no effects.
- virtual void Cancel() = 0;
-
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read(). Otherwise, this implicitly cancels the stream.
- virtual const Status& Wait() = 0;
+ virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
@@ -82,147 +80,391 @@ class WriterInterface {
};
template <class R>
-class ClientReader : public ClientStreamingInterface,
- public ReaderInterface<R> {
+class ClientReader final : public ClientStreamingInterface,
+ public ReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
- explicit ClientReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Write(context_->request(), true);
+ ClientReader(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request)
+ : call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendMessage(request);
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
}
- ~ClientReader() { delete context_; }
+ virtual bool Read(R *msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
- virtual void Cancel() { context_->Cancel(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+};
- virtual const Status& Wait() { return context_->Wait(); }
+template <class W>
+class ClientWriter final : public ClientStreamingInterface,
+ public WriterInterface<W> {
+ public:
+ // Blocking create a stream.
+ ClientWriter(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ google::protobuf::Message *response)
+ : response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
+
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
+
+ // Read the final response and wait for the final status.
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddRecvMessage(response_);
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
private:
- StreamContextInterface* const context_;
+ google::protobuf::Message *const response_;
+ CompletionQueue cq_;
+ Call call_;
};
-template <class W>
-class ClientWriter : public ClientStreamingInterface,
- public WriterInterface<W> {
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientReaderWriter final : public ClientStreamingInterface,
+ public WriterInterface<W>,
+ public ReaderInterface<R> {
public:
// Blocking create a stream.
- explicit ClientWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ ClientReaderWriter(ChannelInterface *channel,
+ const RpcMethod &method, ClientContext *context)
+ : call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual bool Read(R *msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- ~ClientWriter() { delete context_; }
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
- virtual void Cancel() { context_->Cancel(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+};
- // Read the final response and wait for the final status.
- virtual const Status& Wait() {
- bool success = context_->Read(context_->response());
- if (!success) {
- Cancel();
- } else {
- success = context_->Read(nullptr);
- if (success) {
- Cancel();
- }
- }
- return context_->Wait();
+template <class R>
+class ServerReader final : public ReaderInterface<R> {
+ public:
+ explicit ServerReader(Call* call) : call_(call) {}
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
private:
- StreamContextInterface* const context_;
+ Call* call_;
};
-// Client-side interface for bi-directional streaming.
+template <class W>
+class ServerWriter final : public WriterInterface<W> {
+ public:
+ explicit ServerWriter(Call* call) : call_(call) {}
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ Call* call_;
+};
+
+// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ClientReaderWriter : public ClientStreamingInterface,
- public WriterInterface<W>,
+class ServerReaderWriter final : public WriterInterface<W>,
public ReaderInterface<R> {
public:
- // Blocking create a stream.
- explicit ClientReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ explicit ServerReaderWriter(Call* call) : call_(call) {}
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
- ~ClientReaderWriter() { delete context_; }
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ CompletionQueue* cq_;
+ Call* call_;
+};
+
+// Async interfaces
+// Common interface for all client side streaming.
+class ClientAsyncStreamingInterface {
+ public:
+ virtual ~ClientAsyncStreamingInterface() {}
+
+ virtual void Finish(Status* status, void* tag) = 0;
+};
+
+// An interface that yields a sequence of R messages.
+template <class R>
+class AsyncReaderInterface {
+ public:
+ virtual ~AsyncReaderInterface() {}
+
+ virtual void Read(R* msg, void* tag) = 0;
+};
- virtual bool Read(R* msg) { return context_->Read(msg); }
+// An interface that can be fed a sequence of W messages.
+template <class W>
+class AsyncWriterInterface {
+ public:
+ virtual ~AsyncWriterInterface() {}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) = 0;
+};
+
+template <class R>
+class ClientAsyncReader final : public ClientAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
+ public:
+ // Blocking create a stream and write the first request out.
+ ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request, void* tag)
+ : call_(channel->CreateCall(method, context, &cq_)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendMessage(request);
+ init_buf_.AddClientSendClose();
+ call_.PerformOps(&init_buf_);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
+ virtual void Read(R *msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
- virtual void Cancel() { context_->Cancel(); }
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
- virtual const Status& Wait() { return context_->Wait(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+template <class W>
+class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
+ public WriterInterface<W> {
+ public:
+ // Blocking create a stream.
+ ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ google::protobuf::Message *response)
+ : response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ virtual void WritesDone(void* tag) override {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddRecvMessage(response_);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
private:
- StreamContextInterface* const context_;
+ google::protobuf::Message *const response_;
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
};
-template <class R>
-class ServerReader : public ReaderInterface<R> {
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ ClientAsyncReaderWriter(ChannelInterface *channel,
+ const RpcMethod &method, ClientContext *context)
+ : call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual void Read(R *msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ virtual void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void WritesDone(void* tag) override {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
private:
- StreamContextInterface* const context_; // not owned
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
};
+// TODO(yangg) Move out of stream.h
template <class W>
-class ServerWriter : public WriterInterface<W> {
+class ServerAsyncResponseWriter final {
public:
- explicit ServerWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Read(context_->request());
+ explicit ServerAsyncResponseWriter(Call* call) : call_(call) {}
+
+ virtual void Write(const W& msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ private:
+ Call* call_;
+};
+
+template <class R>
+class ServerAsyncReader : public AsyncReaderInterface<R> {
+ public:
+ explicit ServerAsyncReader(Call* call) : call_(call) {}
+
+ virtual void Read(R* msg, void* tag) {
+ // TODO
+ }
+
+ private:
+ Call* call_;
+};
+
+template <class W>
+class ServerAsyncWriter : public AsyncWriterInterface<W> {
+ public:
+ explicit ServerAsyncWriter(Call* call) : call_(call) {}
+
+ virtual void Write(const W& msg, void* tag) {
+ // TODO
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ServerReaderWriter : public WriterInterface<W>,
- public ReaderInterface<R> {
+class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- }
+ explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void Read(R* msg, void* tag) {
+ // TODO
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) {
+ // TODO
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
};
} // namespace grpc
diff --git a/include/grpc++/stream_context_interface.h b/include/grpc/support/cpu.h
index a84119800b..9025f7c21f 100644
--- a/include/grpc++/stream_context_interface.h
+++ b/include/grpc/support/cpu.h
@@ -31,34 +31,27 @@
*
*/
-#ifndef __GRPCPP_STREAM_CONTEXT_INTERFACE_H__
-#define __GRPCPP_STREAM_CONTEXT_INTERFACE_H__
+#ifndef __GRPC_INTERNAL_SUPPORT_CPU_H__
+#define __GRPC_INTERNAL_SUPPORT_CPU_H__
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
+#ifdef __cplusplus
+extern "C" {
+#endif
-namespace grpc {
-class Status;
+/* Interface providing CPU information for currently running system */
-// An interface to avoid dependency on internal implementation.
-class StreamContextInterface {
- public:
- virtual ~StreamContextInterface() {}
+/* Return the number of CPU cores on the current system. Will return 0 if
+ if information is not available. */
+unsigned gpr_cpu_num_cores(void);
- virtual void Start(bool buffered) = 0;
+/* Return the CPU on which the current thread is executing; N.B. This should
+ be considered advisory only - it is possible that the thread is switched
+ to a different CPU at any time. Returns a value in range
+ [0, gpr_cpu_num_cores() - 1] */
+unsigned gpr_cpu_current_cpu(void);
- virtual bool Read(google::protobuf::Message* msg) = 0;
- virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0;
- virtual const Status& Wait() = 0;
- virtual void Cancel() = 0;
+#ifdef __cplusplus
+} // extern "C"
+#endif
- virtual google::protobuf::Message* request() = 0;
- virtual google::protobuf::Message* response() = 0;
-};
-
-} // namespace grpc
-
-#endif // __GRPCPP_STREAM_CONTEXT_INTERFACE_H__
+#endif /* __GRPC_INTERNAL_SUPPORT_CPU_H__ */