aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-04 08:51:17 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-04 08:51:17 -0700
commit81fafa8971d3dc1a070dd77834dc120974a62418 (patch)
tree7ba6ac5c1ae0e17590779507020d31e7a7e3dbd6
parentda11694971f12ee973e84c9386494a161228e51a (diff)
Beginning the cleanup
-rw-r--r--include/grpc++/async_unary_call.h47
-rw-r--r--include/grpc++/completion_queue.h9
-rw-r--r--include/grpc++/config.h45
-rw-r--r--include/grpc++/config_protobuf.h75
-rw-r--r--include/grpc++/impl/call.h132
-rw-r--r--include/grpc++/impl/client_unary_call.h27
-rw-r--r--include/grpc++/impl/serialization_traits.h (renamed from src/cpp/client/client_unary_call.cc)37
-rw-r--r--include/grpc++/impl/service_type.h36
-rw-r--r--include/grpc++/server.h18
-rw-r--r--include/grpc++/stream.h438
-rw-r--r--src/compiler/config.h1
11 files changed, 521 insertions, 344 deletions
diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h
index 786f8c7184..a296102c3a 100644
--- a/include/grpc++/async_unary_call.h
+++ b/include/grpc++/async_unary_call.h
@@ -58,40 +58,41 @@ template <class R>
class ClientAsyncResponseReader GRPC_FINAL
: public ClientAsyncResponseReaderInterface<R> {
public:
+ template <class W>
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- const grpc::protobuf::Message& request)
+ const W& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- init_buf_.AddSendMessage(request);
- init_buf_.AddClientSendClose();
+ init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+ init_buf_.SendMessage(request);
+ init_buf_.ClientSendClose();
call_.PerformOps(&init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
+ meta_buf_.SetOutputTag(tag);
+ meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_buf_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
}
- finish_buf_.AddRecvMessage(msg);
- finish_buf_.AddClientRecvStatus(context_, status);
+ finish_buf_.RecvMessage(msg);
+ finish_buf_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
private:
ClientContext* context_;
Call call_;
- SneakyCallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer finish_buf_;
+ SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_buf_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+ CallOpSet<CallOpRecvMessage<R>, CallOpClientRecvStatus> finish_buf_;
};
template <class W>
@@ -104,34 +105,34 @@ class ServerAsyncResponseWriter GRPC_FINAL
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_buf_.SetOutputTag(tag);
+ meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_buf_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
- finish_buf_.AddSendMessage(msg);
+ finish_buf_.SendMessage(msg);
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
- finish_buf_.Reset(tag);
+ finish_buf_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@@ -140,8 +141,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> finish_buf_;
};
} // namespace grpc
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index e8429c8f41..b45b5e2e71 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -35,7 +35,6 @@
#define GRPCXX_COMPLETION_QUEUE_H
#include <grpc/support/time.h>
-#include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/time.h>
@@ -56,7 +55,10 @@ class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
+class ChannelInterface;
+class ClientContext;
class CompletionQueue;
+class RpcMethod;
class Server;
class ServerBuilder;
class ServerContext;
@@ -120,11 +122,12 @@ class CompletionQueue : public GrpcLibrary {
friend class ::grpc::ServerReaderWriter;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
+ template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result);
+ const InputMessage& request,
+ OutputMessage* result);
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
diff --git a/include/grpc++/config.h b/include/grpc++/config.h
index 55b2a64482..af248b66e3 100644
--- a/include/grpc++/config.h
+++ b/include/grpc++/config.h
@@ -77,33 +77,6 @@
#define GRPC_OVERRIDE override
#endif
-#ifndef GRPC_CUSTOM_PROTOBUF_INT64
-#include <google/protobuf/stubs/common.h>
-#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
-#endif
-
-#ifndef GRPC_CUSTOM_MESSAGE
-#include <google/protobuf/message.h>
-#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
-#endif
-
-#ifndef GRPC_CUSTOM_STRING
-#include <string>
-#define GRPC_CUSTOM_STRING std::string
-#endif
-
-#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream.h>
-#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
- ::google::protobuf::io::ZeroCopyOutputStream
-#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
- ::google::protobuf::io::ZeroCopyInputStream
-#define GRPC_CUSTOM_CODEDINPUTSTREAM \
- ::google::protobuf::io::CodedInputStream
-#endif
-
-
#ifdef GRPC_CXX0X_NO_NULLPTR
#include <memory>
const class {
@@ -121,23 +94,15 @@ private:
} nullptr = {};
#endif
+#ifndef GRPC_CUSTOM_STRING
+#include <string>
+#define GRPC_CUSTOM_STRING std::string
+#endif
+
namespace grpc {
typedef GRPC_CUSTOM_STRING string;
-namespace protobuf {
-
-typedef GRPC_CUSTOM_MESSAGE Message;
-typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
-
-namespace io {
-typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
-typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
-typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
-} // namespace io
-
-} // namespace protobuf
-
} // namespace grpc
#endif // GRPCXX_CONFIG_H
diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/config_protobuf.h
new file mode 100644
index 0000000000..5ef1be1aa9
--- /dev/null
+++ b/include/grpc++/config_protobuf.h
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_CONFIG_PROTOBUF_H
+#define GRPCXX_CONFIG_PROTOBUF_H
+
+#include <grpc++/impl/serialization_traits.h>
+
+#ifndef GRPC_CUSTOM_PROTOBUF_INT64
+#include <google/protobuf/stubs/common.h>
+#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
+#endif
+
+#ifndef GRPC_CUSTOM_MESSAGE
+#include <google/protobuf/message.h>
+#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
+#endif
+
+#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
+ ::google::protobuf::io::ZeroCopyOutputStream
+#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
+ ::google::protobuf::io::ZeroCopyInputStream
+#define GRPC_CUSTOM_CODEDINPUTSTREAM \
+ ::google::protobuf::io::CodedInputStream
+#endif
+
+namespace grpc {
+namespace protobuf {
+
+typedef GRPC_CUSTOM_MESSAGE Message;
+typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
+
+namespace io {
+typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
+typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
+typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
+} // namespace io
+
+} // namespace protobuf
+} // namespace grpc
+
+#endif // GRPCXX_CONFIG_PROTOBUF_H
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index aae199db1b..f8b290a851 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -50,6 +50,128 @@ namespace grpc {
class ByteBuffer;
class Call;
+class CallNoOp {
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {}
+ void FinishOp(void* tag, bool* status) {}
+};
+
+class CallOpSendInitialMetadata {
+ public:
+ void SendInitialMetadata(const std::multimap<grpc::string, grpc::string>& metadata);
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops);
+ void FinishOp(void* tag, bool* status);
+};
+
+class CallOpSendMessage {
+ public:
+ template <class M>
+ void SendMessage(const M& message);
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops);
+ void FinishOp(void* tag, bool* status);
+};
+
+template <class M>
+class CallOpRecvMessage {
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops);
+ void FinishOp(void* tag, bool* status);
+};
+
+class CallOpGenericRecvMessage {
+ public:
+ template <class R>
+ void RecvMessage(R* message);
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops);
+ void FinishOp(void* tag, bool* status);
+};
+
+class CallOpClientSendClose {
+ public:
+ void ClientSendClose();
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops);
+ void FinishOp(void* tag, bool* status);
+};
+
+class CallOpServerSendStatus {
+ public:
+ void ServerSendStatus(const std::multimap<grpc::string, grpc::string>& trailing_metadata, const Status& status);
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops);
+ void FinishOp(void* tag, bool* status);
+};
+
+class CallOpRecvInitialMetadata {
+ public:
+ void RecvInitialMetadata(ClientContext* context);
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops);
+ void FinishOp(void* tag, bool* status);
+};
+
+class CallOpClientRecvStatus {
+ public:
+ void ClientRecvStatus(ClientContext* context, Status* status);
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops);
+ void FinishOp(void* tag, bool* status);
+};
+
+class CallOpSetInterface : public CompletionQueueTag {
+ public:
+ virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
+};
+
+template <class T, int I>
+class WrapAndDerive : public T {};
+
+template <class Op1 = CallNoOp, class Op2 = CallNoOp, class Op3 = CallNoOp, class Op4 = CallNoOp, class Op5 = CallNoOp, class Op6 = CallNoOp>
+class CallOpSet : public CallOpSetInterface,
+public WrapAndDerive<Op1, 1>,
+public WrapAndDerive<Op2, 2>,
+public WrapAndDerive<Op3, 3>,
+public WrapAndDerive<Op4, 4>,
+public WrapAndDerive<Op5, 5>,
+public WrapAndDerive<Op6, 6> {
+ public:
+ void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
+ this->Op1::AddOp(ops, nops);
+ this->Op2::AddOp(ops, nops);
+ this->Op3::AddOp(ops, nops);
+ this->Op4::AddOp(ops, nops);
+ this->Op5::AddOp(ops, nops);
+ this->Op6::AddOp(ops, nops);
+ }
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ this->Op1::FinishOp(*tag, status);
+ this->Op2::FinishOp(*tag, status);
+ this->Op3::FinishOp(*tag, status);
+ this->Op4::FinishOp(*tag, status);
+ this->Op5::FinishOp(*tag, status);
+ this->Op6::FinishOp(*tag, status);
+ *tag = return_tag_;
+ return true;
+ }
+
+ void SetOutputTag(void* return_tag) { return_tag_ = return_tag; }
+
+ private:
+ void *return_tag_;
+};
+
+#if 0
class CallOpBuffer : public CompletionQueueTag {
public:
CallOpBuffer();
@@ -122,12 +244,14 @@ class CallOpBuffer : public CompletionQueueTag {
int cancelled_buf_;
bool* recv_closed_;
};
+#endif
// SneakyCallOpBuffer does not post completions to the completion queue
-class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
+template <class Op1 = CallNoOp, class Op2 = CallNoOp, class Op3 = CallNoOp, class Op4 = CallNoOp, class Op5 = CallNoOp, class Op6 = CallNoOp>
+class SneakyCallOpSet GRPC_FINAL : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- return CallOpBuffer::FinalizeResult(tag, status) && false;
+ return CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6>::FinalizeResult(tag, status) && false;
}
};
@@ -135,7 +259,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
class CallHook {
public:
virtual ~CallHook() {}
- virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0;
+ virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
};
// Straightforward wrapping of the C call object
@@ -146,7 +270,7 @@ class Call GRPC_FINAL {
Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq,
int max_message_size);
- void PerformOps(CallOpBuffer* buffer);
+ void PerformOps(CallOpSetInterface* ops);
grpc_call* call() { return call_; }
CompletionQueue* cq() { return cq_; }
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
index 0e8aeed781..561c4721ef 100644
--- a/include/grpc++/impl/client_unary_call.h
+++ b/include/grpc++/impl/client_unary_call.h
@@ -36,6 +36,8 @@
#include <grpc++/config.h>
+#include <grpc++/impl/call.h>
+
namespace grpc {
class ChannelInterface;
@@ -45,10 +47,31 @@ class RpcMethod;
class Status;
// Wrapper that performs a blocking unary call
+template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result);
+ const InputMessage& request,
+ OutputMessage* result) {
+ CompletionQueue cq;
+ Call call(channel->CreateCall(method, context, &cq));
+ CallOpSet<
+ CallOpSendInitialMetadata,
+ CallOpSendMessage,
+ CallOpRecvInitialMetadata,
+ CallOpRecvMessage<OutputMessage>,
+ CallOpClientSendClose,
+ CallOpClientRecvStatus> ops;
+ Status status;
+ ops.AddSendInitialMetadata(context);
+ ops.AddSendMessage(request);
+ ops.AddRecvInitialMetadata(context);
+ ops.AddRecvMessage(result);
+ ops.AddClientSendClose();
+ ops.AddClientRecvStatus(context, &status);
+ call.PerformOps(&ops);
+ GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk());
+ return status;
+}
} // namespace grpc
diff --git a/src/cpp/client/client_unary_call.cc b/include/grpc++/impl/serialization_traits.h
index 7e7ea78bcd..d21ad92475 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/include/grpc++/impl/serialization_traits.h
@@ -31,34 +31,19 @@
*
*/
-#include <grpc++/impl/client_unary_call.h>
-#include <grpc++/impl/call.h>
-#include <grpc++/channel_interface.h>
-#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/status.h>
-#include <grpc/support/log.h>
+#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H
+#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H
+
+struct grpc_byte_buffer;
namespace grpc {
-// Wrapper that performs a blocking unary call
-Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result) {
- CompletionQueue cq;
- Call call(channel->CreateCall(method, context, &cq));
- CallOpBuffer buf;
- Status status;
- buf.AddSendInitialMetadata(context);
- buf.AddSendMessage(request);
- buf.AddRecvInitialMetadata(context);
- buf.AddRecvMessage(result);
- buf.AddClientSendClose();
- buf.AddClientRecvStatus(context, &status);
- call.PerformOps(&buf);
- GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
- return status;
-}
+template <class Message>
+class SerializationTraits;
+
+typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest);
+typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst);
} // namespace grpc
+
+#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index bc39bb82ac..af21d9b8cf 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -35,6 +35,8 @@
#define GRPCXX_IMPL_SERVICE_TYPE_H
#include <grpc++/config.h>
+#include <grpc++/impl/serialization_traits.h>
+#include <grpc++/server.h>
namespace grpc {
@@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface {
class AsynchronousService {
public:
- // this is Server, but in disguise to avoid a link dependency
- class DispatchImpl {
- public:
- virtual void RequestAsyncCall(void* registered_method,
- ServerContext* context,
- ::grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) = 0;
- };
-
AsynchronousService(const char** method_names, size_t method_count)
- : dispatch_impl_(nullptr),
+ : server_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
@@ -86,42 +76,44 @@ class AsynchronousService {
~AsynchronousService() { delete[] request_args_; }
protected:
+ template <class Message>
void RequestAsyncUnary(int index, ServerContext* context,
- grpc::protobuf::Message* request,
+ Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context,
+ stream, call_cq, notification_cq, tag, request);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
+ server_->RequestAsyncCall(request_args_[index], context,
stream, call_cq, notification_cq, tag);
}
+ template <class Message>
void RequestServerStreaming(int index, ServerContext* context,
- grpc::protobuf::Message* request,
+ Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context,
+ stream, call_cq, notification_cq, tag, request);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
+ server_->RequestAsyncCall(request_args_[index], context,
stream, call_cq, notification_cq, tag);
}
private:
friend class Server;
- DispatchImpl* dispatch_impl_;
+ Server* server_;
const char** const method_names_;
size_t method_count_;
void** request_args_;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 2cfeb359fc..e0599ee768 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -53,13 +53,13 @@ class GenericServerContext;
class AsyncGenericService;
class RpcService;
class RpcServiceMethod;
+class ServerAsyncStreamingInterface;
class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
class Server GRPC_FINAL : public GrpcLibrary,
- private CallHook,
- private AsynchronousService::DispatchImpl {
+ private CallHook {
public:
~Server();
@@ -73,6 +73,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
private:
friend class AsyncGenericService;
+ friend class AsynchronousService;
friend class ServerBuilder;
class SyncRequest;
@@ -96,15 +97,20 @@ class Server GRPC_FINAL : public GrpcLibrary,
void RunRpc();
void ScheduleCallback();
- void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+ void PerformOpsOnCall(CallOpSetInterface *ops, Call* call) GRPC_OVERRIDE;
- // DispatchImpl
+ template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
- grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
- void* tag) GRPC_OVERRIDE;
+ void* tag, Message *message);
+
+ void RequestAsyncCall(void* registered_method, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag);
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index c836f98c2a..32ba03f8d8 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -93,15 +93,16 @@ template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
+ template <class W>
ClientReader(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, const grpc::protobuf::Message& request)
+ ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- buf.AddSendMessage(request);
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.SendMessage(request);
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@@ -111,28 +112,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
- CallOpBuffer buf;
- buf.AddRecvInitialMetadata(context_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); // status ignored
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
- buf.AddRecvInitialMetadata(context_);
+ ops.RecvInitialMetadata(context_);
}
- buf.AddRecvMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf) && buf.got_message;
+ ops.RecvMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpClientRecvStatus> ops;
Status status;
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ ops.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&ops);
+ GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@@ -150,48 +151,48 @@ class ClientWriterInterface : public ClientStreamingInterface,
};
template <class W>
-class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> {
+class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
+ template <class R>
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, grpc::protobuf::Message* response)
+ ClientContext* context, R* response)
: context_(context),
- response_(response),
call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ finish_ops_.RecvMessage(response);
+
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddSendMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpSendMessage> ops;
+ ops.SendMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpClientSendClose> ops;
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
Status status;
- buf.AddRecvMessage(response_);
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ finish_ops_.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&finish_ops_);
+ GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
- grpc::protobuf::Message* const response_;
+ CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
@@ -213,10 +214,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@@ -226,42 +227,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
- CallOpBuffer buf;
- buf.AddRecvInitialMetadata(context_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); // status ignored
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
- buf.AddRecvInitialMetadata(context_);
+ ops.RecvInitialMetadata(context_);
}
- buf.AddRecvMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf) && buf.got_message;
+ ops.RecvMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddSendMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpSendMessage> ops;
+ ops.SendMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpClientSendClose> ops;
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpClientRecvStatus> ops;
Status status;
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ ops.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&ops);
+ GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@@ -279,18 +280,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddRecvMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf) && buf.got_message;
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.RecvMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
@@ -306,22 +307,22 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ctx_->sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- buf.AddSendMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf);
+ ops.SendMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops);
}
private:
@@ -339,29 +340,29 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddRecvMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf) && buf.got_message;
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.RecvMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ctx_->sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- buf.AddSendMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf);
+ ops.SendMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops);
}
private:
@@ -407,50 +408,51 @@ template <class R>
class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
public:
// Create a stream and write the first request out.
+ template <class W>
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- const grpc::protobuf::Message& request, void* tag)
+ const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- init_buf_.AddSendMessage(request);
- init_buf_.AddClientSendClose();
- call_.PerformOps(&init_buf_);
+ init_ops_.SetOutputTag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ init_ops_.SendMessage(request);
+ init_ops_.ClientSendClose();
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.SetOutputTag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
+ read_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
- read_buf_.AddRecvInitialMetadata(context_);
+ read_ops_.RecvInitialMetadata(context_);
}
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W>
@@ -463,56 +465,56 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
public:
+ template <class R>
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- grpc::protobuf::Message* response, void* tag)
+ R* response, void* tag)
: context_(context),
- response_(response),
call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&init_buf_);
+ finish_ops_.RecvMessage(response);
+
+ init_ops_.SetOutputTag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.SetOutputTag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ write_ops_.SetOutputTag(tag);
+ write_ops_.SendMessage(msg);
+ call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_buf_.Reset(tag);
- writes_done_buf_.AddClientSendClose();
- call_.PerformOps(&writes_done_buf_);
+ writes_done_ops_.SetOutputTag(tag);
+ writes_done_ops_.ClientSendClose();
+ call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddRecvMessage(response_);
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
- grpc::protobuf::Message* const response_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer writes_done_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
};
// Client-side interface for bi-directional streaming.
@@ -532,58 +534,58 @@ class ClientAsyncReaderWriter GRPC_FINAL
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&init_buf_);
+ init_ops_.SetOutputTag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.SetOutputTag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
+ read_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
- read_buf_.AddRecvInitialMetadata(context_);
+ read_ops_.RecvInitialMetadata(context_);
}
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.AddRecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ write_ops_.SetOutputTag(tag);
+ write_ops_.SendMessage(msg);
+ call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_buf_.Reset(tag);
- writes_done_buf_.AddClientSendClose();
- call_.PerformOps(&writes_done_buf_);
+ writes_done_ops_.SetOutputTag(tag);
+ writes_done_ops_.ClientSendClose();
+ call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer writes_done_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W, class R>
@@ -596,41 +598,41 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.SetOutputTag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.SetOutputTag(tag);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
- finish_buf_.AddSendMessage(msg);
+ finish_ops_.SendMessage(msg);
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
- finish_buf_.Reset(tag);
+ finish_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -638,9 +640,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> finish_ops_;
};
template <class W>
@@ -653,30 +655,30 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.SetOutputTag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
+ write_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ write_ops_.SendMessage(msg);
+ call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -684,9 +686,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
// Server-side interface for bi-directional streaming.
@@ -701,36 +703,36 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.SetOutputTag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.SetOutputTag(tag);
+ read_ops_.AddRecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
+ write_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ write_ops_.SendMessage(msg);
+ call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -738,10 +740,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
} // namespace grpc
diff --git a/src/compiler/config.h b/src/compiler/config.h
index e81de8d6c8..06ccd8530c 100644
--- a/src/compiler/config.h
+++ b/src/compiler/config.h
@@ -35,6 +35,7 @@
#define SRC_COMPILER_CONFIG_H
#include <grpc++/config.h>
+#include <grpc++/config_protobuf.h>
#ifndef GRPC_CUSTOM_DESCRIPTOR
#include <google/protobuf/descriptor.h>