aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD4
-rw-r--r--Makefile4
-rw-r--r--build.json2
-rw-r--r--include/grpc++/async_unary_call.h56
-rw-r--r--include/grpc++/byte_buffer.h23
-rw-r--r--include/grpc++/client_context.h10
-rw-r--r--include/grpc++/completion_queue.h30
-rw-r--r--include/grpc++/config.h43
-rw-r--r--include/grpc++/config_protobuf.h (renamed from src/cpp/client/client_unary_call.cc)60
-rw-r--r--include/grpc++/impl/call.h427
-rw-r--r--include/grpc++/impl/client_unary_call.h26
-rw-r--r--include/grpc++/impl/proto_utils.h (renamed from src/cpp/proto/proto_utils.h)33
-rw-r--r--include/grpc++/impl/rpc_service_method.h111
-rw-r--r--include/grpc++/impl/serialization_traits.h68
-rw-r--r--include/grpc++/impl/service_type.h41
-rw-r--r--include/grpc++/server.h126
-rw-r--r--include/grpc++/server_context.h16
-rw-r--r--include/grpc++/stream.h466
-rw-r--r--include/grpc/support/port_platform.h26
-rw-r--r--src/compiler/config.h7
-rw-r--r--src/compiler/cpp_generator.cc194
-rw-r--r--src/cpp/client/channel.cc11
-rw-r--r--src/cpp/client/channel.h10
-rw-r--r--src/cpp/common/call.cc307
-rw-r--r--src/cpp/proto/proto_utils.cc26
-rw-r--r--src/cpp/server/server.cc240
-rw-r--r--src/cpp/server/server_context.cc22
-rw-r--r--test/cpp/end2end/generic_end2end_test.cc2
-rw-r--r--tools/doxygen/Doxyfile.c++2
-rw-r--r--tools/doxygen/Doxyfile.c++.internal2
-rw-r--r--vsprojects/grpc++/grpc++.vcxproj3
-rw-r--r--vsprojects/grpc++/grpc++.vcxproj.filters6
-rw-r--r--vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj3
-rw-r--r--vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters6
34 files changed, 1332 insertions, 1081 deletions
diff --git a/BUILD b/BUILD
index a09cf6c8de..47fcfae1db 100644
--- a/BUILD
+++ b/BUILD
@@ -584,7 +584,6 @@ cc_library(
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
- "src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
@@ -625,6 +624,7 @@ cc_library(
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
+ "include/grpc++/impl/serialization_traits.h",
"include/grpc++/impl/service_type.h",
"include/grpc++/impl/sync.h",
"include/grpc++/impl/sync_cxx11.h",
@@ -664,7 +664,6 @@ cc_library(
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
- "src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
@@ -705,6 +704,7 @@ cc_library(
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
+ "include/grpc++/impl/serialization_traits.h",
"include/grpc++/impl/service_type.h",
"include/grpc++/impl/sync.h",
"include/grpc++/impl/sync_cxx11.h",
diff --git a/Makefile b/Makefile
index df3be356e9..a8b3ffa420 100644
--- a/Makefile
+++ b/Makefile
@@ -3402,7 +3402,6 @@ LIBGRPC++_SRC = \
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
- src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
@@ -3443,6 +3442,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/internal_stub.h \
include/grpc++/impl/rpc_method.h \
include/grpc++/impl/rpc_service_method.h \
+ include/grpc++/impl/serialization_traits.h \
include/grpc++/impl/service_type.h \
include/grpc++/impl/sync.h \
include/grpc++/impl/sync_cxx11.h \
@@ -3690,7 +3690,6 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
- src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
@@ -3731,6 +3730,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/internal_stub.h \
include/grpc++/impl/rpc_method.h \
include/grpc++/impl/rpc_service_method.h \
+ include/grpc++/impl/serialization_traits.h \
include/grpc++/impl/service_type.h \
include/grpc++/impl/sync.h \
include/grpc++/impl/sync_cxx11.h \
diff --git a/build.json b/build.json
index b670ebfba5..f012cf827f 100644
--- a/build.json
+++ b/build.json
@@ -45,6 +45,7 @@
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
+ "include/grpc++/impl/serialization_traits.h",
"include/grpc++/impl/service_type.h",
"include/grpc++/impl/sync.h",
"include/grpc++/impl/sync_cxx11.h",
@@ -72,7 +73,6 @@
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
- "src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h
index abb6308782..d631ccd134 100644
--- a/include/grpc++/async_unary_call.h
+++ b/include/grpc++/async_unary_call.h
@@ -51,47 +51,50 @@ class ClientAsyncResponseReaderInterface {
virtual ~ClientAsyncResponseReaderInterface() {}
virtual void ReadInitialMetadata(void* tag) = 0;
virtual void Finish(R* msg, Status* status, void* tag) = 0;
-
};
template <class R>
class ClientAsyncResponseReader GRPC_FINAL
: public ClientAsyncResponseReaderInterface<R> {
public:
+ template <class W>
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- const grpc::protobuf::Message& request)
+ const W& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- init_buf_.AddSendMessage(request);
- init_buf_.AddClientSendClose();
+ init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(init_buf_.SendMessage(request).ok());
+ init_buf_.ClientSendClose();
call_.PerformOps(&init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
+ meta_buf_.set_output_tag(tag);
+ meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_buf_.RecvInitialMetadata(context_);
}
- finish_buf_.AddRecvMessage(msg);
- finish_buf_.AddClientRecvStatus(context_, status);
+ finish_buf_.RecvMessage(msg);
+ finish_buf_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
private:
ClientContext* context_;
Call call_;
- SneakyCallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer finish_buf_;
+ SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose> init_buf_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
+ CallOpClientRecvStatus> finish_buf_;
};
template <class W>
@@ -104,34 +107,36 @@ class ServerAsyncResponseWriter GRPC_FINAL
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_buf_.set_output_tag(tag);
+ meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.ok()) {
- finish_buf_.AddSendMessage(msg);
+ finish_buf_.ServerSendStatus(
+ ctx_->trailing_metadata_, finish_buf_.SendMessage(msg));
+ } else {
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
- finish_buf_.Reset(tag);
+ finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@@ -140,8 +145,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> finish_buf_;
};
} // namespace grpc
diff --git a/include/grpc++/byte_buffer.h b/include/grpc++/byte_buffer.h
index 2c0f2e6944..cb3c6a1159 100644
--- a/include/grpc++/byte_buffer.h
+++ b/include/grpc++/byte_buffer.h
@@ -39,6 +39,8 @@
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>
+#include <grpc++/status.h>
+#include <grpc++/impl/serialization_traits.h>
#include <vector>
@@ -62,7 +64,10 @@ class ByteBuffer GRPC_FINAL {
size_t Length() const;
private:
- friend class CallOpBuffer;
+ friend class SerializationTraits<ByteBuffer, void>;
+
+ ByteBuffer(const ByteBuffer&);
+ ByteBuffer& operator=(const ByteBuffer&);
// takes ownership
void set_buffer(grpc_byte_buffer* buf) {
@@ -78,6 +83,22 @@ class ByteBuffer GRPC_FINAL {
grpc_byte_buffer* buffer_;
};
+template <>
+class SerializationTraits<ByteBuffer, void> {
+ public:
+ static Status Deserialize(grpc_byte_buffer* byte_buffer, ByteBuffer* dest,
+ int max_message_size) {
+ dest->set_buffer(byte_buffer);
+ return Status::OK;
+ }
+ static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer,
+ bool* own_buffer) {
+ *buffer = source.buffer();
+ *own_buffer = false;
+ return Status::OK;
+ }
+};
+
} // namespace grpc
#endif // GRPCXX_BYTE_BUFFER_H
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index ecf4cc7f7b..5e10875260 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -49,7 +49,6 @@ struct grpc_completion_queue;
namespace grpc {
-class CallOpBuffer;
class ChannelInterface;
class CompletionQueue;
class Credentials;
@@ -115,7 +114,8 @@ class ClientContext {
ClientContext(const ClientContext&);
ClientContext& operator=(const ClientContext&);
- friend class CallOpBuffer;
+ friend class CallOpClientRecvStatus;
+ friend class CallOpRecvInitialMetadata;
friend class Channel;
template <class R>
friend class ::grpc::ClientReader;
@@ -131,6 +131,12 @@ class ClientContext {
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
+ template <class InputMessage, class OutputMessage>
+ friend Status BlockingUnaryCall(ChannelInterface* channel,
+ const RpcMethod& method,
+ ClientContext* context,
+ const InputMessage& request,
+ OutputMessage* result);
grpc_call* call() { return call_; }
void set_call(grpc_call* call,
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index e8429c8f41..a3e7a9c9f4 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -35,7 +35,6 @@
#define GRPCXX_COMPLETION_QUEUE_H
#include <grpc/support/time.h>
-#include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/time.h>
@@ -55,11 +54,23 @@ template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
-
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler;
+
+class ChannelInterface;
+class ClientContext;
class CompletionQueue;
+class RpcMethod;
class Server;
class ServerBuilder;
class ServerContext;
+class Status;
class CompletionQueueTag {
public:
@@ -84,7 +95,7 @@ class CompletionQueue : public GrpcLibrary {
// Nonblocking (until deadline) read from queue.
// Cannot rely on result of tag or ok if return is TIMEOUT
- template<typename T>
+ template <typename T>
NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
TimePoint<T> deadline_tp(deadline);
return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
@@ -118,13 +129,22 @@ class CompletionQueue : public GrpcLibrary {
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class RpcMethodHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ClientStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ServerStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class BidiStreamingHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
+ template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result);
+ const InputMessage& request,
+ OutputMessage* result);
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
diff --git a/include/grpc++/config.h b/include/grpc++/config.h
index ca74064be2..1362c0a1fa 100644
--- a/include/grpc++/config.h
+++ b/include/grpc++/config.h
@@ -77,31 +77,6 @@
#define GRPC_OVERRIDE override
#endif
-#ifndef GRPC_CUSTOM_PROTOBUF_INT64
-#include <google/protobuf/stubs/common.h>
-#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
-#endif
-
-#ifndef GRPC_CUSTOM_MESSAGE
-#include <google/protobuf/message.h>
-#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
-#endif
-
-#ifndef GRPC_CUSTOM_STRING
-#include <string>
-#define GRPC_CUSTOM_STRING std::string
-#endif
-
-#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream.h>
-#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
- ::google::protobuf::io::ZeroCopyOutputStream
-#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
- ::google::protobuf::io::ZeroCopyInputStream
-#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream
-#endif
-
#ifdef GRPC_CXX0X_NO_NULLPTR
#include <memory>
const class {
@@ -125,23 +100,15 @@ const class {
} nullptr = {};
#endif
+#ifndef GRPC_CUSTOM_STRING
+#include <string>
+#define GRPC_CUSTOM_STRING std::string
+#endif
+
namespace grpc {
typedef GRPC_CUSTOM_STRING string;
-namespace protobuf {
-
-typedef GRPC_CUSTOM_MESSAGE Message;
-typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
-
-namespace io {
-typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
-typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
-typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
-} // namespace io
-
-} // namespace protobuf
-
} // namespace grpc
#endif // GRPCXX_CONFIG_H
diff --git a/src/cpp/client/client_unary_call.cc b/include/grpc++/config_protobuf.h
index 55e589306f..3afc7a58e2 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/include/grpc++/config_protobuf.h
@@ -31,34 +31,42 @@
*
*/
-#include <grpc++/impl/client_unary_call.h>
-#include <grpc++/impl/call.h>
-#include <grpc++/channel_interface.h>
-#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/status.h>
-#include <grpc/support/log.h>
+#ifndef GRPCXX_CONFIG_PROTOBUF_H
+#define GRPCXX_CONFIG_PROTOBUF_H
+
+#ifndef GRPC_CUSTOM_PROTOBUF_INT64
+#include <google/protobuf/stubs/common.h>
+#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
+#endif
+
+#ifndef GRPC_CUSTOM_MESSAGE
+#include <google/protobuf/message.h>
+#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
+#endif
+
+#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
+ ::google::protobuf::io::ZeroCopyOutputStream
+#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
+ ::google::protobuf::io::ZeroCopyInputStream
+#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream
+#endif
namespace grpc {
+namespace protobuf {
-// Wrapper that performs a blocking unary call
-Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result) {
- CompletionQueue cq;
- Call call(channel->CreateCall(method, context, &cq));
- CallOpBuffer buf;
- Status status;
- buf.AddSendInitialMetadata(context);
- buf.AddSendMessage(request);
- buf.AddRecvInitialMetadata(context);
- buf.AddRecvMessage(result);
- buf.AddClientSendClose();
- buf.AddClientRecvStatus(context, &status);
- call.PerformOps(&buf);
- GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.ok());
- return status;
-}
+typedef GRPC_CUSTOM_MESSAGE Message;
+typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
+namespace io {
+typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
+typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
+typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
+} // namespace io
+
+} // namespace protobuf
} // namespace grpc
+
+#endif // GRPCXX_CONFIG_PROTOBUF_H
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index aae199db1b..33e66816f5 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -34,14 +34,18 @@
#ifndef GRPCXX_IMPL_CALL_H
#define GRPCXX_IMPL_CALL_H
-#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
+#include <grpc++/impl/serialization_traits.h>
#include <memory>
#include <map>
+#include <string.h>
+
struct grpc_call;
struct grpc_op;
@@ -50,84 +54,383 @@ namespace grpc {
class ByteBuffer;
class Call;
-class CallOpBuffer : public CompletionQueueTag {
+void FillMetadataMap(grpc_metadata_array* arr,
+ std::multimap<grpc::string, grpc::string>* metadata);
+grpc_metadata* FillMetadataArray(
+ const std::multimap<grpc::string, grpc::string>& metadata);
+
+/// Default argument for CallOpSet. I is unused by the class, but can be
+/// used for generating multiple names for the same thing.
+template <int I>
+class CallNoOp {
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {}
+ void FinishOp(bool* status, int max_message_size) {}
+};
+
+class CallOpSendInitialMetadata {
public:
- CallOpBuffer();
- ~CallOpBuffer();
-
- void Reset(void* next_return_tag);
-
- // Does not take ownership.
- void AddSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata);
- void AddSendInitialMetadata(ClientContext* ctx);
- void AddRecvInitialMetadata(ClientContext* ctx);
- void AddSendMessage(const grpc::protobuf::Message& message);
- void AddSendMessage(const ByteBuffer& message);
- void AddRecvMessage(grpc::protobuf::Message* message);
- void AddRecvMessage(ByteBuffer* message);
- void AddClientSendClose();
- void AddClientRecvStatus(ClientContext* ctx, Status* status);
- void AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata,
- const Status& status);
- void AddServerRecvClose(bool* cancelled);
-
- // INTERNAL API:
-
- // Convert to an array of grpc_op elements
- void FillOps(grpc_op* ops, size_t* nops);
-
- // Called by completion queue just prior to returning from Next() or Pluck()
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+ CallOpSendInitialMetadata() : send_(false) {}
- void set_max_message_size(int max_message_size) {
- max_message_size_ = max_message_size;
+ void SendInitialMetadata(
+ const std::multimap<grpc::string, grpc::string>& metadata) {
+ send_ = true;
+ initial_metadata_count_ = metadata.size();
+ initial_metadata_ = FillMetadataArray(metadata);
}
- bool got_message;
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!send_) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->flags = 0;
+ op->data.send_initial_metadata.count = initial_metadata_count_;
+ op->data.send_initial_metadata.metadata = initial_metadata_;
+ }
+ void FinishOp(bool* status, int max_message_size) {
+ if (!send_) return;
+ gpr_free(initial_metadata_);
+ send_ = false;
+ }
- private:
- void* return_tag_;
- // Send initial metadata
- bool send_initial_metadata_;
+ bool send_;
size_t initial_metadata_count_;
grpc_metadata* initial_metadata_;
- // Recv initial metadta
- std::multimap<grpc::string, grpc::string>* recv_initial_metadata_;
- grpc_metadata_array recv_initial_metadata_arr_;
- // Send message
- const grpc::protobuf::Message* send_message_;
- const ByteBuffer* send_message_buffer_;
+};
+
+class CallOpSendMessage {
+ public:
+ CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}
+
+ template <class M>
+ Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (send_buf_ == nullptr) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->flags = 0;
+ op->data.send_message = send_buf_;
+ }
+ void FinishOp(bool* status, int max_message_size) {
+ if (own_buf_) grpc_byte_buffer_destroy(send_buf_);
+ send_buf_ = nullptr;
+ }
+
+ private:
grpc_byte_buffer* send_buf_;
- // Recv message
- grpc::protobuf::Message* recv_message_;
- ByteBuffer* recv_message_buffer_;
+ bool own_buf_;
+};
+
+template <class M>
+Status CallOpSendMessage::SendMessage(const M& message) {
+ return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
+}
+
+template <class R>
+class CallOpRecvMessage {
+ public:
+ CallOpRecvMessage() : got_message(false), message_(nullptr) {}
+
+ void RecvMessage(R* message) { message_ = message; }
+
+ bool got_message;
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (message_ == nullptr) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->flags = 0;
+ op->data.recv_message = &recv_buf_;
+ }
+
+ void FinishOp(bool* status, int max_message_size) {
+ if (message_ == nullptr) return;
+ if (recv_buf_) {
+ if (*status) {
+ got_message = true;
+ *status = SerializationTraits<R>::Deserialize(recv_buf_, message_,
+ max_message_size)
+ .ok();
+ } else {
+ got_message = false;
+ grpc_byte_buffer_destroy(recv_buf_);
+ }
+ } else {
+ got_message = false;
+ *status = false;
+ }
+ message_ = nullptr;
+ }
+
+ private:
+ R* message_;
grpc_byte_buffer* recv_buf_;
- int max_message_size_;
- // Client send close
- bool client_send_close_;
- // Client recv status
+};
+
+class CallOpGenericRecvMessage {
+ public:
+ CallOpGenericRecvMessage() : got_message(false) {}
+
+ template <class R>
+ void RecvMessage(R* message) {
+ deserialize_ = [message](grpc_byte_buffer* buf,
+ int max_message_size) -> Status {
+ return SerializationTraits<R>::Deserialize(buf, message,
+ max_message_size);
+ };
+ }
+
+ bool got_message;
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!deserialize_) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->flags = 0;
+ op->data.recv_message = &recv_buf_;
+ }
+
+ void FinishOp(bool* status, int max_message_size) {
+ if (!deserialize_) return;
+ if (recv_buf_) {
+ if (*status) {
+ got_message = true;
+ *status = deserialize_(recv_buf_, max_message_size).ok();
+ } else {
+ got_message = false;
+ grpc_byte_buffer_destroy(recv_buf_);
+ }
+ } else {
+ got_message = false;
+ *status = false;
+ }
+ deserialize_ = DeserializeFunc();
+ }
+
+ private:
+ typedef std::function<Status(grpc_byte_buffer*, int)> DeserializeFunc;
+ DeserializeFunc deserialize_;
+ grpc_byte_buffer* recv_buf_;
+};
+
+class CallOpClientSendClose {
+ public:
+ CallOpClientSendClose() : send_(false) {}
+
+ void ClientSendClose() { send_ = true; }
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!send_) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ }
+ void FinishOp(bool* status, int max_message_size) { send_ = false; }
+
+ private:
+ bool send_;
+};
+
+class CallOpServerSendStatus {
+ public:
+ CallOpServerSendStatus() : send_status_available_(false) {}
+
+ void ServerSendStatus(
+ const std::multimap<grpc::string, grpc::string>& trailing_metadata,
+ const Status& status) {
+ trailing_metadata_count_ = trailing_metadata.size();
+ trailing_metadata_ = FillMetadataArray(trailing_metadata);
+ send_status_available_ = true;
+ send_status_code_ = static_cast<grpc_status_code>(status.error_code());
+ send_status_details_ = status.error_message();
+ }
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!send_status_available_) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count =
+ trailing_metadata_count_;
+ op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
+ op->data.send_status_from_server.status = send_status_code_;
+ op->data.send_status_from_server.status_details =
+ send_status_details_.empty() ? nullptr : send_status_details_.c_str();
+ op->flags = 0;
+ }
+
+ void FinishOp(bool* status, int max_message_size) {
+ if (!send_status_available_) return;
+ gpr_free(trailing_metadata_);
+ send_status_available_ = false;
+ }
+
+ private:
+ bool send_status_available_;
+ grpc_status_code send_status_code_;
+ grpc::string send_status_details_;
+ size_t trailing_metadata_count_;
+ grpc_metadata* trailing_metadata_;
+};
+
+class CallOpRecvInitialMetadata {
+ public:
+ CallOpRecvInitialMetadata() : recv_initial_metadata_(nullptr) {}
+
+ void RecvInitialMetadata(ClientContext* context) {
+ context->initial_metadata_received_ = true;
+ recv_initial_metadata_ = &context->recv_initial_metadata_;
+ }
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!recv_initial_metadata_) return;
+ memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &recv_initial_metadata_arr_;
+ op->flags = 0;
+ }
+ void FinishOp(bool* status, int max_message_size) {
+ if (recv_initial_metadata_ == nullptr) return;
+ FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
+ recv_initial_metadata_ = nullptr;
+ }
+
+ private:
+ std::multimap<grpc::string, grpc::string>* recv_initial_metadata_;
+ grpc_metadata_array recv_initial_metadata_arr_;
+};
+
+class CallOpClientRecvStatus {
+ public:
+ CallOpClientRecvStatus() : recv_status_(nullptr) {}
+
+ void ClientRecvStatus(ClientContext* context, Status* status) {
+ recv_trailing_metadata_ = &context->trailing_metadata_;
+ recv_status_ = status;
+ }
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (recv_status_ == nullptr) return;
+ memset(&recv_trailing_metadata_arr_, 0,
+ sizeof(recv_trailing_metadata_arr_));
+ status_details_ = nullptr;
+ status_details_capacity_ = 0;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata =
+ &recv_trailing_metadata_arr_;
+ op->data.recv_status_on_client.status = &status_code_;
+ op->data.recv_status_on_client.status_details = &status_details_;
+ op->data.recv_status_on_client.status_details_capacity =
+ &status_details_capacity_;
+ op->flags = 0;
+ }
+
+ void FinishOp(bool* status, int max_message_size) {
+ if (recv_status_ == nullptr) return;
+ FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
+ *recv_status_ = Status(
+ static_cast<StatusCode>(status_code_),
+ status_details_ ? grpc::string(status_details_) : grpc::string());
+ gpr_free(status_details_);
+ recv_status_ = nullptr;
+ }
+
+ private:
std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_;
Status* recv_status_;
grpc_metadata_array recv_trailing_metadata_arr_;
grpc_status_code status_code_;
char* status_details_;
size_t status_details_capacity_;
- // Server send status
- bool send_status_available_;
- grpc_status_code send_status_code_;
- grpc::string send_status_details_;
- size_t trailing_metadata_count_;
- grpc_metadata* trailing_metadata_;
- int cancelled_buf_;
- bool* recv_closed_;
};
-// SneakyCallOpBuffer does not post completions to the completion queue
-class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
+/// An abstract collection of call ops, used to generate the
+/// grpc_call_op structure to pass down to the lower layers,
+/// and as it is-a CompletionQueueTag, also massages the final
+/// completion into the correct form for consumption in the C++
+/// API.
+class CallOpSetInterface : public CompletionQueueTag {
+ public:
+ CallOpSetInterface() : max_message_size_(0) {}
+ /// Fills in grpc_op, starting from ops[*nops] and moving
+ /// upwards.
+ virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
+
+ void set_max_message_size(int max_message_size) {
+ max_message_size_ = max_message_size;
+ }
+
+ protected:
+ int max_message_size_;
+};
+
+/// Primary implementaiton of CallOpSetInterface.
+/// Since we cannot use variadic templates, we declare slots up to
+/// the maximum count of ops we'll need in a set. We leverage the
+/// empty base class optimization to slim this class (especially
+/// when there are many unused slots used). To avoid duplicate base classes,
+/// the template parmeter for CallNoOp is varied by argument position.
+template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
+ class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
+ class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
+class CallOpSet : public CallOpSetInterface,
+ public Op1,
+ public Op2,
+ public Op3,
+ public Op4,
+ public Op5,
+ public Op6 {
+ public:
+ CallOpSet() : return_tag_(this) {}
+ void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
+ this->Op1::AddOp(ops, nops);
+ this->Op2::AddOp(ops, nops);
+ this->Op3::AddOp(ops, nops);
+ this->Op4::AddOp(ops, nops);
+ this->Op5::AddOp(ops, nops);
+ this->Op6::AddOp(ops, nops);
+ }
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ this->Op1::FinishOp(status, max_message_size_);
+ this->Op2::FinishOp(status, max_message_size_);
+ this->Op3::FinishOp(status, max_message_size_);
+ this->Op4::FinishOp(status, max_message_size_);
+ this->Op5::FinishOp(status, max_message_size_);
+ this->Op6::FinishOp(status, max_message_size_);
+ *tag = return_tag_;
+ return true;
+ }
+
+ void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
+
+ private:
+ void* return_tag_;
+};
+
+/// A CallOpSet that does not post completions to the completion queue.
+///
+/// Allows hiding some completions that the C core must generate from
+/// C++ users.
+template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
+ class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
+ class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
+class SneakyCallOpSet GRPC_FINAL
+ : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- return CallOpBuffer::FinalizeResult(tag, status) && false;
+ typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base;
+ return Base::FinalizeResult(tag, status) && false;
}
};
@@ -135,7 +438,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
class CallHook {
public:
virtual ~CallHook() {}
- virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0;
+ virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
};
// Straightforward wrapping of the C call object
@@ -146,7 +449,7 @@ class Call GRPC_FINAL {
Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq,
int max_message_size);
- void PerformOps(CallOpBuffer* buffer);
+ void PerformOps(CallOpSetInterface* ops);
grpc_call* call() { return call_; }
CompletionQueue* cq() { return cq_; }
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
index 2f234fd3ac..b77ce7d02c 100644
--- a/include/grpc++/impl/client_unary_call.h
+++ b/include/grpc++/impl/client_unary_call.h
@@ -37,6 +37,8 @@
#include <grpc++/config.h>
#include <grpc++/status.h>
+#include <grpc++/impl/call.h>
+
namespace grpc {
class ChannelInterface;
@@ -45,10 +47,28 @@ class CompletionQueue;
class RpcMethod;
// Wrapper that performs a blocking unary call
+template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result);
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result) {
+ CompletionQueue cq;
+ Call call(channel->CreateCall(method, context, &cq));
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
+ CallOpClientSendClose, CallOpClientRecvStatus> ops;
+ Status status = ops.SendMessage(request);
+ if (!status.ok()) {
+ return status;
+ }
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.RecvInitialMetadata(context);
+ ops.RecvMessage(result);
+ ops.ClientSendClose();
+ ops.ClientRecvStatus(context, &status);
+ call.PerformOps(&ops);
+ GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok());
+ return status;
+}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.h b/include/grpc++/impl/proto_utils.h
index 67a775b3ca..ebefa3e1be 100644
--- a/src/cpp/proto/proto_utils.h
+++ b/include/grpc++/impl/proto_utils.h
@@ -34,21 +34,42 @@
#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
-#include <grpc++/config.h>
+#include <type_traits>
-struct grpc_byte_buffer;
+#include <grpc/grpc.h>
+#include <grpc++/impl/serialization_traits.h>
+#include <grpc++/config_protobuf.h>
+#include <grpc++/status.h>
namespace grpc {
// Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization fails,
// false is returned and buffer is left unchanged.
-bool SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** buffer);
+Status SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** buffer);
// The caller keeps ownership of buffer and msg.
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size);
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size);
+
+template <class T>
+class SerializationTraits<T, typename std::enable_if<std::is_base_of<
+ grpc::protobuf::Message, T>::value>::type> {
+ public:
+ static Status Serialize(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** buffer, bool* own_buffer) {
+ *own_buffer = true;
+ return SerializeProto(msg, buffer);
+ }
+ static Status Deserialize(grpc_byte_buffer* buffer,
+ grpc::protobuf::Message* msg,
+ int max_message_size) {
+ auto status = DeserializeProto(buffer, msg, max_message_size);
+ grpc_byte_buffer_destroy(buffer);
+ return status;
+ }
+};
} // namespace grpc
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 50204d2099..3cfbef7806 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -55,16 +55,19 @@ class MethodHandler {
public:
virtual ~MethodHandler() {}
struct HandlerParameter {
- HandlerParameter(Call* c, ServerContext* context,
- const grpc::protobuf::Message* req,
- grpc::protobuf::Message* resp)
- : call(c), server_context(context), request(req), response(resp) {}
+ HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req,
+ int max_size)
+ : call(c),
+ server_context(context),
+ request(req),
+ max_message_size(max_size) {}
Call* call;
ServerContext* server_context;
- const grpc::protobuf::Message* request;
- grpc::protobuf::Message* response;
+ // Handler required to grpc_byte_buffer_destroy this
+ grpc_byte_buffer* request;
+ int max_message_size;
};
- virtual Status RunHandler(const HandlerParameter& param) = 0;
+ virtual void RunHandler(const HandlerParameter& param) = 0;
};
// A wrapper class of an application provided rpc method handler.
@@ -77,11 +80,25 @@ class RpcMethodHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
- // Invoke application function, cast proto messages to their actual types.
- return func_(service_, param.server_context,
- dynamic_cast<const RequestType*>(param.request),
- dynamic_cast<ResponseType*>(param.response));
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(
+ param.request, &req, param.max_message_size);
+ ResponseType rsp;
+ if (status.ok()) {
+ status = func_(service_, param.server_context, &req, &rsp);
+ }
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.ok()) {
+ status = ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -102,10 +119,21 @@ class ClientStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReader<RequestType> reader(param.call, param.server_context);
- return func_(service_, param.server_context, &reader,
- dynamic_cast<ResponseType*>(param.response));
+ ResponseType rsp;
+ Status status = func_(service_, param.server_context, &reader, &rsp);
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.ok()) {
+ status = ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -124,10 +152,23 @@ class ServerStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
- ServerWriter<ResponseType> writer(param.call, param.server_context);
- return func_(service_, param.server_context,
- dynamic_cast<const RequestType*>(param.request), &writer);
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(
+ param.request, &req, param.max_message_size);
+
+ if (status.ok()) {
+ ServerWriter<ResponseType> writer(param.call, param.server_context);
+ status = func_(service_, param.server_context, &req, &writer);
+ }
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -147,10 +188,18 @@ class BidiStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReaderWriter<ResponseType, RequestType> stream(param.call,
param.server_context);
- return func_(service_, param.server_context, &stream);
+ Status status = func_(service_, param.server_context, &stream);
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -162,29 +211,15 @@ class BidiStreamingHandler : public MethodHandler {
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
- // Takes ownership of the handler and two prototype objects.
+ // Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
- MethodHandler* handler,
- grpc::protobuf::Message* request_prototype,
- grpc::protobuf::Message* response_prototype)
- : RpcMethod(name, type, nullptr),
- handler_(handler),
- request_prototype_(request_prototype),
- response_prototype_(response_prototype) {}
+ MethodHandler* handler)
+ : RpcMethod(name, type, nullptr), handler_(handler) {}
MethodHandler* handler() { return handler_.get(); }
- grpc::protobuf::Message* AllocateRequestProto() {
- return request_prototype_->New();
- }
- grpc::protobuf::Message* AllocateResponseProto() {
- return response_prototype_->New();
- }
-
private:
std::unique_ptr<MethodHandler> handler_;
- std::unique_ptr<grpc::protobuf::Message> request_prototype_;
- std::unique_ptr<grpc::protobuf::Message> response_prototype_;
};
// This class contains all the method information for an rpc service. It is
diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h
new file mode 100644
index 0000000000..1f5c674e4c
--- /dev/null
+++ b/include/grpc++/impl/serialization_traits.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H
+#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H
+
+namespace grpc {
+
+/// Defines how to serialize and deserialize some type.
+///
+/// Used for hooking different message serialization API's into GRPC.
+/// Each SerializationTraits implementation must provide the following
+/// functions:
+/// static Status Serialize(const Message& msg,
+/// grpc_byte_buffer** buffer,
+// bool* own_buffer);
+/// static Status Deserialize(grpc_byte_buffer* buffer,
+/// Message* msg,
+/// int max_message_size);
+///
+/// Serialize is required to convert message to a grpc_byte_buffer, and
+/// to store a pointer to that byte buffer at *buffer. *own_buffer should
+/// be set to true if the caller owns said byte buffer, or false if
+/// ownership is retained elsewhere.
+///
+/// Deserialize is required to convert buffer into the message stored at
+/// msg. max_message_size is passed in as a bound on the maximum number of
+/// message bytes Deserialize should accept.
+///
+/// Both functions return a Status, allowing them to explain what went
+/// wrong if required.
+template <class Message,
+ class UnusedButHereForPartialTemplateSpecialization = void>
+class SerializationTraits;
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index 25e437edad..c33a278f5b 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -35,6 +35,8 @@
#define GRPCXX_IMPL_SERVICE_TYPE_H
#include <grpc++/config.h>
+#include <grpc++/impl/serialization_traits.h>
+#include <grpc++/server.h>
#include <grpc++/status.h>
namespace grpc {
@@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface {
class AsynchronousService {
public:
- // this is Server, but in disguise to avoid a link dependency
- class DispatchImpl {
- public:
- virtual void RequestAsyncCall(void* registered_method,
- ServerContext* context,
- ::grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) = 0;
- };
-
AsynchronousService(const char** method_names, size_t method_count)
- : dispatch_impl_(nullptr),
+ : server_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
@@ -86,42 +76,43 @@ class AsynchronousService {
~AsynchronousService() { delete[] request_args_; }
protected:
- void RequestAsyncUnary(int index, ServerContext* context,
- grpc::protobuf::Message* request,
+ template <class Message>
+ void RequestAsyncUnary(int index, ServerContext* context, Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ notification_cq, tag, request);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ notification_cq, tag);
}
+ template <class Message>
void RequestServerStreaming(int index, ServerContext* context,
- grpc::protobuf::Message* request,
+ Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ notification_cq, tag, request);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ notification_cq, tag);
}
private:
friend class Server;
- DispatchImpl* dispatch_impl_;
+ Server* server_;
const char** const method_names_;
size_t method_count_;
void** request_args_;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 2cfeb359fc..6a9e757e77 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -41,25 +41,24 @@
#include <grpc++/config.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
-#include <grpc++/impl/service_type.h>
#include <grpc++/impl/sync.h>
#include <grpc++/status.h>
struct grpc_server;
namespace grpc {
+
class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
class RpcService;
class RpcServiceMethod;
+class ServerAsyncStreamingInterface;
class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
-class Server GRPC_FINAL : public GrpcLibrary,
- private CallHook,
- private AsynchronousService::DispatchImpl {
+class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
public:
~Server();
@@ -73,6 +72,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
private:
friend class AsyncGenericService;
+ friend class AsynchronousService;
friend class ServerBuilder;
class SyncRequest;
@@ -96,21 +96,123 @@ class Server GRPC_FINAL : public GrpcLibrary,
void RunRpc();
void ScheduleCallback();
- void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+ void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
+
+ class BaseAsyncRequest : public CompletionQueueTag {
+ public:
+ BaseAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq, void* tag);
+ virtual ~BaseAsyncRequest();
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+ protected:
+ Server* const server_;
+ ServerContext* const context_;
+ ServerAsyncStreamingInterface* const stream_;
+ CompletionQueue* const call_cq_;
+ void* const tag_;
+ grpc_call* call_;
+ grpc_metadata_array initial_metadata_array_;
+ };
+
+ class RegisteredAsyncRequest : public BaseAsyncRequest {
+ public:
+ RegisteredAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq, void* tag);
+
+ // uses BaseAsyncRequest::FinalizeResult
+
+ protected:
+ void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
+ ServerCompletionQueue* notification_cq);
+ };
+
+ class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
+ public:
+ NoPayloadAsyncRequest(void* registered_method, Server* server,
+ ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
+ : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
+ IssueRequest(registered_method, nullptr, notification_cq);
+ }
+
+ // uses RegisteredAsyncRequest::FinalizeResult
+ };
+
+ template <class Message>
+ class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
+ public:
+ PayloadAsyncRequest(void* registered_method, Server* server,
+ ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag,
+ Message* request)
+ : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
+ request_(request) {
+ IssueRequest(registered_method, &payload_, notification_cq);
+ }
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ bool serialization_status =
+ *status && payload_ &&
+ SerializationTraits<Message>::Deserialize(payload_, request_,
+ server_->max_message_size_)
+ .ok();
+ bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
+ *status = serialization_status && *status;
+ return ret;
+ }
+
+ private:
+ grpc_byte_buffer* payload_;
+ Message* const request_;
+ };
+
+ class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
+ public:
+ GenericAsyncRequest(Server* server, GenericServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag);
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+ private:
+ grpc_call_details call_details_;
+ };
+
+ template <class Message>
+ void RequestAsyncCall(void* registered_method, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag,
+ Message* message) {
+ new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
+ call_cq, notification_cq, tag, message);
+ }
- // DispatchImpl
void RequestAsyncCall(void* registered_method, ServerContext* context,
- grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) GRPC_OVERRIDE;
+ ServerCompletionQueue* notification_cq, void* tag) {
+ new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
+ notification_cq, tag);
+ }
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq,
+ CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
- void* tag);
+ void* tag) {
+ new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
+ tag);
+ }
const int max_message_size_;
@@ -133,8 +235,6 @@ class Server GRPC_FINAL : public GrpcLibrary,
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.
bool thread_pool_owned_;
- private:
- Server() : max_message_size_(-1), server_(NULL) { abort(); }
};
} // namespace grpc
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index d88a3ae262..326b6a125c 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -60,6 +60,14 @@ template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler;
class Call;
class CallOpBuffer;
@@ -105,6 +113,14 @@ class ServerContext {
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class RpcMethodHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ClientStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ServerStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class BidiStreamingHandler;
// Prevent copying.
ServerContext(const ServerContext&);
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 472911e62b..dd5e52d6d3 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -93,15 +93,18 @@ template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
+ template <class W>
ClientReader(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, const grpc::protobuf::Message& request)
+ ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- buf.AddSendMessage(request);
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(ops.SendMessage(request).ok());
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@@ -111,28 +114,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
- CallOpBuffer buf;
- buf.AddRecvInitialMetadata(context_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); // status ignored
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
- buf.AddRecvInitialMetadata(context_);
+ ops.RecvInitialMetadata(context_);
}
- buf.AddRecvMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf) && buf.got_message;
+ ops.RecvMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpClientRecvStatus> ops;
Status status;
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ ops.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&ops);
+ GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@@ -150,48 +153,49 @@ class ClientWriterInterface : public ClientStreamingInterface,
};
template <class W>
-class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> {
+class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
+ template <class R>
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, grpc::protobuf::Message* response)
- : context_(context),
- response_(response),
- call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ ClientContext* context, R* response)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ finish_ops_.RecvMessage(response);
+
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddSendMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg).ok()) {
+ return false;
+ }
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpClientSendClose> ops;
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
Status status;
- buf.AddRecvMessage(response_);
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ finish_ops_.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&finish_ops_);
+ GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
- grpc::protobuf::Message* const response_;
+ CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
@@ -213,10 +217,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@@ -226,42 +230,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
- CallOpBuffer buf;
- buf.AddRecvInitialMetadata(context_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); // status ignored
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
- buf.AddRecvInitialMetadata(context_);
+ ops.RecvInitialMetadata(context_);
}
- buf.AddRecvMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf) && buf.got_message;
+ ops.RecvMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddSendMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg).ok()) return false;
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpClientSendClose> ops;
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpClientRecvStatus> ops;
Status status;
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ ops.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&ops);
+ GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@@ -279,18 +283,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddRecvMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf) && buf.got_message;
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.RecvMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
@@ -306,22 +310,24 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg).ok()) {
+ return false;
+ }
if (!ctx_->sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- buf.AddSendMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops);
}
private:
@@ -339,29 +345,31 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddRecvMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf) && buf.got_message;
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.RecvMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg).ok()) {
+ return false;
+ }
if (!ctx_->sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- buf.AddSendMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops);
}
private:
@@ -400,57 +408,59 @@ class AsyncWriterInterface {
template <class R>
class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
- public AsyncReaderInterface<R> {
-};
+ public AsyncReaderInterface<R> {};
template <class R>
class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
public:
// Create a stream and write the first request out.
+ template <class W>
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- const grpc::protobuf::Message& request, void* tag)
+ const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- init_buf_.AddSendMessage(request);
- init_buf_.AddClientSendClose();
- call_.PerformOps(&init_buf_);
+ init_ops_.set_output_tag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(init_ops_.SendMessage(request).ok());
+ init_ops_.ClientSendClose();
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
+ read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- read_buf_.AddRecvInitialMetadata(context_);
+ read_ops_.RecvInitialMetadata(context_);
}
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W>
@@ -463,56 +473,57 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
public:
+ template <class R>
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- grpc::protobuf::Message* response, void* tag)
- : context_(context),
- response_(response),
- call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&init_buf_);
+ R* response, void* tag)
+ : context_(context), call_(channel->CreateCall(method, context, cq)) {
+ finish_ops_.RecvMessage(response);
+
+ init_ops_.set_output_tag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ write_ops_.set_output_tag(tag);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_buf_.Reset(tag);
- writes_done_buf_.AddClientSendClose();
- call_.PerformOps(&writes_done_buf_);
+ writes_done_ops_.set_output_tag(tag);
+ writes_done_ops_.ClientSendClose();
+ call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddRecvMessage(response_);
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
- grpc::protobuf::Message* const response_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer writes_done_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
+ CallOpClientRecvStatus> finish_ops_;
};
// Client-side interface for bi-directional streaming.
@@ -532,58 +543,59 @@ class ClientAsyncReaderWriter GRPC_FINAL
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&init_buf_);
+ init_ops_.set_output_tag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
+ read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- read_buf_.AddRecvInitialMetadata(context_);
+ read_ops_.RecvInitialMetadata(context_);
}
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ write_ops_.set_output_tag(tag);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_buf_.Reset(tag);
- writes_done_buf_.AddClientSendClose();
- call_.PerformOps(&writes_done_buf_);
+ writes_done_ops_.set_output_tag(tag);
+ writes_done_ops_.ClientSendClose();
+ call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer writes_done_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W, class R>
@@ -596,41 +608,44 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.set_output_tag(tag);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.ok()) {
- finish_buf_.AddSendMessage(msg);
+ finish_ops_.ServerSendStatus(
+ ctx_->trailing_metadata_,
+ finish_ops_.SendMessage(msg));
+ } else {
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ call_.PerformOps(&finish_ops_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -638,9 +653,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> finish_ops_;
};
template <class W>
@@ -653,30 +669,31 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
+ write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -684,9 +701,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
// Server-side interface for bi-directional streaming.
@@ -701,36 +718,37 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.set_output_tag(tag);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
+ write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -738,10 +756,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
} // namespace grpc
diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h
index a695acf205..4fbce9c8d9 100644
--- a/include/grpc/support/port_platform.h
+++ b/include/grpc/support/port_platform.h
@@ -223,7 +223,9 @@
#endif
/* Validate platform combinations */
-#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + defined(GPR_WIN32_ATOMIC) != 1
+#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + \
+ defined(GPR_WIN32_ATOMIC) != \
+ 1
#error Must define exactly one of GPR_GCC_ATOMIC, GPR_GCC_SYNC, GPR_WIN32_ATOMIC
#endif
@@ -231,7 +233,9 @@
#error Must define exactly one of GPR_ARCH_32, GPR_ARCH_64
#endif
-#if defined(GPR_CPU_LINUX) + defined(GPR_CPU_POSIX) + defined(GPR_WIN32) + defined(GPR_CPU_IPHONE) + defined(GPR_CPU_CUSTOM) != 1
+#if defined(GPR_CPU_LINUX) + defined(GPR_CPU_POSIX) + defined(GPR_WIN32) + \
+ defined(GPR_CPU_IPHONE) + defined(GPR_CPU_CUSTOM) != \
+ 1
#error Must define exactly one of GPR_CPU_LINUX, GPR_CPU_POSIX, GPR_WIN32, GPR_CPU_IPHONE, GPR_CPU_CUSTOM
#endif
@@ -239,11 +243,15 @@
#error Must define GPR_POSIX_SOCKET to use GPR_POSIX_MULTIPOLL_WITH_POLL
#endif
-#if defined(GPR_POSIX_SOCKET) + defined(GPR_WINSOCK_SOCKET) + defined(GPR_CUSTOM_SOCKET) != 1
+#if defined(GPR_POSIX_SOCKET) + defined(GPR_WINSOCK_SOCKET) + \
+ defined(GPR_CUSTOM_SOCKET) != \
+ 1
#error Must define exactly one of GPR_POSIX_SOCKET, GPR_WINSOCK_SOCKET, GPR_CUSTOM_SOCKET
#endif
-#if defined(GPR_MSVC_TLS) + defined(GPR_GCC_TLS) + defined(GPR_PTHREAD_TLS) + defined(GPR_CUSTOM_TLS) != 1
+#if defined(GPR_MSVC_TLS) + defined(GPR_GCC_TLS) + defined(GPR_PTHREAD_TLS) + \
+ defined(GPR_CUSTOM_TLS) != \
+ 1
#error Must define exactly one of GPR_MSVC_TLS, GPR_GCC_TLS, GPR_PTHREAD_TLS, GPR_CUSTOM_TLS
#endif
@@ -266,4 +274,12 @@ typedef uintptr_t gpr_uintptr;
power of two */
#define GPR_MAX_ALIGNMENT 16
-#endif /* GRPC_SUPPORT_PORT_PLATFORM_H */
+#ifndef GRPC_MUST_USE_RESULT
+#ifdef __GNUC__
+#define GRPC_MUST_USE_RESULT __attribute__((warn_unused_result))
+#else
+#define GRPC_MUST_USE_RESULT
+#endif
+#endif
+
+#endif /* GRPC_SUPPORT_PORT_PLATFORM_H */
diff --git a/src/compiler/config.h b/src/compiler/config.h
index e81de8d6c8..cd52aca57d 100644
--- a/src/compiler/config.h
+++ b/src/compiler/config.h
@@ -35,6 +35,7 @@
#define SRC_COMPILER_CONFIG_H
#include <grpc++/config.h>
+#include <grpc++/config_protobuf.h>
#ifndef GRPC_CUSTOM_DESCRIPTOR
#include <google/protobuf/descriptor.h>
@@ -48,7 +49,8 @@
#ifndef GRPC_CUSTOM_CODEGENERATOR
#include <google/protobuf/compiler/code_generator.h>
#define GRPC_CUSTOM_CODEGENERATOR ::google::protobuf::compiler::CodeGenerator
-#define GRPC_CUSTOM_GENERATORCONTEXT ::google::protobuf::compiler::GeneratorContext
+#define GRPC_CUSTOM_GENERATORCONTEXT \
+ ::google::protobuf::compiler::GeneratorContext
#endif
#ifndef GRPC_CUSTOM_PRINTER
@@ -57,7 +59,8 @@
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#define GRPC_CUSTOM_PRINTER ::google::protobuf::io::Printer
#define GRPC_CUSTOM_CODEDOUTPUTSTREAM ::google::protobuf::io::CodedOutputStream
-#define GRPC_CUSTOM_STRINGOUTPUTSTREAM ::google::protobuf::io::StringOutputStream
+#define GRPC_CUSTOM_STRINGOUTPUTSTREAM \
+ ::google::protobuf::io::StringOutputStream
#endif
#ifndef GRPC_CUSTOM_PLUGINMAIN
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 6cd615019b..75659947df 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -97,7 +97,8 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file,
vars["filename_base"] = grpc_generator::StripProto(file->name());
printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
- printer.Print(vars, "// If you make any local change, they will be lost.\n");
+ printer.Print(vars,
+ "// If you make any local change, they will be lost.\n");
printer.Print(vars, "// source: $filename$\n");
printer.Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n");
printer.Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n");
@@ -113,6 +114,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
+ "#include <grpc++/impl/proto_utils.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/status.h>\n"
@@ -141,10 +143,10 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
return temp;
}
-void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars,
- bool is_public) {
+void PrintHeaderClientMethodInterfaces(
+ grpc::protobuf::io::Printer *printer,
+ const grpc::protobuf::MethodDescriptor *method,
+ std::map<grpc::string, grpc::string> *vars, bool is_public) {
(*vars)["Method"] = method->name();
(*vars)["Request"] =
grpc_cpp_generator::ClassName(method->input_type(), true);
@@ -157,19 +159,17 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
*vars,
"virtual ::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) = 0;\n");
- printer->Print(
- *vars,
- "std::unique_ptr< "
- "::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
- "Async$Method$(::grpc::ClientContext* context, "
- "const $Request$& request, "
- "::grpc::CompletionQueue* cq) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< "
+ "::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
+ "Async$Method$(::grpc::ClientContext* context, "
+ "const $Request$& request, "
+ "::grpc::CompletionQueue* cq) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
- "Async$Method$Raw(context, request, cq));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
+ "Async$Method$Raw(context, request, cq));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
@@ -188,14 +188,14 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>"
- " Async$Method$(::grpc::ClientContext* context, $Response$* response, "
+ " Async$Method$(::grpc::ClientContext* context, $Response$* "
+ "response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncWriterInterface< $Request$>>("
- "Async$Method$Raw(context, response, cq, tag));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncWriterInterface< $Request$>>("
+ "Async$Method$Raw(context, response, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ServerOnlyStreaming(method)) {
@@ -218,18 +218,17 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncReaderInterface< $Response$>>("
- "Async$Method$Raw(context, request, cq, tag));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncReaderInterface< $Response$>>("
+ "Async$Method$Raw(context, request, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
} else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "std::unique_ptr< ::grpc::ClientReaderWriterInterface< $Request$, $Response$>> "
- "$Method$(::grpc::ClientContext* context) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< ::grpc::ClientReaderWriterInterface< "
+ "$Request$, $Response$>> "
+ "$Method$(::grpc::ClientContext* context) {\n");
printer->Indent();
printer->Print(
*vars,
@@ -267,12 +266,11 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"virtual ::grpc::ClientWriterInterface< $Request$>*"
" $Method$Raw("
"::grpc::ClientContext* context, $Response$* response) = 0;\n");
- printer->Print(
- *vars,
- "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*"
- " Async$Method$Raw(::grpc::ClientContext* context, "
- "$Response$* response, "
- "::grpc::CompletionQueue* cq, void* tag) = 0;\n");
+ printer->Print(*vars,
+ "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*"
+ " Async$Method$Raw(::grpc::ClientContext* context, "
+ "$Response$* response, "
+ "::grpc::CompletionQueue* cq, void* tag) = 0;\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -285,16 +283,15 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n");
} else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "virtual ::grpc::ClientReaderWriterInterface< $Request$, $Response$>* "
- "$Method$Raw(::grpc::ClientContext* context) = 0;\n");
- printer->Print(
- *vars,
- "virtual ::grpc::ClientAsyncReaderWriterInterface< "
- "$Request$, $Response$>* "
- "Async$Method$Raw(::grpc::ClientContext* context, "
- "::grpc::CompletionQueue* cq, void* tag) = 0;\n");
+ printer->Print(*vars,
+ "virtual ::grpc::ClientReaderWriterInterface< $Request$, "
+ "$Response$>* "
+ "$Method$Raw(::grpc::ClientContext* context) = 0;\n");
+ printer->Print(*vars,
+ "virtual ::grpc::ClientAsyncReaderWriterInterface< "
+ "$Request$, $Response$>* "
+ "Async$Method$Raw(::grpc::ClientContext* context, "
+ "::grpc::CompletionQueue* cq, void* tag) = 0;\n");
}
}
}
@@ -321,11 +318,10 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncResponseReader< $Response$>>("
- "Async$Method$Raw(context, request, cq));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncResponseReader< $Response$>>("
+ "Async$Method$Raw(context, request, cq));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
@@ -335,17 +331,16 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
" $Method$("
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< ::grpc::ClientWriter< $Request$>>"
- "($Method$Raw(context, response));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< ::grpc::ClientWriter< $Request$>>"
+ "($Method$Raw(context, response));\n");
printer->Outdent();
printer->Print("}\n");
- printer->Print(
- *vars,
- "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>"
- " Async$Method$(::grpc::ClientContext* context, $Response$* response, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>"
+ " Async$Method$(::grpc::ClientContext* context, "
+ "$Response$* response, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
printer->Print(
*vars,
@@ -385,53 +380,47 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>>"
" $Method$(::grpc::ClientContext* context) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientReaderWriter< $Request$, $Response$>>("
- "$Method$Raw(context));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientReaderWriter< $Request$, $Response$>>("
+ "$Method$Raw(context));\n");
printer->Outdent();
printer->Print("}\n");
- printer->Print(
- *vars,
- "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
- "$Request$, $Response$>> "
- "Async$Method$(::grpc::ClientContext* context, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
+ "$Request$, $Response$>> "
+ "Async$Method$(::grpc::ClientContext* context, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>("
- "Async$Method$Raw(context, cq, tag));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>("
+ "Async$Method$Raw(context, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
}
} else {
if (NoStreaming(method)) {
- printer->Print(
- *vars,
- "::grpc::ClientAsyncResponseReader< $Response$>* "
- "Async$Method$Raw(::grpc::ClientContext* context, "
- "const $Request$& request, "
- "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n");
+ printer->Print(*vars,
+ "::grpc::ClientAsyncResponseReader< $Response$>* "
+ "Async$Method$Raw(::grpc::ClientContext* context, "
+ "const $Request$& request, "
+ "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n");
} else if (ClientOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "::grpc::ClientWriter< $Request$>* $Method$Raw("
- "::grpc::ClientContext* context, $Response$* response) "
- "GRPC_OVERRIDE;\n");
+ printer->Print(*vars,
+ "::grpc::ClientWriter< $Request$>* $Method$Raw("
+ "::grpc::ClientContext* context, $Response$* response) "
+ "GRPC_OVERRIDE;\n");
printer->Print(
*vars,
"::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n");
} else if (ServerOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "::grpc::ClientReader< $Response$>* $Method$Raw("
- "::grpc::ClientContext* context, const $Request$& request)"
- " GRPC_OVERRIDE;\n");
+ printer->Print(*vars,
+ "::grpc::ClientReader< $Response$>* $Method$Raw("
+ "::grpc::ClientContext* context, const $Request$& request)"
+ " GRPC_OVERRIDE;\n");
printer->Print(
*vars,
"::grpc::ClientAsyncReader< $Response$>* Async$Method$Raw("
@@ -629,7 +618,7 @@ grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string output;
{
- // Scope the output stream so it closes and finalizes output to the string.
+ // Scope the output stream so it closes and finalizes output to the string.
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
@@ -693,7 +682,8 @@ grpc::string GetSourcePrologue(const grpc::protobuf::FileDescriptor *file,
vars["filename_base"] = grpc_generator::StripProto(file->name());
printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
- printer.Print(vars, "// If you make any local change, they will be lost.\n");
+ printer.Print(vars,
+ "// If you make any local change, they will be lost.\n");
printer.Print(vars, "// source: $filename$\n\n");
printer.Print(vars, "#include \"$filename_base$.pb.h\"\n");
printer.Print(vars, "#include \"$filename_base$.grpc.pb.h\"\n");
@@ -1056,8 +1046,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -1066,8 +1055,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -1076,8 +1064,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
@@ -1086,8 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
printer->Print("return service_;\n");
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 475a20d883..6e6278cb05 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -41,7 +41,6 @@
#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@@ -75,14 +74,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
return Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
- buf->FillOps(ops, &nops);
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
}
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index cd239247c8..9108713c58 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -44,22 +44,22 @@ struct grpc_channel;
namespace grpc {
class Call;
-class CallOpBuffer;
+class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class Credentials;
class StreamContextInterface;
-class Channel GRPC_FINAL : public GrpcLibrary,
- public ChannelInterface {
+class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
public:
Channel(const grpc::string& target, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
- virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE;
+ virtual void* RegisterMethod(const char* method) GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
- virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+ virtual void PerformOpsOnCall(CallOpSetInterface* ops,
+ Call* call) GRPC_OVERRIDE;
private:
const grpc::string target_;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index edce6396bd..0a5c976e01 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -39,107 +39,32 @@
#include <grpc++/channel_interface.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
-CallOpBuffer::CallOpBuffer()
- : return_tag_(this),
- send_initial_metadata_(false),
- initial_metadata_count_(0),
- initial_metadata_(nullptr),
- recv_initial_metadata_(nullptr),
- send_message_(nullptr),
- send_message_buffer_(nullptr),
- send_buf_(nullptr),
- recv_message_(nullptr),
- recv_message_buffer_(nullptr),
- recv_buf_(nullptr),
- max_message_size_(-1),
- client_send_close_(false),
- recv_trailing_metadata_(nullptr),
- recv_status_(nullptr),
- status_code_(GRPC_STATUS_OK),
- status_details_(nullptr),
- status_details_capacity_(0),
- send_status_available_(false),
- send_status_code_(GRPC_STATUS_OK),
- trailing_metadata_count_(0),
- trailing_metadata_(nullptr),
- cancelled_buf_(0),
- recv_closed_(nullptr) {
- memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_));
- memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
- recv_trailing_metadata_arr_.metadata = nullptr;
- recv_initial_metadata_arr_.metadata = nullptr;
-}
-
-void CallOpBuffer::Reset(void* next_return_tag) {
- return_tag_ = next_return_tag;
-
- send_initial_metadata_ = false;
- initial_metadata_count_ = 0;
- gpr_free(initial_metadata_);
-
- recv_initial_metadata_ = nullptr;
- recv_initial_metadata_arr_.count = 0;
-
- if (send_buf_ && send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
- }
- send_message_ = nullptr;
- send_message_buffer_ = nullptr;
- send_buf_ = nullptr;
-
- got_message = false;
- if (recv_buf_ && recv_message_) {
- grpc_byte_buffer_destroy(recv_buf_);
- }
- recv_message_ = nullptr;
- recv_message_buffer_ = nullptr;
- recv_buf_ = nullptr;
-
- client_send_close_ = false;
-
- recv_trailing_metadata_ = nullptr;
- recv_status_ = nullptr;
- recv_trailing_metadata_arr_.count = 0;
-
- status_code_ = GRPC_STATUS_OK;
-
- send_status_available_ = false;
- send_status_code_ = GRPC_STATUS_OK;
- send_status_details_.clear();
- trailing_metadata_count_ = 0;
- trailing_metadata_ = nullptr;
-
- recv_closed_ = nullptr;
-}
-
-CallOpBuffer::~CallOpBuffer() {
- gpr_free(status_details_);
- gpr_free(recv_initial_metadata_arr_.metadata);
- gpr_free(recv_trailing_metadata_arr_.metadata);
- if (recv_buf_ && recv_message_) {
- grpc_byte_buffer_destroy(recv_buf_);
- }
- if (send_buf_ && send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
+void FillMetadataMap(grpc_metadata_array* arr,
+ std::multimap<grpc::string, grpc::string>* metadata) {
+ for (size_t i = 0; i < arr->count; i++) {
+ // TODO(yangg) handle duplicates?
+ metadata->insert(std::pair<grpc::string, grpc::string>(
+ arr->metadata[i].key,
+ grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
}
+ grpc_metadata_array_destroy(arr);
+ grpc_metadata_array_init(arr);
}
-namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
grpc_metadata* FillMetadataArray(
- std::multimap<grpc::string, grpc::string>* metadata) {
- if (metadata->empty()) {
+ const std::multimap<grpc::string, grpc::string>& metadata) {
+ if (metadata.empty()) {
return nullptr;
}
grpc_metadata* metadata_array =
- (grpc_metadata*)gpr_malloc(metadata->size() * sizeof(grpc_metadata));
+ (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata));
size_t i = 0;
- for (auto iter = metadata->cbegin(); iter != metadata->cend(); ++iter, ++i) {
+ for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
metadata_array[i].key = iter->first.c_str();
metadata_array[i].value = iter->second.c_str();
metadata_array[i].value_length = iter->second.size();
@@ -147,206 +72,6 @@ grpc_metadata* FillMetadataArray(
return metadata_array;
}
-void FillMetadataMap(grpc_metadata_array* arr,
- std::multimap<grpc::string, grpc::string>* metadata) {
- for (size_t i = 0; i < arr->count; i++) {
- // TODO(yangg) handle duplicates?
- metadata->insert(std::pair<grpc::string, grpc::string>(
- arr->metadata[i].key,
- grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
- }
- grpc_metadata_array_destroy(arr);
- grpc_metadata_array_init(arr);
-}
-} // namespace
-
-void CallOpBuffer::AddSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) {
- send_initial_metadata_ = true;
- initial_metadata_count_ = metadata->size();
- initial_metadata_ = FillMetadataArray(metadata);
-}
-
-void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) {
- ctx->initial_metadata_received_ = true;
- recv_initial_metadata_ = &ctx->recv_initial_metadata_;
-}
-
-void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) {
- AddSendInitialMetadata(&ctx->send_initial_metadata_);
-}
-
-void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
- send_message_ = &message;
-}
-
-void CallOpBuffer::AddSendMessage(const ByteBuffer& message) {
- send_message_buffer_ = &message;
-}
-
-void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
- recv_message_ = message;
- recv_message_->Clear();
-}
-
-void CallOpBuffer::AddRecvMessage(ByteBuffer* message) {
- recv_message_buffer_ = message;
- recv_message_buffer_->Clear();
-}
-
-void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; }
-
-void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
- recv_closed_ = cancelled;
-}
-
-void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) {
- recv_trailing_metadata_ = &context->trailing_metadata_;
- recv_status_ = status;
-}
-
-void CallOpBuffer::AddServerSendStatus(
- std::multimap<grpc::string, grpc::string>* metadata, const Status& status) {
- if (metadata != NULL) {
- trailing_metadata_count_ = metadata->size();
- trailing_metadata_ = FillMetadataArray(metadata);
- } else {
- trailing_metadata_count_ = 0;
- }
- send_status_available_ = true;
- send_status_code_ = static_cast<grpc_status_code>(status.error_code());
- send_status_details_ = status.error_message();
-}
-
-void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
- *nops = 0;
- if (send_initial_metadata_) {
- ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[*nops].data.send_initial_metadata.count = initial_metadata_count_;
- ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (recv_initial_metadata_) {
- ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA;
- ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (send_message_ || send_message_buffer_) {
- if (send_message_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_SERIALIZE, 0);
- bool success = SerializeProto(*send_message_, &send_buf_);
- if (!success) {
- abort();
- // TODO handle parse failure
- }
- GRPC_TIMER_END(GRPC_PTAG_PROTO_SERIALIZE, 0);
- } else {
- send_buf_ = send_message_buffer_->buffer();
- }
- ops[*nops].op = GRPC_OP_SEND_MESSAGE;
- ops[*nops].data.send_message = send_buf_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (recv_message_ || recv_message_buffer_) {
- ops[*nops].op = GRPC_OP_RECV_MESSAGE;
- ops[*nops].data.recv_message = &recv_buf_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (client_send_close_) {
- ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (recv_status_) {
- ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- ops[*nops].data.recv_status_on_client.trailing_metadata =
- &recv_trailing_metadata_arr_;
- ops[*nops].data.recv_status_on_client.status = &status_code_;
- ops[*nops].data.recv_status_on_client.status_details = &status_details_;
- ops[*nops].data.recv_status_on_client.status_details_capacity =
- &status_details_capacity_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (send_status_available_) {
- ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
- ops[*nops].data.send_status_from_server.trailing_metadata_count =
- trailing_metadata_count_;
- ops[*nops].data.send_status_from_server.trailing_metadata =
- trailing_metadata_;
- ops[*nops].data.send_status_from_server.status = send_status_code_;
- ops[*nops].data.send_status_from_server.status_details =
- send_status_details_.empty() ? nullptr : send_status_details_.c_str();
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (recv_closed_) {
- ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
-}
-
-bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
- // Release send buffers.
- if (send_buf_ && send_message_) {
- if (send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
- }
- send_buf_ = nullptr;
- }
- if (initial_metadata_) {
- gpr_free(initial_metadata_);
- initial_metadata_ = nullptr;
- }
- if (trailing_metadata_count_) {
- gpr_free(trailing_metadata_);
- trailing_metadata_ = nullptr;
- }
- // Set user-facing tag.
- *tag = return_tag_;
- // Process received initial metadata
- if (recv_initial_metadata_) {
- FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
- }
- // Parse received message if any.
- if (recv_message_ || recv_message_buffer_) {
- if (recv_buf_) {
- got_message = *status;
- if (recv_message_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, 0);
- *status = *status &&
- DeserializeProto(recv_buf_, recv_message_, max_message_size_);
- grpc_byte_buffer_destroy(recv_buf_);
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, 0);
- } else {
- recv_message_buffer_->set_buffer(recv_buf_);
- }
- recv_buf_ = nullptr;
- } else {
- // Read failed
- got_message = false;
- *status = false;
- }
- }
- // Parse received status.
- if (recv_status_) {
- FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
- *recv_status_ = Status(
- static_cast<StatusCode>(status_code_),
- status_details_ ? grpc::string(status_details_) : grpc::string());
- }
- if (recv_closed_) {
- *recv_closed_ = cancelled_buf_ != 0;
- }
- return true;
-}
-
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
: call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
@@ -357,11 +82,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
call_(call),
max_message_size_(max_message_size) {}
-void Call::PerformOps(CallOpBuffer* buffer) {
+void Call::PerformOps(CallOpSetInterface* ops) {
if (max_message_size_ > 0) {
- buffer->set_max_message_size(max_message_size_);
+ ops->set_max_message_size(max_message_size_);
}
- call_hook_->PerformOpsOnCall(buffer, this);
+ call_hook_->PerformOpsOnCall(ops, this);
}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index f4cf5cf17a..268e4f6d1f 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -31,7 +31,7 @@
*
*/
-#include "src/cpp/proto/proto_utils.h"
+#include <grpc++/impl/proto_utils.h>
#include <grpc++/config.h>
#include <grpc/grpc.h>
@@ -67,7 +67,7 @@ class GrpcBufferWriter GRPC_FINAL
slice_ = gpr_slice_malloc(block_size_);
}
*data = GPR_SLICE_START_PTR(slice_);
- byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
+ byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
gpr_slice_buffer_add(slice_buffer_, slice_);
return true;
}
@@ -118,7 +118,7 @@ class GrpcBufferReader GRPC_FINAL
}
gpr_slice_unref(slice_);
*data = GPR_SLICE_START_PTR(slice_);
- byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
+ byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
return true;
}
@@ -152,20 +152,28 @@ class GrpcBufferReader GRPC_FINAL
namespace grpc {
-bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
+Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp);
- return msg.SerializeToZeroCopyStream(&writer);
+ return msg.SerializeToZeroCopyStream(&writer) ? Status::OK : Status(INVALID_ARGUMENT, "Failed to serialize message");
}
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size) {
- if (!buffer) return false;
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size) {
+ if (!buffer) {
+ return Status(INVALID_ARGUMENT, "No payload");
+ }
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
- return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage();
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ return Status(INVALID_ARGUMENT, msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ return Status(INVALID_ARGUMENT, "Did not read entire message");
+ }
+ return Status::OK;
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 024537c34a..31b6a0ee00 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -48,7 +48,6 @@
#include <grpc++/time.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
@@ -69,16 +68,11 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
- has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::CLIENT_STREAMING),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
- ~SyncRequest() {
- grpc_metadata_array_destroy(&request_metadata_);
- }
+ ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); }
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
@@ -91,9 +85,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- void SetupRequest() {
- cq_ = grpc_completion_queue_create();
- }
+ void SetupRequest() { cq_ = grpc_completion_queue_create(); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@@ -125,7 +117,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
- has_response_payload_(mrd->has_response_payload_),
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
@@ -142,35 +133,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
void Run() {
- std::unique_ptr<grpc::protobuf::Message> req;
- std::unique_ptr<grpc::protobuf::Message> res;
- if (has_request_payload_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- req.reset(method_->AllocateRequestProto());
- if (!DeserializeProto(request_payload_, req.get(),
- call_.max_message_size())) {
- // FIXME(yangg) deal with deserialization failure
- cq_.Shutdown();
- return;
- }
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- }
- if (has_response_payload_) {
- res.reset(method_->AllocateResponseProto());
- }
ctx_.BeginCompletionOp(&call_);
- auto status = method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
- CallOpBuffer buf;
- if (!ctx_.sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
- }
- if (has_response_payload_) {
- buf.AddSendMessage(*res);
- }
- buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); /* status ignored */
+ method_->handler()->RunHandler(MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_payload_, call_.max_message_size()));
+ request_payload_ = nullptr;
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
@@ -182,7 +148,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Call call_;
ServerContext ctx_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_byte_buffer* request_payload_;
RpcServiceMethod* const method_;
};
@@ -192,7 +157,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_call* call_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
@@ -260,9 +224,9 @@ bool Server::RegisterService(RpcService* service) {
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
- GPR_ASSERT(service->dispatch_impl_ == nullptr &&
+ GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
- service->dispatch_impl_ = this;
+ service->server_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
@@ -328,141 +292,87 @@ void Server::Wait() {
}
}
-void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
- buf->FillOps(ops, &nops);
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
}
-class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
- public:
- AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
- grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
- : tag_(tag),
- request_(request),
- stream_(stream),
- call_cq_(call_cq),
- ctx_(ctx),
- generic_ctx_(nullptr),
- server_(server),
- call_(nullptr),
- payload_(nullptr) {
- memset(&array_, 0, sizeof(array_));
- grpc_call_details_init(&call_details_);
- GPR_ASSERT(notification_cq);
- GPR_ASSERT(call_cq);
- grpc_server_request_registered_call(
- server->server_, registered_method, &call_, &call_details_.deadline,
- &array_, request ? &payload_ : nullptr, call_cq->cq(),
- notification_cq->cq(), this);
- }
+Server::BaseAsyncRequest::BaseAsyncRequest(
+ Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ : server_(server),
+ context_(context),
+ stream_(stream),
+ call_cq_(call_cq),
+ tag_(tag),
+ call_(nullptr) {
+ memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
+}
- AsyncRequest(Server* server, GenericServerContext* ctx,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
- : tag_(tag),
- request_(nullptr),
- stream_(stream),
- call_cq_(call_cq),
- ctx_(nullptr),
- generic_ctx_(ctx),
- server_(server),
- call_(nullptr),
- payload_(nullptr) {
- memset(&array_, 0, sizeof(array_));
- grpc_call_details_init(&call_details_);
- GPR_ASSERT(notification_cq);
- GPR_ASSERT(call_cq);
- grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
- call_cq->cq(), notification_cq->cq(), this);
- }
+Server::BaseAsyncRequest::~BaseAsyncRequest() {}
- ~AsyncRequest() {
- if (payload_) {
- grpc_byte_buffer_destroy(payload_);
+bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ if (*status) {
+ for (size_t i = 0; i < initial_metadata_array_.count; i++) {
+ context_->client_metadata_.insert(std::make_pair(
+ grpc::string(initial_metadata_array_.metadata[i].key),
+ grpc::string(initial_metadata_array_.metadata[i].value,
+ initial_metadata_array_.metadata[i].value +
+ initial_metadata_array_.metadata[i].value_length)));
}
- grpc_metadata_array_destroy(&array_);
}
-
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- *tag = tag_;
- bool orig_status = *status;
- if (*status && request_) {
- if (payload_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_);
- *status =
- DeserializeProto(payload_, request_, server_->max_message_size_);
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_);
- } else {
- *status = false;
- }
- }
- ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
- GPR_ASSERT(ctx);
- if (*status) {
- ctx->deadline_ = call_details_.deadline;
- for (size_t i = 0; i < array_.count; i++) {
- ctx->client_metadata_.insert(std::make_pair(
- grpc::string(array_.metadata[i].key),
- grpc::string(
- array_.metadata[i].value,
- array_.metadata[i].value + array_.metadata[i].value_length)));
- }
- if (generic_ctx_) {
- // TODO(yangg) remove the copy here.
- generic_ctx_->method_ = call_details_.method;
- generic_ctx_->host_ = call_details_.host;
- gpr_free(call_details_.method);
- gpr_free(call_details_.host);
- }
- }
- ctx->call_ = call_;
- ctx->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_message_size_);
- if (orig_status && call_) {
- ctx->BeginCompletionOp(&call);
- }
- // just the pointers inside call are copied here
- stream_->BindCall(&call);
- delete this;
- return true;
+ grpc_metadata_array_destroy(&initial_metadata_array_);
+ context_->call_ = call_;
+ context_->cq_ = call_cq_;
+ Call call(call_, server_, call_cq_, server_->max_message_size_);
+ if (*status && call_) {
+ context_->BeginCompletionOp(&call);
}
+ // just the pointers inside call are copied here
+ stream_->BindCall(&call);
+ *tag = tag_;
+ delete this;
+ return true;
+}
- private:
- void* const tag_;
- grpc::protobuf::Message* const request_;
- ServerAsyncStreamingInterface* const stream_;
- CompletionQueue* const call_cq_;
- ServerContext* const ctx_;
- GenericServerContext* const generic_ctx_;
- Server* const server_;
- grpc_call* call_;
- grpc_call_details call_details_;
- grpc_metadata_array array_;
- grpc_byte_buffer* payload_;
-};
+Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
+ Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+
+void Server::RegisteredAsyncRequest::IssueRequest(
+ void* registered_method, grpc_byte_buffer** payload,
+ ServerCompletionQueue* notification_cq) {
+ grpc_server_request_registered_call(
+ server_->server_, registered_method, &call_, &context_->deadline_,
+ &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
+ this);
+}
-void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
- grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
- new AsyncRequest(this, registered_method, context, request, stream, call_cq,
- notification_cq, tag);
+Server::GenericAsyncRequest::GenericAsyncRequest(
+ Server* server, GenericServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+ grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
+ grpc_server_request_call(server->server_, &call_, &call_details_,
+ &initial_metadata_array_, call_cq->cq(),
+ notification_cq->cq(), this);
}
-void Server::RequestAsyncGenericCall(GenericServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
- new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
+bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ // TODO(yangg) remove the copy here.
+ static_cast<GenericServerContext*>(context_)->method_ = call_details_.method;
+ static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
+ gpr_free(call_details_.method);
+ gpr_free(call_details_.host);
+ return BaseAsyncRequest::FinalizeResult(tag, status);
}
void Server::ScheduleCallback() {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 6b5e41d0a8..699895a3cf 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -43,12 +43,12 @@ namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
+class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : refs_(2), finalized_(false), cancelled_(false) {
- AddServerRecvClose(&cancelled_);
- }
+ CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
+
+ void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
@@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
grpc::mutex mu_;
int refs_;
bool finalized_;
- bool cancelled_;
+ int cancelled_;
};
void ServerContext::CompletionOp::Unref() {
@@ -73,14 +73,20 @@ void ServerContext::CompletionOp::Unref() {
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
grpc::lock_guard<grpc::mutex> g(mu_);
- return finalized_ ? cancelled_ : false;
+ return finalized_ ? cancelled_ != 0 : false;
+}
+
+void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
+ ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops->data.recv_close_on_server.cancelled = &cancelled_;
+ ops->flags = 0;
+ *nops = 1;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
- if (!*status) cancelled_ = true;
+ if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;
diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc
index 7132b6b1f1..b9d47b32de 100644
--- a/test/cpp/end2end/generic_end2end_test.cc
+++ b/test/cpp/end2end/generic_end2end_test.cc
@@ -33,10 +33,10 @@
#include <memory>
-#include "src/cpp/proto/proto_utils.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/echo.grpc.pb.h"
+#include <grpc++/impl/proto_utils.h>
#include <grpc++/async_generic_service.h>
#include <grpc++/async_unary_call.h>
#include <grpc++/byte_buffer.h>
diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++
index 5616f2c466..79868464a1 100644
--- a/tools/doxygen/Doxyfile.c++
+++ b/tools/doxygen/Doxyfile.c++
@@ -760,7 +760,7 @@ WARN_LOGFILE =
# spaces.
# Note: If this tag is empty the current directory is searched.
-INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h
+INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/serialization_traits.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 6d323274c9..48ae86c933 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -760,7 +760,7 @@ WARN_LOGFILE =
# spaces.
# Note: If this tag is empty the current directory is searched.
-INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/client_unary_call.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc
+INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/serialization_traits.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
diff --git a/vsprojects/grpc++/grpc++.vcxproj b/vsprojects/grpc++/grpc++.vcxproj
index d233f9e3d3..092b962aa0 100644
--- a/vsprojects/grpc++/grpc++.vcxproj
+++ b/vsprojects/grpc++/grpc++.vcxproj
@@ -163,6 +163,7 @@
<ClInclude Include="..\..\include\grpc++\impl\internal_stub.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_method.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h" />
+ <ClInclude Include="..\..\include\grpc++\impl\serialization_traits.h" />
<ClInclude Include="..\..\include\grpc++\impl\service_type.h" />
<ClInclude Include="..\..\include\grpc++\impl\sync.h" />
<ClInclude Include="..\..\include\grpc++\impl\sync_cxx11.h" />
@@ -199,8 +200,6 @@
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
</ClCompile>
- <ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
- </ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\credentials.cc">
diff --git a/vsprojects/grpc++/grpc++.vcxproj.filters b/vsprojects/grpc++/grpc++.vcxproj.filters
index dd375c7238..f44fb87fbc 100644
--- a/vsprojects/grpc++/grpc++.vcxproj.filters
+++ b/vsprojects/grpc++/grpc++.vcxproj.filters
@@ -16,9 +16,6 @@
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
- <Filter>src\cpp\client</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
@@ -135,6 +132,9 @@
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\grpc++\impl\serialization_traits.h">
+ <Filter>include\grpc++\impl</Filter>
+ </ClInclude>
<ClInclude Include="..\..\include\grpc++\impl\service_type.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
index 9b2ef9137d..aabbdc04e7 100644
--- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
+++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
@@ -163,6 +163,7 @@
<ClInclude Include="..\..\include\grpc++\impl\internal_stub.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_method.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h" />
+ <ClInclude Include="..\..\include\grpc++\impl\serialization_traits.h" />
<ClInclude Include="..\..\include\grpc++\impl\service_type.h" />
<ClInclude Include="..\..\include\grpc++\impl\sync.h" />
<ClInclude Include="..\..\include\grpc++\impl\sync_cxx11.h" />
@@ -193,8 +194,6 @@
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
</ClCompile>
- <ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
- </ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\credentials.cc">
diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
index d616e336e4..23efb25ebe 100644
--- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
+++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
@@ -10,9 +10,6 @@
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
- <Filter>src\cpp\client</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
@@ -129,6 +126,9 @@
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\grpc++\impl\serialization_traits.h">
+ <Filter>include\grpc++\impl</Filter>
+ </ClInclude>
<ClInclude Include="..\..\include\grpc++\impl\service_type.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>