aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-11 21:08:49 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-11 21:08:49 -0800
commit0156752a66e5e8f00e7e49fd1aae35a6b8157cca (patch)
treeb9ba6857ea36976c8c47407ef79dec03a76a679e /include
parent504bd331aba5817c2753c4f447f40cc83fa4d907 (diff)
Some streaming progress
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/client_context.h12
-rw-r--r--include/grpc++/impl/call.h3
-rw-r--r--include/grpc++/impl/rpc_service_method.h6
-rw-r--r--include/grpc++/server_context.h17
-rw-r--r--include/grpc++/stream.h54
5 files changed, 63 insertions, 29 deletions
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 91293d1123..0a81f6a366 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -53,9 +53,9 @@ class CallOpBuffer;
template <class R> class ClientReader;
template <class W> class ClientWriter;
template <class R, class W> class ClientReaderWriter;
-template <class R> class ServerReader;
-template <class W> class ServerWriter;
-template <class R, class W> class ServerReaderWriter;
+template <class R> class ClientAsyncReader;
+template <class W> class ClientAsyncWriter;
+template <class R, class W> class ClientAsyncReaderWriter;
class ClientContext {
public:
@@ -80,9 +80,9 @@ class ClientContext {
template <class R> friend class ::grpc::ClientReader;
template <class W> friend class ::grpc::ClientWriter;
template <class R, class W> friend class ::grpc::ClientReaderWriter;
- template <class R> friend class ::grpc::ServerReader;
- template <class W> friend class ::grpc::ServerWriter;
- template <class R, class W> friend class ::grpc::ServerReaderWriter;
+ template <class R> friend class ::grpc::ClientAsyncReader;
+ template <class W> friend class ::grpc::ClientAsyncWriter;
+ template <class R, class W> friend class ::grpc::ClientAsyncReaderWriter;
grpc_call *call() { return call_; }
void set_call(grpc_call *call) {
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index f4b07625be..a1ef9268f0 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -67,7 +67,7 @@ class CallOpBuffer final : public CompletionQueueTag {
void AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendMessage(const google::protobuf::Message &message);
- void AddRecvMessage(google::protobuf::Message *message);
+ void AddRecvMessage(google::protobuf::Message *message, bool* got_message);
void AddClientSendClose();
void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata,
Status *status);
@@ -97,6 +97,7 @@ class CallOpBuffer final : public CompletionQueueTag {
grpc_byte_buffer* send_message_buf_ = nullptr;
// Recv message
google::protobuf::Message* recv_message_ = nullptr;
+ bool* got_message_ = nullptr;
grpc_byte_buffer* recv_message_buf_ = nullptr;
// Client send close
bool client_send_close_ = false;
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 0fb4f79b59..c201676065 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -107,7 +107,7 @@ class ClientStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerReader<RequestType> reader(param.call);
+ ServerReader<RequestType> reader(param.call, param.server_context);
return func_(service_, param.server_context, &reader,
dynamic_cast<ResponseType*>(param.response));
}
@@ -129,7 +129,7 @@ class ServerStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerWriter<ResponseType> writer(param.call);
+ ServerWriter<ResponseType> writer(param.call, param.server_context);
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request), &writer);
}
@@ -152,7 +152,7 @@ class BidiStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerReaderWriter<ResponseType, RequestType> stream(param.call);
+ ServerReaderWriter<ResponseType, RequestType> stream(param.call, param.server_context);
return func_(service_, param.server_context, &stream);
}
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 6fa01f5f19..a30f6cb51d 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -44,6 +44,14 @@ struct gpr_timespec;
namespace grpc {
+template <class R> class ServerAsyncReader;
+template <class W> class ServerAsyncWriter;
+template <class R, class W> class ServerAsyncReaderWriter;
+template <class R> class ServerReader;
+template <class W> class ServerWriter;
+template <class R, class W> class ServerReaderWriter;
+
+class CallOpBuffer;
class Server;
// Interface of server side rpc context.
@@ -58,8 +66,17 @@ class ServerContext {
private:
friend class ::grpc::Server;
+ template <class R> friend class ::grpc::ServerAsyncReader;
+ template <class W> friend class ::grpc::ServerAsyncWriter;
+ template <class R, class W> friend class ::grpc::ServerAsyncReaderWriter;
+ template <class R> friend class ::grpc::ServerReader;
+ template <class W> friend class ::grpc::ServerWriter;
+ template <class R, class W> friend class ::grpc::ServerReaderWriter;
+
ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count);
+ void SendInitialMetadataIfNeeded(CallOpBuffer *buf);
+
const std::chrono::system_clock::time_point deadline_;
bool sent_initial_metadata_ = false;
std::multimap<grpc::string, grpc::string> client_metadata_;
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 2791468490..4a804c7c90 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -37,6 +37,7 @@
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
+#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
@@ -98,9 +99,10 @@ class ClientReader final : public ClientStreamingInterface,
virtual bool Read(R *msg) override {
CallOpBuffer buf;
- buf.AddRecvMessage(msg);
+ bool got_message;
+ buf.AddRecvMessage(msg, &got_message);
call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ return cq_.Pluck(&buf) && got_message;
}
virtual Status Finish() override {
@@ -127,7 +129,12 @@ class ClientWriter final : public ClientStreamingInterface,
ClientContext *context,
google::protobuf::Message *response)
: context_(context), response_(response),
- call_(channel->CreateCall(method, context, &cq_)) {}
+ call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
+ }
virtual bool Write(const W& msg) override {
CallOpBuffer buf;
@@ -147,10 +154,11 @@ class ClientWriter final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
- buf.AddRecvMessage(response_);
+ bool got_message;
+ buf.AddRecvMessage(response_, &got_message);
buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ GPR_ASSERT(cq_.Pluck(&buf) && got_message);
return status;
}
@@ -174,9 +182,10 @@ class ClientReaderWriter final : public ClientStreamingInterface,
virtual bool Read(R *msg) override {
CallOpBuffer buf;
- buf.AddRecvMessage(msg);
+ bool got_message;
+ buf.AddRecvMessage(msg, &got_message);
call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ return cq_.Pluck(&buf) && got_message;
}
virtual bool Write(const W& msg) override {
@@ -211,33 +220,37 @@ class ClientReaderWriter final : public ClientStreamingInterface,
template <class R>
class ServerReader final : public ReaderInterface<R> {
public:
- explicit ServerReader(Call* call) : call_(call) {}
+ explicit ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
virtual bool Read(R* msg) override {
CallOpBuffer buf;
- buf.AddRecvMessage(msg);
+ bool got_message;
+ buf.AddRecvMessage(msg, &got_message);
call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf);
+ return call_->cq()->Pluck(&buf) && got_message;
}
private:
- Call* call_;
+ Call* const call_;
+ ServerContext* const ctx_;
};
template <class W>
class ServerWriter final : public WriterInterface<W> {
public:
- explicit ServerWriter(Call* call) : call_(call) {}
+ explicit ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
virtual bool Write(const W& msg) override {
CallOpBuffer buf;
+ ctx_->SendInitialMetadataIfNeeded(&buf);
buf.AddSendMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf);
}
private:
- Call* call_;
+ Call* const call_;
+ ServerContext* const ctx_;
};
// Server-side interface for bi-directional streaming.
@@ -245,25 +258,27 @@ template <class W, class R>
class ServerReaderWriter final : public WriterInterface<W>,
public ReaderInterface<R> {
public:
- explicit ServerReaderWriter(Call* call) : call_(call) {}
+ explicit ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
virtual bool Read(R* msg) override {
CallOpBuffer buf;
- buf.AddRecvMessage(msg);
+ bool got_message;
+ buf.AddRecvMessage(msg, &got_message);
call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf);
+ return call_->cq()->Pluck(&buf) && got_message;
}
virtual bool Write(const W& msg) override {
CallOpBuffer buf;
+ ctx_->SendInitialMetadataIfNeeded(&buf);
buf.AddSendMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf);
}
private:
- CompletionQueue* cq_;
- Call* call_;
+ Call* const call_;
+ ServerContext* const ctx_;
};
// Async interfaces
@@ -353,13 +368,14 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
virtual void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
- finish_buf_.AddRecvMessage(response_);
+ finish_buf_.AddRecvMessage(response_, &got_message_);
finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata
call_.PerformOps(&finish_buf_);
}
private:
google::protobuf::Message *const response_;
+ bool got_message_;
CompletionQueue cq_;
Call call_;
CallOpBuffer write_buf_;