diff options
-rw-r--r-- | include/grpc++/async_unary_call.h | 47 | ||||
-rw-r--r-- | include/grpc++/completion_queue.h | 9 | ||||
-rw-r--r-- | include/grpc++/config.h | 45 | ||||
-rw-r--r-- | include/grpc++/config_protobuf.h | 75 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 132 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 27 | ||||
-rw-r--r-- | include/grpc++/impl/serialization_traits.h (renamed from src/cpp/client/client_unary_call.cc) | 37 | ||||
-rw-r--r-- | include/grpc++/impl/service_type.h | 36 | ||||
-rw-r--r-- | include/grpc++/server.h | 18 | ||||
-rw-r--r-- | include/grpc++/stream.h | 438 | ||||
-rw-r--r-- | src/compiler/config.h | 1 |
11 files changed, 521 insertions, 344 deletions
diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h index 786f8c7184..a296102c3a 100644 --- a/include/grpc++/async_unary_call.h +++ b/include/grpc++/async_unary_call.h @@ -58,40 +58,41 @@ template <class R> class ClientAsyncResponseReader GRPC_FINAL : public ClientAsyncResponseReaderInterface<R> { public: + template <class W> ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request) + const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - init_buf_.AddSendMessage(request); - init_buf_.AddClientSendClose(); + init_buf_.SendInitialMetadata(context->send_initial_metadata_); + init_buf_.SendMessage(request); + init_buf_.ClientSendClose(); call_.PerformOps(&init_buf_); } void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); + meta_buf_.SetOutputTag(tag); + meta_buf_.RecvInitialMetadata(context_); call_.PerformOps(&meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - finish_buf_.Reset(tag); + finish_buf_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { finish_buf_.AddRecvInitialMetadata(context_); } - finish_buf_.AddRecvMessage(msg); - finish_buf_.AddClientRecvStatus(context_, status); + finish_buf_.RecvMessage(msg); + finish_buf_.ClientRecvStatus(context_, status); call_.PerformOps(&finish_buf_); } private: ClientContext* context_; Call call_; - SneakyCallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer finish_buf_; + SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_buf_; + CallOpSet<CallOpRecvInitialMetadata> meta_buf_; + CallOpSet<CallOpRecvMessage<R>, CallOpClientRecvStatus> finish_buf_; }; template <class W> @@ -104,34 +105,34 @@ class ServerAsyncResponseWriter GRPC_FINAL void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_buf_.SetOutputTag(tag); + meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_buf_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.IsOk()) { - finish_buf_.AddSendMessage(msg); + finish_buf_.SendMessage(msg); } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.IsOk()); - finish_buf_.Reset(tag); + finish_buf_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -140,8 +141,8 @@ class ServerAsyncResponseWriter GRPC_FINAL Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> finish_buf_; }; } // namespace grpc diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index e8429c8f41..b45b5e2e71 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -35,7 +35,6 @@ #define GRPCXX_COMPLETION_QUEUE_H #include <grpc/support/time.h> -#include <grpc++/impl/client_unary_call.h> #include <grpc++/impl/grpc_library.h> #include <grpc++/time.h> @@ -56,7 +55,10 @@ class ServerWriter; template <class R, class W> class ServerReaderWriter; +class ChannelInterface; +class ClientContext; class CompletionQueue; +class RpcMethod; class Server; class ServerBuilder; class ServerContext; @@ -120,11 +122,12 @@ class CompletionQueue : public GrpcLibrary { friend class ::grpc::ServerReaderWriter; friend class ::grpc::Server; friend class ::grpc::ServerContext; + template <class InputMessage, class OutputMessage> friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result); + const InputMessage& request, + OutputMessage* result); NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); diff --git a/include/grpc++/config.h b/include/grpc++/config.h index 55b2a64482..af248b66e3 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/config.h @@ -77,33 +77,6 @@ #define GRPC_OVERRIDE override #endif -#ifndef GRPC_CUSTOM_PROTOBUF_INT64 -#include <google/protobuf/stubs/common.h> -#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 -#endif - -#ifndef GRPC_CUSTOM_MESSAGE -#include <google/protobuf/message.h> -#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message -#endif - -#ifndef GRPC_CUSTOM_STRING -#include <string> -#define GRPC_CUSTOM_STRING std::string -#endif - -#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream.h> -#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \ - ::google::protobuf::io::ZeroCopyOutputStream -#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ - ::google::protobuf::io::ZeroCopyInputStream -#define GRPC_CUSTOM_CODEDINPUTSTREAM \ - ::google::protobuf::io::CodedInputStream -#endif - - #ifdef GRPC_CXX0X_NO_NULLPTR #include <memory> const class { @@ -121,23 +94,15 @@ private: } nullptr = {}; #endif +#ifndef GRPC_CUSTOM_STRING +#include <string> +#define GRPC_CUSTOM_STRING std::string +#endif + namespace grpc { typedef GRPC_CUSTOM_STRING string; -namespace protobuf { - -typedef GRPC_CUSTOM_MESSAGE Message; -typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; - -namespace io { -typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream; -typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream; -typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; -} // namespace io - -} // namespace protobuf - } // namespace grpc #endif // GRPCXX_CONFIG_H diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/config_protobuf.h new file mode 100644 index 0000000000..5ef1be1aa9 --- /dev/null +++ b/include/grpc++/config_protobuf.h @@ -0,0 +1,75 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_CONFIG_PROTOBUF_H +#define GRPCXX_CONFIG_PROTOBUF_H + +#include <grpc++/impl/serialization_traits.h> + +#ifndef GRPC_CUSTOM_PROTOBUF_INT64 +#include <google/protobuf/stubs/common.h> +#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 +#endif + +#ifndef GRPC_CUSTOM_MESSAGE +#include <google/protobuf/message.h> +#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message +#endif + +#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream.h> +#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \ + ::google::protobuf::io::ZeroCopyOutputStream +#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ + ::google::protobuf::io::ZeroCopyInputStream +#define GRPC_CUSTOM_CODEDINPUTSTREAM \ + ::google::protobuf::io::CodedInputStream +#endif + +namespace grpc { +namespace protobuf { + +typedef GRPC_CUSTOM_MESSAGE Message; +typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; + +namespace io { +typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream; +typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream; +typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; +} // namespace io + +} // namespace protobuf +} // namespace grpc + +#endif // GRPCXX_CONFIG_PROTOBUF_H diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index aae199db1b..f8b290a851 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -50,6 +50,128 @@ namespace grpc { class ByteBuffer; class Call; +class CallNoOp { + protected: + void AddOp(grpc_op* ops, size_t* nops) {} + void FinishOp(void* tag, bool* status) {} +}; + +class CallOpSendInitialMetadata { + public: + void SendInitialMetadata(const std::multimap<grpc::string, grpc::string>& metadata); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpSendMessage { + public: + template <class M> + void SendMessage(const M& message); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +template <class M> +class CallOpRecvMessage { + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpGenericRecvMessage { + public: + template <class R> + void RecvMessage(R* message); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpClientSendClose { + public: + void ClientSendClose(); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpServerSendStatus { + public: + void ServerSendStatus(const std::multimap<grpc::string, grpc::string>& trailing_metadata, const Status& status); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpRecvInitialMetadata { + public: + void RecvInitialMetadata(ClientContext* context); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpClientRecvStatus { + public: + void ClientRecvStatus(ClientContext* context, Status* status); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpSetInterface : public CompletionQueueTag { + public: + virtual void FillOps(grpc_op* ops, size_t* nops) = 0; +}; + +template <class T, int I> +class WrapAndDerive : public T {}; + +template <class Op1 = CallNoOp, class Op2 = CallNoOp, class Op3 = CallNoOp, class Op4 = CallNoOp, class Op5 = CallNoOp, class Op6 = CallNoOp> +class CallOpSet : public CallOpSetInterface, +public WrapAndDerive<Op1, 1>, +public WrapAndDerive<Op2, 2>, +public WrapAndDerive<Op3, 3>, +public WrapAndDerive<Op4, 4>, +public WrapAndDerive<Op5, 5>, +public WrapAndDerive<Op6, 6> { + public: + void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE { + this->Op1::AddOp(ops, nops); + this->Op2::AddOp(ops, nops); + this->Op3::AddOp(ops, nops); + this->Op4::AddOp(ops, nops); + this->Op5::AddOp(ops, nops); + this->Op6::AddOp(ops, nops); + } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + this->Op1::FinishOp(*tag, status); + this->Op2::FinishOp(*tag, status); + this->Op3::FinishOp(*tag, status); + this->Op4::FinishOp(*tag, status); + this->Op5::FinishOp(*tag, status); + this->Op6::FinishOp(*tag, status); + *tag = return_tag_; + return true; + } + + void SetOutputTag(void* return_tag) { return_tag_ = return_tag; } + + private: + void *return_tag_; +}; + +#if 0 class CallOpBuffer : public CompletionQueueTag { public: CallOpBuffer(); @@ -122,12 +244,14 @@ class CallOpBuffer : public CompletionQueueTag { int cancelled_buf_; bool* recv_closed_; }; +#endif // SneakyCallOpBuffer does not post completions to the completion queue -class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer { +template <class Op1 = CallNoOp, class Op2 = CallNoOp, class Op3 = CallNoOp, class Op4 = CallNoOp, class Op5 = CallNoOp, class Op6 = CallNoOp> +class SneakyCallOpSet GRPC_FINAL : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { public: bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - return CallOpBuffer::FinalizeResult(tag, status) && false; + return CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6>::FinalizeResult(tag, status) && false; } }; @@ -135,7 +259,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer { class CallHook { public: virtual ~CallHook() {} - virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; // Straightforward wrapping of the C call object @@ -146,7 +270,7 @@ class Call GRPC_FINAL { Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq, int max_message_size); - void PerformOps(CallOpBuffer* buffer); + void PerformOps(CallOpSetInterface* ops); grpc_call* call() { return call_; } CompletionQueue* cq() { return cq_; } diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 0e8aeed781..561c4721ef 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -36,6 +36,8 @@ #include <grpc++/config.h> +#include <grpc++/impl/call.h> + namespace grpc { class ChannelInterface; @@ -45,10 +47,31 @@ class RpcMethod; class Status; // Wrapper that performs a blocking unary call +template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result); + const InputMessage& request, + OutputMessage* result) { + CompletionQueue cq; + Call call(channel->CreateCall(method, context, &cq)); + CallOpSet< + CallOpSendInitialMetadata, + CallOpSendMessage, + CallOpRecvInitialMetadata, + CallOpRecvMessage<OutputMessage>, + CallOpClientSendClose, + CallOpClientRecvStatus> ops; + Status status; + ops.AddSendInitialMetadata(context); + ops.AddSendMessage(request); + ops.AddRecvInitialMetadata(context); + ops.AddRecvMessage(result); + ops.AddClientSendClose(); + ops.AddClientRecvStatus(context, &status); + call.PerformOps(&ops); + GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk()); + return status; +} } // namespace grpc diff --git a/src/cpp/client/client_unary_call.cc b/include/grpc++/impl/serialization_traits.h index 7e7ea78bcd..d21ad92475 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/include/grpc++/impl/serialization_traits.h @@ -31,34 +31,19 @@ * */ -#include <grpc++/impl/client_unary_call.h> -#include <grpc++/impl/call.h> -#include <grpc++/channel_interface.h> -#include <grpc++/client_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/status.h> -#include <grpc/support/log.h> +#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H +#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H + +struct grpc_byte_buffer; namespace grpc { -// Wrapper that performs a blocking unary call -Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result) { - CompletionQueue cq; - Call call(channel->CreateCall(method, context, &cq)); - CallOpBuffer buf; - Status status; - buf.AddSendInitialMetadata(context); - buf.AddSendMessage(request); - buf.AddRecvInitialMetadata(context); - buf.AddRecvMessage(result); - buf.AddClientSendClose(); - buf.AddClientRecvStatus(context, &status); - call.PerformOps(&buf); - GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk()); - return status; -} +template <class Message> +class SerializationTraits; + +typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest); +typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst); } // namespace grpc + +#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index bc39bb82ac..af21d9b8cf 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -35,6 +35,8 @@ #define GRPCXX_IMPL_SERVICE_TYPE_H #include <grpc++/config.h> +#include <grpc++/impl/serialization_traits.h> +#include <grpc++/server.h> namespace grpc { @@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface { class AsynchronousService { public: - // this is Server, but in disguise to avoid a link dependency - class DispatchImpl { - public: - virtual void RequestAsyncCall(void* registered_method, - ServerContext* context, - ::grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) = 0; - }; - AsynchronousService(const char** method_names, size_t method_count) - : dispatch_impl_(nullptr), + : server_(nullptr), method_names_(method_names), method_count_(method_count), request_args_(nullptr) {} @@ -86,42 +76,44 @@ class AsynchronousService { ~AsynchronousService() { delete[] request_args_; } protected: + template <class Message> void RequestAsyncUnary(int index, ServerContext* context, - grpc::protobuf::Message* request, + Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, + stream, call_cq, notification_cq, tag, request); } void RequestClientStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, notification_cq, tag); } + template <class Message> void RequestServerStreaming(int index, ServerContext* context, - grpc::protobuf::Message* request, + Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, + stream, call_cq, notification_cq, tag, request); } void RequestBidiStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, notification_cq, tag); } private: friend class Server; - DispatchImpl* dispatch_impl_; + Server* server_; const char** const method_names_; size_t method_count_; void** request_args_; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 2cfeb359fc..e0599ee768 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -53,13 +53,13 @@ class GenericServerContext; class AsyncGenericService; class RpcService; class RpcServiceMethod; +class ServerAsyncStreamingInterface; class ServerCredentials; class ThreadPoolInterface; // Currently it only supports handling rpcs in a single thread. class Server GRPC_FINAL : public GrpcLibrary, - private CallHook, - private AsynchronousService::DispatchImpl { + private CallHook { public: ~Server(); @@ -73,6 +73,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private: friend class AsyncGenericService; + friend class AsynchronousService; friend class ServerBuilder; class SyncRequest; @@ -96,15 +97,20 @@ class Server GRPC_FINAL : public GrpcLibrary, void RunRpc(); void ScheduleCallback(); - void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE; + void PerformOpsOnCall(CallOpSetInterface *ops, Call* call) GRPC_OVERRIDE; - // DispatchImpl + template <class Message> void RequestAsyncCall(void* registered_method, ServerContext* context, - grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, - void* tag) GRPC_OVERRIDE; + void* tag, Message *message); + + void RequestAsyncCall(void* registered_method, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag); void RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index c836f98c2a..32ba03f8d8 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -93,15 +93,16 @@ template <class R> class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { public: // Blocking create a stream and write the first request out. + template <class W> ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const grpc::protobuf::Message& request) + ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - buf.AddSendMessage(request); - buf.AddClientSendClose(); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + ops.SendMessage(request); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } // Blocking wait for initial metadata from server. The received metadata @@ -111,28 +112,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { void WaitForInitialMetadata() { GPR_ASSERT(!context_->initial_metadata_received_); - CallOpBuffer buf; - buf.AddRecvInitialMetadata(context_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); // status ignored + CallOpSet<CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; if (!context_->initial_metadata_received_) { - buf.AddRecvInitialMetadata(context_); + ops.RecvInitialMetadata(context_); } - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf) && buf.got_message; + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; } Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpClientRecvStatus> ops; Status status; - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } @@ -150,48 +151,48 @@ class ClientWriterInterface : public ClientStreamingInterface, }; template <class W> -class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> { +class ClientWriter : public ClientWriterInterface<W> { public: // Blocking create a stream. + template <class R> ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, grpc::protobuf::Message* response) + ClientContext* context, R* response) : context_(context), - response_(response), call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + finish_ops_.RecvMessage(response); + + CallOpSet<CallOpRecvMessage<R>> ops; + ops.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet<CallOpSendMessage> ops; + ops.SendMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet<CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } // Read the final response and wait for the final status. Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; Status status; - buf.AddRecvMessage(response_); - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); return status; } private: ClientContext* context_; - grpc::protobuf::Message* const response_; + CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; CompletionQueue cq_; Call call_; }; @@ -213,10 +214,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } // Blocking wait for initial metadata from server. The received metadata @@ -226,42 +227,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { void WaitForInitialMetadata() { GPR_ASSERT(!context_->initial_metadata_received_); - CallOpBuffer buf; - buf.AddRecvInitialMetadata(context_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); // status ignored + CallOpSet<CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; if (!context_->initial_metadata_received_) { - buf.AddRecvInitialMetadata(context_); + ops.RecvInitialMetadata(context_); } - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf) && buf.got_message; + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet<CallOpSendMessage> ops; + ops.SendMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet<CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpClientRecvStatus> ops; Status status; - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } @@ -279,18 +280,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> { void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf) && buf.got_message; + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; } private: @@ -306,22 +307,22 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; if (!ctx_->sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - buf.AddSendMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + ops.SendMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); } private: @@ -339,29 +340,29 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf) && buf.got_message; + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; if (!ctx_->sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - buf.AddSendMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + ops.SendMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); } private: @@ -407,50 +408,51 @@ template <class R> class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { public: // Create a stream and write the first request out. + template <class W> ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request, void* tag) + const W& request, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - init_buf_.AddSendMessage(request); - init_buf_.AddClientSendClose(); - call_.PerformOps(&init_buf_); + init_ops_.SetOutputTag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + init_ops_.SendMessage(request); + init_ops_.ClientSendClose(); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.SetOutputTag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); + read_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - read_buf_.AddRecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; template <class W> @@ -463,56 +465,56 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, template <class W> class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { public: + template <class R> ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - grpc::protobuf::Message* response, void* tag) + R* response, void* tag) : context_(context), - response_(response), call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&init_buf_); + finish_ops_.RecvMessage(response); + + init_ops_.SetOutputTag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.SetOutputTag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.SetOutputTag(tag); + write_ops_.SendMessage(msg); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_buf_.Reset(tag); - writes_done_buf_.AddClientSendClose(); - call_.PerformOps(&writes_done_buf_); + writes_done_ops_.SetOutputTag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddRecvMessage(response_); - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; - grpc::protobuf::Message* const response_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer write_buf_; - CallOpBuffer writes_done_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; }; // Client-side interface for bi-directional streaming. @@ -532,58 +534,58 @@ class ClientAsyncReaderWriter GRPC_FINAL const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&init_buf_); + init_ops_.SetOutputTag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.SetOutputTag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); + read_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - read_buf_.AddRecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.AddRecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.SetOutputTag(tag); + write_ops_.SendMessage(msg); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_buf_.Reset(tag); - writes_done_buf_.AddClientSendClose(); - call_.PerformOps(&writes_done_buf_); + writes_done_ops_.SetOutputTag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer write_buf_; - CallOpBuffer writes_done_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; template <class W, class R> @@ -596,41 +598,41 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.SetOutputTag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.SetOutputTag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.IsOk()) { - finish_buf_.AddSendMessage(msg); + finish_ops_.SendMessage(msg); } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.IsOk()); - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -638,9 +640,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> finish_ops_; }; template <class W> @@ -653,30 +655,30 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.SetOutputTag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); + write_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.SendMessage(msg); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -684,9 +686,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer write_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; // Server-side interface for bi-directional streaming. @@ -701,36 +703,36 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.SetOutputTag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.SetOutputTag(tag); + read_ops_.AddRecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); + write_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.SendMessage(msg); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -738,10 +740,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer write_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; } // namespace grpc diff --git a/src/compiler/config.h b/src/compiler/config.h index e81de8d6c8..06ccd8530c 100644 --- a/src/compiler/config.h +++ b/src/compiler/config.h @@ -35,6 +35,7 @@ #define SRC_COMPILER_CONFIG_H #include <grpc++/config.h> +#include <grpc++/config_protobuf.h> #ifndef GRPC_CUSTOM_DESCRIPTOR #include <google/protobuf/descriptor.h> |