diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-11 21:08:49 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-11 21:08:49 -0800 |
commit | 0156752a66e5e8f00e7e49fd1aae35a6b8157cca (patch) | |
tree | b9ba6857ea36976c8c47407ef79dec03a76a679e | |
parent | 504bd331aba5817c2753c4f447f40cc83fa4d907 (diff) |
Some streaming progress
-rw-r--r-- | include/grpc++/client_context.h | 12 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_service_method.h | 6 | ||||
-rw-r--r-- | include/grpc++/server_context.h | 17 | ||||
-rw-r--r-- | include/grpc++/stream.h | 54 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 4 | ||||
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 5 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 17 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 4 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 8 |
10 files changed, 91 insertions, 39 deletions
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 91293d1123..0a81f6a366 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -53,9 +53,9 @@ class CallOpBuffer; template <class R> class ClientReader; template <class W> class ClientWriter; template <class R, class W> class ClientReaderWriter; -template <class R> class ServerReader; -template <class W> class ServerWriter; -template <class R, class W> class ServerReaderWriter; +template <class R> class ClientAsyncReader; +template <class W> class ClientAsyncWriter; +template <class R, class W> class ClientAsyncReaderWriter; class ClientContext { public: @@ -80,9 +80,9 @@ class ClientContext { template <class R> friend class ::grpc::ClientReader; template <class W> friend class ::grpc::ClientWriter; template <class R, class W> friend class ::grpc::ClientReaderWriter; - template <class R> friend class ::grpc::ServerReader; - template <class W> friend class ::grpc::ServerWriter; - template <class R, class W> friend class ::grpc::ServerReaderWriter; + template <class R> friend class ::grpc::ClientAsyncReader; + template <class W> friend class ::grpc::ClientAsyncWriter; + template <class R, class W> friend class ::grpc::ClientAsyncReaderWriter; grpc_call *call() { return call_; } void set_call(grpc_call *call) { diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index f4b07625be..a1ef9268f0 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -67,7 +67,7 @@ class CallOpBuffer final : public CompletionQueueTag { void AddRecvInitialMetadata( std::multimap<grpc::string, grpc::string> *metadata); void AddSendMessage(const google::protobuf::Message &message); - void AddRecvMessage(google::protobuf::Message *message); + void AddRecvMessage(google::protobuf::Message *message, bool* got_message); void AddClientSendClose(); void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata, Status *status); @@ -97,6 +97,7 @@ class CallOpBuffer final : public CompletionQueueTag { grpc_byte_buffer* send_message_buf_ = nullptr; // Recv message google::protobuf::Message* recv_message_ = nullptr; + bool* got_message_ = nullptr; grpc_byte_buffer* recv_message_buf_ = nullptr; // Client send close bool client_send_close_ = false; diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 0fb4f79b59..c201676065 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -107,7 +107,7 @@ class ClientStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerReader<RequestType> reader(param.call); + ServerReader<RequestType> reader(param.call, param.server_context); return func_(service_, param.server_context, &reader, dynamic_cast<ResponseType*>(param.response)); } @@ -129,7 +129,7 @@ class ServerStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerWriter<ResponseType> writer(param.call); + ServerWriter<ResponseType> writer(param.call, param.server_context); return func_(service_, param.server_context, dynamic_cast<const RequestType*>(param.request), &writer); } @@ -152,7 +152,7 @@ class BidiStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerReaderWriter<ResponseType, RequestType> stream(param.call); + ServerReaderWriter<ResponseType, RequestType> stream(param.call, param.server_context); return func_(service_, param.server_context, &stream); } diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 6fa01f5f19..a30f6cb51d 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -44,6 +44,14 @@ struct gpr_timespec; namespace grpc { +template <class R> class ServerAsyncReader; +template <class W> class ServerAsyncWriter; +template <class R, class W> class ServerAsyncReaderWriter; +template <class R> class ServerReader; +template <class W> class ServerWriter; +template <class R, class W> class ServerReaderWriter; + +class CallOpBuffer; class Server; // Interface of server side rpc context. @@ -58,8 +66,17 @@ class ServerContext { private: friend class ::grpc::Server; + template <class R> friend class ::grpc::ServerAsyncReader; + template <class W> friend class ::grpc::ServerAsyncWriter; + template <class R, class W> friend class ::grpc::ServerAsyncReaderWriter; + template <class R> friend class ::grpc::ServerReader; + template <class W> friend class ::grpc::ServerWriter; + template <class R, class W> friend class ::grpc::ServerReaderWriter; + ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count); + void SendInitialMetadataIfNeeded(CallOpBuffer *buf); + const std::chrono::system_clock::time_point deadline_; bool sent_initial_metadata_ = false; std::multimap<grpc::string, grpc::string> client_metadata_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 2791468490..4a804c7c90 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -37,6 +37,7 @@ #include <grpc++/channel_interface.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> +#include <grpc++/server_context.h> #include <grpc++/impl/call.h> #include <grpc++/status.h> #include <grpc/support/log.h> @@ -98,9 +99,10 @@ class ClientReader final : public ClientStreamingInterface, virtual bool Read(R *msg) override { CallOpBuffer buf; - buf.AddRecvMessage(msg); + bool got_message; + buf.AddRecvMessage(msg, &got_message); call_.PerformOps(&buf); - return cq_.Pluck(&buf); + return cq_.Pluck(&buf) && got_message; } virtual Status Finish() override { @@ -127,7 +129,12 @@ class ClientWriter final : public ClientStreamingInterface, ClientContext *context, google::protobuf::Message *response) : context_(context), response_(response), - call_(channel->CreateCall(method, context, &cq_)) {} + call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&buf); + cq_.Pluck(&buf); + } virtual bool Write(const W& msg) override { CallOpBuffer buf; @@ -147,10 +154,11 @@ class ClientWriter final : public ClientStreamingInterface, virtual Status Finish() override { CallOpBuffer buf; Status status; - buf.AddRecvMessage(response_); + bool got_message; + buf.AddRecvMessage(response_, &got_message); buf.AddClientRecvStatus(&context_->trailing_metadata_, &status); call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + GPR_ASSERT(cq_.Pluck(&buf) && got_message); return status; } @@ -174,9 +182,10 @@ class ClientReaderWriter final : public ClientStreamingInterface, virtual bool Read(R *msg) override { CallOpBuffer buf; - buf.AddRecvMessage(msg); + bool got_message; + buf.AddRecvMessage(msg, &got_message); call_.PerformOps(&buf); - return cq_.Pluck(&buf); + return cq_.Pluck(&buf) && got_message; } virtual bool Write(const W& msg) override { @@ -211,33 +220,37 @@ class ClientReaderWriter final : public ClientStreamingInterface, template <class R> class ServerReader final : public ReaderInterface<R> { public: - explicit ServerReader(Call* call) : call_(call) {} + explicit ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} virtual bool Read(R* msg) override { CallOpBuffer buf; - buf.AddRecvMessage(msg); + bool got_message; + buf.AddRecvMessage(msg, &got_message); call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + return call_->cq()->Pluck(&buf) && got_message; } private: - Call* call_; + Call* const call_; + ServerContext* const ctx_; }; template <class W> class ServerWriter final : public WriterInterface<W> { public: - explicit ServerWriter(Call* call) : call_(call) {} + explicit ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} virtual bool Write(const W& msg) override { CallOpBuffer buf; + ctx_->SendInitialMetadataIfNeeded(&buf); buf.AddSendMessage(msg); call_->PerformOps(&buf); return call_->cq()->Pluck(&buf); } private: - Call* call_; + Call* const call_; + ServerContext* const ctx_; }; // Server-side interface for bi-directional streaming. @@ -245,25 +258,27 @@ template <class W, class R> class ServerReaderWriter final : public WriterInterface<W>, public ReaderInterface<R> { public: - explicit ServerReaderWriter(Call* call) : call_(call) {} + explicit ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} virtual bool Read(R* msg) override { CallOpBuffer buf; - buf.AddRecvMessage(msg); + bool got_message; + buf.AddRecvMessage(msg, &got_message); call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + return call_->cq()->Pluck(&buf) && got_message; } virtual bool Write(const W& msg) override { CallOpBuffer buf; + ctx_->SendInitialMetadataIfNeeded(&buf); buf.AddSendMessage(msg); call_->PerformOps(&buf); return call_->cq()->Pluck(&buf); } private: - CompletionQueue* cq_; - Call* call_; + Call* const call_; + ServerContext* const ctx_; }; // Async interfaces @@ -353,13 +368,14 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface, virtual void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); - finish_buf_.AddRecvMessage(response_); + finish_buf_.AddRecvMessage(response_, &got_message_); finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata call_.PerformOps(&finish_buf_); } private: google::protobuf::Message *const response_; + bool got_message_; CompletionQueue cq_; Call call_; CallOpBuffer write_buf_; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 7d78bfa0cc..65d11f16c7 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1015,6 +1015,8 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, int had_outgoing; char buffer[GPR_LTOA_MIN_BUFSIZE]; + gpr_log(GPR_DEBUG, "cancel %d", id); + if (s) { /* clear out any unreported input & output: nobody cares anymore */ had_outgoing = s->outgoing_sopb.nops != 0; @@ -1185,6 +1187,8 @@ static void on_header(void *tp, grpc_mdelem *md) { transport *t = tp; stream *s = t->incoming_stream; + gpr_log(GPR_DEBUG, "on_header: %d %s %s", s->id, grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); + GPR_ASSERT(s); stream_list_join(t, s, PENDING_CALLBACKS); if (md->key == t->str_grpc_timeout) { diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 30bf2d0fc2..bc0e83733a 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -51,11 +51,12 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, Status status; buf.AddSendInitialMetadata(context); buf.AddSendMessage(request); - buf.AddRecvMessage(result); + bool got_message; + buf.AddRecvMessage(result, &got_message); buf.AddClientSendClose(); buf.AddClientRecvStatus(nullptr, &status); // TODO metadata call.PerformOps(&buf); - GPR_ASSERT(cq.Pluck(&buf)); + GPR_ASSERT(cq.Pluck(&buf) && (got_message || !status.IsOk())); return status; } diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index d90ef0311e..a20d4a0d9a 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -58,6 +58,7 @@ void CallOpBuffer::Reset(void* next_return_tag) { } recv_message_ = nullptr; + got_message_ = nullptr; if (recv_message_buf_) { grpc_byte_buffer_destroy(recv_message_buf_); recv_message_buf_ = nullptr; @@ -128,8 +129,9 @@ void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { send_message_ = &message; } -void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message) { +void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message, bool* got_message) { recv_message_ = message; + got_message_ = got_message; } void CallOpBuffer::AddClientSendClose() { @@ -239,10 +241,15 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) { FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); } // Parse received message if any. - if (recv_message_ && recv_message_buf_) { - *status = DeserializeProto(recv_message_buf_, recv_message_); - grpc_byte_buffer_destroy(recv_message_buf_); - recv_message_buf_ = nullptr; + if (recv_message_) { + if (recv_message_buf_) { + *got_message_ = true; + *status = DeserializeProto(recv_message_buf_, recv_message_); + grpc_byte_buffer_destroy(recv_message_buf_); + recv_message_buf_ = nullptr; + } else { + *got_message_ = false; + } } // Parse received status. if (recv_status_) { diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index be3d225630..17b0543bcd 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -177,9 +177,7 @@ class Server::MethodRequestData final : public CompletionQueueTag { auto status = method_->handler()->RunHandler( MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); CallOpBuffer buf; - if (!ctx_.sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_.initial_metadata_); - } + ctx_.SendInitialMetadataIfNeeded(&buf); if (has_response_payload_) { buf.AddSendMessage(*res); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 2bf4104d76..06eeb39297 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -32,6 +32,7 @@ */ #include <grpc++/server_context.h> +#include <grpc++/impl/call.h> #include <grpc/grpc.h> #include "src/cpp/util/time.h" @@ -48,4 +49,11 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata *metadata, } } +void ServerContext::SendInitialMetadataIfNeeded(CallOpBuffer* buf) { + if (!sent_initial_metadata_) { + buf->AddSendInitialMetadata(&initial_metadata_); + sent_initial_metadata_ = true; + } +} + } // namespace grpc |