aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2015-07-06 14:05:54 -0700
committerGravatar yang-g <yangg@google.com>2015-07-06 14:05:54 -0700
commit5ea46ab2482c3724fbc7fd0aab55f324fb65999c (patch)
tree55eebc4aae8f06f931c8f75ddf84d56595f99fa1 /include/grpc++
parent3abe60b9d08ff5a784a39f7c4a10c631547c3526 (diff)
parentd426864934ac60f46e538ba81932e405fa8949b1 (diff)
merge with upstream and resolve conflicts
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/async_unary_call.h56
-rw-r--r--include/grpc++/byte_buffer.h29
-rw-r--r--include/grpc++/client_context.h10
-rw-r--r--include/grpc++/completion_queue.h30
-rw-r--r--include/grpc++/config.h43
-rw-r--r--include/grpc++/config_protobuf.h72
-rw-r--r--include/grpc++/impl/call.h428
-rw-r--r--include/grpc++/impl/client_unary_call.h26
-rw-r--r--include/grpc++/impl/proto_utils.h76
-rw-r--r--include/grpc++/impl/rpc_service_method.h111
-rw-r--r--include/grpc++/impl/serialization_traits.h68
-rw-r--r--include/grpc++/impl/service_type.h41
-rw-r--r--include/grpc++/server.h126
-rw-r--r--include/grpc++/server_context.h16
-rw-r--r--include/grpc++/stream.h466
15 files changed, 1159 insertions, 439 deletions
diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h
index abb6308782..d631ccd134 100644
--- a/include/grpc++/async_unary_call.h
+++ b/include/grpc++/async_unary_call.h
@@ -51,47 +51,50 @@ class ClientAsyncResponseReaderInterface {
virtual ~ClientAsyncResponseReaderInterface() {}
virtual void ReadInitialMetadata(void* tag) = 0;
virtual void Finish(R* msg, Status* status, void* tag) = 0;
-
};
template <class R>
class ClientAsyncResponseReader GRPC_FINAL
: public ClientAsyncResponseReaderInterface<R> {
public:
+ template <class W>
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- const grpc::protobuf::Message& request)
+ const W& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- init_buf_.AddSendMessage(request);
- init_buf_.AddClientSendClose();
+ init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(init_buf_.SendMessage(request).ok());
+ init_buf_.ClientSendClose();
call_.PerformOps(&init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
+ meta_buf_.set_output_tag(tag);
+ meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_buf_.RecvInitialMetadata(context_);
}
- finish_buf_.AddRecvMessage(msg);
- finish_buf_.AddClientRecvStatus(context_, status);
+ finish_buf_.RecvMessage(msg);
+ finish_buf_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
private:
ClientContext* context_;
Call call_;
- SneakyCallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer finish_buf_;
+ SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose> init_buf_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
+ CallOpClientRecvStatus> finish_buf_;
};
template <class W>
@@ -104,34 +107,36 @@ class ServerAsyncResponseWriter GRPC_FINAL
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_buf_.set_output_tag(tag);
+ meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.ok()) {
- finish_buf_.AddSendMessage(msg);
+ finish_buf_.ServerSendStatus(
+ ctx_->trailing_metadata_, finish_buf_.SendMessage(msg));
+ } else {
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
- finish_buf_.Reset(tag);
+ finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
+ finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@@ -140,8 +145,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> finish_buf_;
};
} // namespace grpc
diff --git a/include/grpc++/byte_buffer.h b/include/grpc++/byte_buffer.h
index 3e40eaed1d..cb3c6a1159 100644
--- a/include/grpc++/byte_buffer.h
+++ b/include/grpc++/byte_buffer.h
@@ -39,6 +39,8 @@
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>
+#include <grpc++/status.h>
+#include <grpc++/impl/serialization_traits.h>
#include <vector>
@@ -48,7 +50,7 @@ class ByteBuffer GRPC_FINAL {
public:
ByteBuffer() : buffer_(nullptr) {}
- ByteBuffer(Slice* slices, size_t nslices);
+ ByteBuffer(const Slice* slices, size_t nslices);
~ByteBuffer() {
if (buffer_) {
@@ -56,13 +58,16 @@ class ByteBuffer GRPC_FINAL {
}
}
- void Dump(std::vector<Slice>* slices);
+ void Dump(std::vector<Slice>* slices) const;
void Clear();
- size_t Length();
+ size_t Length() const;
private:
- friend class CallOpBuffer;
+ friend class SerializationTraits<ByteBuffer, void>;
+
+ ByteBuffer(const ByteBuffer&);
+ ByteBuffer& operator=(const ByteBuffer&);
// takes ownership
void set_buffer(grpc_byte_buffer* buf) {
@@ -78,6 +83,22 @@ class ByteBuffer GRPC_FINAL {
grpc_byte_buffer* buffer_;
};
+template <>
+class SerializationTraits<ByteBuffer, void> {
+ public:
+ static Status Deserialize(grpc_byte_buffer* byte_buffer, ByteBuffer* dest,
+ int max_message_size) {
+ dest->set_buffer(byte_buffer);
+ return Status::OK;
+ }
+ static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer,
+ bool* own_buffer) {
+ *buffer = source.buffer();
+ *own_buffer = false;
+ return Status::OK;
+ }
+};
+
} // namespace grpc
#endif // GRPCXX_BYTE_BUFFER_H
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 66d3c249a1..3bf5edc6c0 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -50,7 +50,6 @@ struct grpc_completion_queue;
namespace grpc {
-class CallOpBuffer;
class ChannelInterface;
class CompletionQueue;
class Credentials;
@@ -118,7 +117,8 @@ class ClientContext {
ClientContext(const ClientContext&);
ClientContext& operator=(const ClientContext&);
- friend class CallOpBuffer;
+ friend class CallOpClientRecvStatus;
+ friend class CallOpRecvInitialMetadata;
friend class Channel;
template <class R>
friend class ::grpc::ClientReader;
@@ -134,6 +134,12 @@ class ClientContext {
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
+ template <class InputMessage, class OutputMessage>
+ friend Status BlockingUnaryCall(ChannelInterface* channel,
+ const RpcMethod& method,
+ ClientContext* context,
+ const InputMessage& request,
+ OutputMessage* result);
grpc_call* call() { return call_; }
void set_call(grpc_call* call,
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index e8429c8f41..f32cbff06c 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -35,8 +35,8 @@
#define GRPCXX_COMPLETION_QUEUE_H
#include <grpc/support/time.h>
-#include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/grpc_library.h>
+#include <grpc++/status.h>
#include <grpc++/time.h>
struct grpc_completion_queue;
@@ -55,8 +55,19 @@ template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
-
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler;
+
+class ChannelInterface;
+class ClientContext;
class CompletionQueue;
+class RpcMethod;
class Server;
class ServerBuilder;
class ServerContext;
@@ -84,7 +95,7 @@ class CompletionQueue : public GrpcLibrary {
// Nonblocking (until deadline) read from queue.
// Cannot rely on result of tag or ok if return is TIMEOUT
- template<typename T>
+ template <typename T>
NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
TimePoint<T> deadline_tp(deadline);
return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
@@ -118,13 +129,22 @@ class CompletionQueue : public GrpcLibrary {
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class RpcMethodHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ClientStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ServerStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class BidiStreamingHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
+ template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result);
+ const InputMessage& request,
+ OutputMessage* result);
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
diff --git a/include/grpc++/config.h b/include/grpc++/config.h
index ca74064be2..1362c0a1fa 100644
--- a/include/grpc++/config.h
+++ b/include/grpc++/config.h
@@ -77,31 +77,6 @@
#define GRPC_OVERRIDE override
#endif
-#ifndef GRPC_CUSTOM_PROTOBUF_INT64
-#include <google/protobuf/stubs/common.h>
-#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
-#endif
-
-#ifndef GRPC_CUSTOM_MESSAGE
-#include <google/protobuf/message.h>
-#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
-#endif
-
-#ifndef GRPC_CUSTOM_STRING
-#include <string>
-#define GRPC_CUSTOM_STRING std::string
-#endif
-
-#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream.h>
-#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
- ::google::protobuf::io::ZeroCopyOutputStream
-#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
- ::google::protobuf::io::ZeroCopyInputStream
-#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream
-#endif
-
#ifdef GRPC_CXX0X_NO_NULLPTR
#include <memory>
const class {
@@ -125,23 +100,15 @@ const class {
} nullptr = {};
#endif
+#ifndef GRPC_CUSTOM_STRING
+#include <string>
+#define GRPC_CUSTOM_STRING std::string
+#endif
+
namespace grpc {
typedef GRPC_CUSTOM_STRING string;
-namespace protobuf {
-
-typedef GRPC_CUSTOM_MESSAGE Message;
-typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
-
-namespace io {
-typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
-typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
-typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
-} // namespace io
-
-} // namespace protobuf
-
} // namespace grpc
#endif // GRPCXX_CONFIG_H
diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/config_protobuf.h
new file mode 100644
index 0000000000..3afc7a58e2
--- /dev/null
+++ b/include/grpc++/config_protobuf.h
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_CONFIG_PROTOBUF_H
+#define GRPCXX_CONFIG_PROTOBUF_H
+
+#ifndef GRPC_CUSTOM_PROTOBUF_INT64
+#include <google/protobuf/stubs/common.h>
+#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
+#endif
+
+#ifndef GRPC_CUSTOM_MESSAGE
+#include <google/protobuf/message.h>
+#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
+#endif
+
+#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
+ ::google::protobuf::io::ZeroCopyOutputStream
+#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
+ ::google::protobuf::io::ZeroCopyInputStream
+#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream
+#endif
+
+namespace grpc {
+namespace protobuf {
+
+typedef GRPC_CUSTOM_MESSAGE Message;
+typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
+
+namespace io {
+typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
+typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
+typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
+} // namespace io
+
+} // namespace protobuf
+} // namespace grpc
+
+#endif // GRPCXX_CONFIG_PROTOBUF_H
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index aae199db1b..64fa5d6efb 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -34,14 +34,19 @@
#ifndef GRPCXX_IMPL_CALL_H
#define GRPCXX_IMPL_CALL_H
-#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
+#include <grpc++/impl/serialization_traits.h>
+#include <functional>
#include <memory>
#include <map>
+#include <string.h>
+
struct grpc_call;
struct grpc_op;
@@ -50,84 +55,383 @@ namespace grpc {
class ByteBuffer;
class Call;
-class CallOpBuffer : public CompletionQueueTag {
+void FillMetadataMap(grpc_metadata_array* arr,
+ std::multimap<grpc::string, grpc::string>* metadata);
+grpc_metadata* FillMetadataArray(
+ const std::multimap<grpc::string, grpc::string>& metadata);
+
+/// Default argument for CallOpSet. I is unused by the class, but can be
+/// used for generating multiple names for the same thing.
+template <int I>
+class CallNoOp {
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {}
+ void FinishOp(bool* status, int max_message_size) {}
+};
+
+class CallOpSendInitialMetadata {
public:
- CallOpBuffer();
- ~CallOpBuffer();
-
- void Reset(void* next_return_tag);
-
- // Does not take ownership.
- void AddSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata);
- void AddSendInitialMetadata(ClientContext* ctx);
- void AddRecvInitialMetadata(ClientContext* ctx);
- void AddSendMessage(const grpc::protobuf::Message& message);
- void AddSendMessage(const ByteBuffer& message);
- void AddRecvMessage(grpc::protobuf::Message* message);
- void AddRecvMessage(ByteBuffer* message);
- void AddClientSendClose();
- void AddClientRecvStatus(ClientContext* ctx, Status* status);
- void AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata,
- const Status& status);
- void AddServerRecvClose(bool* cancelled);
-
- // INTERNAL API:
-
- // Convert to an array of grpc_op elements
- void FillOps(grpc_op* ops, size_t* nops);
-
- // Called by completion queue just prior to returning from Next() or Pluck()
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+ CallOpSendInitialMetadata() : send_(false) {}
- void set_max_message_size(int max_message_size) {
- max_message_size_ = max_message_size;
+ void SendInitialMetadata(
+ const std::multimap<grpc::string, grpc::string>& metadata) {
+ send_ = true;
+ initial_metadata_count_ = metadata.size();
+ initial_metadata_ = FillMetadataArray(metadata);
}
- bool got_message;
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!send_) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->flags = 0;
+ op->data.send_initial_metadata.count = initial_metadata_count_;
+ op->data.send_initial_metadata.metadata = initial_metadata_;
+ }
+ void FinishOp(bool* status, int max_message_size) {
+ if (!send_) return;
+ gpr_free(initial_metadata_);
+ send_ = false;
+ }
- private:
- void* return_tag_;
- // Send initial metadata
- bool send_initial_metadata_;
+ bool send_;
size_t initial_metadata_count_;
grpc_metadata* initial_metadata_;
- // Recv initial metadta
- std::multimap<grpc::string, grpc::string>* recv_initial_metadata_;
- grpc_metadata_array recv_initial_metadata_arr_;
- // Send message
- const grpc::protobuf::Message* send_message_;
- const ByteBuffer* send_message_buffer_;
+};
+
+class CallOpSendMessage {
+ public:
+ CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}
+
+ template <class M>
+ Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (send_buf_ == nullptr) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->flags = 0;
+ op->data.send_message = send_buf_;
+ }
+ void FinishOp(bool* status, int max_message_size) {
+ if (own_buf_) grpc_byte_buffer_destroy(send_buf_);
+ send_buf_ = nullptr;
+ }
+
+ private:
grpc_byte_buffer* send_buf_;
- // Recv message
- grpc::protobuf::Message* recv_message_;
- ByteBuffer* recv_message_buffer_;
+ bool own_buf_;
+};
+
+template <class M>
+Status CallOpSendMessage::SendMessage(const M& message) {
+ return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
+}
+
+template <class R>
+class CallOpRecvMessage {
+ public:
+ CallOpRecvMessage() : got_message(false), message_(nullptr) {}
+
+ void RecvMessage(R* message) { message_ = message; }
+
+ bool got_message;
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (message_ == nullptr) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->flags = 0;
+ op->data.recv_message = &recv_buf_;
+ }
+
+ void FinishOp(bool* status, int max_message_size) {
+ if (message_ == nullptr) return;
+ if (recv_buf_) {
+ if (*status) {
+ got_message = true;
+ *status = SerializationTraits<R>::Deserialize(recv_buf_, message_,
+ max_message_size)
+ .ok();
+ } else {
+ got_message = false;
+ grpc_byte_buffer_destroy(recv_buf_);
+ }
+ } else {
+ got_message = false;
+ *status = false;
+ }
+ message_ = nullptr;
+ }
+
+ private:
+ R* message_;
grpc_byte_buffer* recv_buf_;
- int max_message_size_;
- // Client send close
- bool client_send_close_;
- // Client recv status
+};
+
+class CallOpGenericRecvMessage {
+ public:
+ CallOpGenericRecvMessage() : got_message(false) {}
+
+ template <class R>
+ void RecvMessage(R* message) {
+ deserialize_ = [message](grpc_byte_buffer* buf,
+ int max_message_size) -> Status {
+ return SerializationTraits<R>::Deserialize(buf, message,
+ max_message_size);
+ };
+ }
+
+ bool got_message;
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!deserialize_) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->flags = 0;
+ op->data.recv_message = &recv_buf_;
+ }
+
+ void FinishOp(bool* status, int max_message_size) {
+ if (!deserialize_) return;
+ if (recv_buf_) {
+ if (*status) {
+ got_message = true;
+ *status = deserialize_(recv_buf_, max_message_size).ok();
+ } else {
+ got_message = false;
+ grpc_byte_buffer_destroy(recv_buf_);
+ }
+ } else {
+ got_message = false;
+ *status = false;
+ }
+ deserialize_ = DeserializeFunc();
+ }
+
+ private:
+ typedef std::function<Status(grpc_byte_buffer*, int)> DeserializeFunc;
+ DeserializeFunc deserialize_;
+ grpc_byte_buffer* recv_buf_;
+};
+
+class CallOpClientSendClose {
+ public:
+ CallOpClientSendClose() : send_(false) {}
+
+ void ClientSendClose() { send_ = true; }
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!send_) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ }
+ void FinishOp(bool* status, int max_message_size) { send_ = false; }
+
+ private:
+ bool send_;
+};
+
+class CallOpServerSendStatus {
+ public:
+ CallOpServerSendStatus() : send_status_available_(false) {}
+
+ void ServerSendStatus(
+ const std::multimap<grpc::string, grpc::string>& trailing_metadata,
+ const Status& status) {
+ trailing_metadata_count_ = trailing_metadata.size();
+ trailing_metadata_ = FillMetadataArray(trailing_metadata);
+ send_status_available_ = true;
+ send_status_code_ = static_cast<grpc_status_code>(status.error_code());
+ send_status_details_ = status.error_message();
+ }
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!send_status_available_) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count =
+ trailing_metadata_count_;
+ op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
+ op->data.send_status_from_server.status = send_status_code_;
+ op->data.send_status_from_server.status_details =
+ send_status_details_.empty() ? nullptr : send_status_details_.c_str();
+ op->flags = 0;
+ }
+
+ void FinishOp(bool* status, int max_message_size) {
+ if (!send_status_available_) return;
+ gpr_free(trailing_metadata_);
+ send_status_available_ = false;
+ }
+
+ private:
+ bool send_status_available_;
+ grpc_status_code send_status_code_;
+ grpc::string send_status_details_;
+ size_t trailing_metadata_count_;
+ grpc_metadata* trailing_metadata_;
+};
+
+class CallOpRecvInitialMetadata {
+ public:
+ CallOpRecvInitialMetadata() : recv_initial_metadata_(nullptr) {}
+
+ void RecvInitialMetadata(ClientContext* context) {
+ context->initial_metadata_received_ = true;
+ recv_initial_metadata_ = &context->recv_initial_metadata_;
+ }
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (!recv_initial_metadata_) return;
+ memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &recv_initial_metadata_arr_;
+ op->flags = 0;
+ }
+ void FinishOp(bool* status, int max_message_size) {
+ if (recv_initial_metadata_ == nullptr) return;
+ FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
+ recv_initial_metadata_ = nullptr;
+ }
+
+ private:
+ std::multimap<grpc::string, grpc::string>* recv_initial_metadata_;
+ grpc_metadata_array recv_initial_metadata_arr_;
+};
+
+class CallOpClientRecvStatus {
+ public:
+ CallOpClientRecvStatus() : recv_status_(nullptr) {}
+
+ void ClientRecvStatus(ClientContext* context, Status* status) {
+ recv_trailing_metadata_ = &context->trailing_metadata_;
+ recv_status_ = status;
+ }
+
+ protected:
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (recv_status_ == nullptr) return;
+ memset(&recv_trailing_metadata_arr_, 0,
+ sizeof(recv_trailing_metadata_arr_));
+ status_details_ = nullptr;
+ status_details_capacity_ = 0;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata =
+ &recv_trailing_metadata_arr_;
+ op->data.recv_status_on_client.status = &status_code_;
+ op->data.recv_status_on_client.status_details = &status_details_;
+ op->data.recv_status_on_client.status_details_capacity =
+ &status_details_capacity_;
+ op->flags = 0;
+ }
+
+ void FinishOp(bool* status, int max_message_size) {
+ if (recv_status_ == nullptr) return;
+ FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
+ *recv_status_ = Status(
+ static_cast<StatusCode>(status_code_),
+ status_details_ ? grpc::string(status_details_) : grpc::string());
+ gpr_free(status_details_);
+ recv_status_ = nullptr;
+ }
+
+ private:
std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_;
Status* recv_status_;
grpc_metadata_array recv_trailing_metadata_arr_;
grpc_status_code status_code_;
char* status_details_;
size_t status_details_capacity_;
- // Server send status
- bool send_status_available_;
- grpc_status_code send_status_code_;
- grpc::string send_status_details_;
- size_t trailing_metadata_count_;
- grpc_metadata* trailing_metadata_;
- int cancelled_buf_;
- bool* recv_closed_;
};
-// SneakyCallOpBuffer does not post completions to the completion queue
-class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
+/// An abstract collection of call ops, used to generate the
+/// grpc_call_op structure to pass down to the lower layers,
+/// and as it is-a CompletionQueueTag, also massages the final
+/// completion into the correct form for consumption in the C++
+/// API.
+class CallOpSetInterface : public CompletionQueueTag {
+ public:
+ CallOpSetInterface() : max_message_size_(0) {}
+ /// Fills in grpc_op, starting from ops[*nops] and moving
+ /// upwards.
+ virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
+
+ void set_max_message_size(int max_message_size) {
+ max_message_size_ = max_message_size;
+ }
+
+ protected:
+ int max_message_size_;
+};
+
+/// Primary implementaiton of CallOpSetInterface.
+/// Since we cannot use variadic templates, we declare slots up to
+/// the maximum count of ops we'll need in a set. We leverage the
+/// empty base class optimization to slim this class (especially
+/// when there are many unused slots used). To avoid duplicate base classes,
+/// the template parmeter for CallNoOp is varied by argument position.
+template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
+ class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
+ class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
+class CallOpSet : public CallOpSetInterface,
+ public Op1,
+ public Op2,
+ public Op3,
+ public Op4,
+ public Op5,
+ public Op6 {
+ public:
+ CallOpSet() : return_tag_(this) {}
+ void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
+ this->Op1::AddOp(ops, nops);
+ this->Op2::AddOp(ops, nops);
+ this->Op3::AddOp(ops, nops);
+ this->Op4::AddOp(ops, nops);
+ this->Op5::AddOp(ops, nops);
+ this->Op6::AddOp(ops, nops);
+ }
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ this->Op1::FinishOp(status, max_message_size_);
+ this->Op2::FinishOp(status, max_message_size_);
+ this->Op3::FinishOp(status, max_message_size_);
+ this->Op4::FinishOp(status, max_message_size_);
+ this->Op5::FinishOp(status, max_message_size_);
+ this->Op6::FinishOp(status, max_message_size_);
+ *tag = return_tag_;
+ return true;
+ }
+
+ void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
+
+ private:
+ void* return_tag_;
+};
+
+/// A CallOpSet that does not post completions to the completion queue.
+///
+/// Allows hiding some completions that the C core must generate from
+/// C++ users.
+template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
+ class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
+ class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
+class SneakyCallOpSet GRPC_FINAL
+ : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- return CallOpBuffer::FinalizeResult(tag, status) && false;
+ typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base;
+ return Base::FinalizeResult(tag, status) && false;
}
};
@@ -135,7 +439,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
class CallHook {
public:
virtual ~CallHook() {}
- virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0;
+ virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
};
// Straightforward wrapping of the C call object
@@ -146,7 +450,7 @@ class Call GRPC_FINAL {
Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq,
int max_message_size);
- void PerformOps(CallOpBuffer* buffer);
+ void PerformOps(CallOpSetInterface* ops);
grpc_call* call() { return call_; }
CompletionQueue* cq() { return cq_; }
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
index 2f234fd3ac..b77ce7d02c 100644
--- a/include/grpc++/impl/client_unary_call.h
+++ b/include/grpc++/impl/client_unary_call.h
@@ -37,6 +37,8 @@
#include <grpc++/config.h>
#include <grpc++/status.h>
+#include <grpc++/impl/call.h>
+
namespace grpc {
class ChannelInterface;
@@ -45,10 +47,28 @@ class CompletionQueue;
class RpcMethod;
// Wrapper that performs a blocking unary call
+template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result);
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result) {
+ CompletionQueue cq;
+ Call call(channel->CreateCall(method, context, &cq));
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
+ CallOpClientSendClose, CallOpClientRecvStatus> ops;
+ Status status = ops.SendMessage(request);
+ if (!status.ok()) {
+ return status;
+ }
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.RecvInitialMetadata(context);
+ ops.RecvMessage(result);
+ ops.ClientSendClose();
+ ops.ClientRecvStatus(context, &status);
+ call.PerformOps(&ops);
+ GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok());
+ return status;
+}
} // namespace grpc
diff --git a/include/grpc++/impl/proto_utils.h b/include/grpc++/impl/proto_utils.h
new file mode 100644
index 0000000000..ebefa3e1be
--- /dev/null
+++ b/include/grpc++/impl/proto_utils.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
+#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
+
+#include <type_traits>
+
+#include <grpc/grpc.h>
+#include <grpc++/impl/serialization_traits.h>
+#include <grpc++/config_protobuf.h>
+#include <grpc++/status.h>
+
+namespace grpc {
+
+// Serialize the msg into a buffer created inside the function. The caller
+// should destroy the returned buffer when done with it. If serialization fails,
+// false is returned and buffer is left unchanged.
+Status SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** buffer);
+
+// The caller keeps ownership of buffer and msg.
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size);
+
+template <class T>
+class SerializationTraits<T, typename std::enable_if<std::is_base_of<
+ grpc::protobuf::Message, T>::value>::type> {
+ public:
+ static Status Serialize(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** buffer, bool* own_buffer) {
+ *own_buffer = true;
+ return SerializeProto(msg, buffer);
+ }
+ static Status Deserialize(grpc_byte_buffer* buffer,
+ grpc::protobuf::Message* msg,
+ int max_message_size) {
+ auto status = DeserializeProto(buffer, msg, max_message_size);
+ grpc_byte_buffer_destroy(buffer);
+ return status;
+ }
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 50204d2099..3cfbef7806 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -55,16 +55,19 @@ class MethodHandler {
public:
virtual ~MethodHandler() {}
struct HandlerParameter {
- HandlerParameter(Call* c, ServerContext* context,
- const grpc::protobuf::Message* req,
- grpc::protobuf::Message* resp)
- : call(c), server_context(context), request(req), response(resp) {}
+ HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req,
+ int max_size)
+ : call(c),
+ server_context(context),
+ request(req),
+ max_message_size(max_size) {}
Call* call;
ServerContext* server_context;
- const grpc::protobuf::Message* request;
- grpc::protobuf::Message* response;
+ // Handler required to grpc_byte_buffer_destroy this
+ grpc_byte_buffer* request;
+ int max_message_size;
};
- virtual Status RunHandler(const HandlerParameter& param) = 0;
+ virtual void RunHandler(const HandlerParameter& param) = 0;
};
// A wrapper class of an application provided rpc method handler.
@@ -77,11 +80,25 @@ class RpcMethodHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
- // Invoke application function, cast proto messages to their actual types.
- return func_(service_, param.server_context,
- dynamic_cast<const RequestType*>(param.request),
- dynamic_cast<ResponseType*>(param.response));
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(
+ param.request, &req, param.max_message_size);
+ ResponseType rsp;
+ if (status.ok()) {
+ status = func_(service_, param.server_context, &req, &rsp);
+ }
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.ok()) {
+ status = ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -102,10 +119,21 @@ class ClientStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReader<RequestType> reader(param.call, param.server_context);
- return func_(service_, param.server_context, &reader,
- dynamic_cast<ResponseType*>(param.response));
+ ResponseType rsp;
+ Status status = func_(service_, param.server_context, &reader, &rsp);
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.ok()) {
+ status = ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -124,10 +152,23 @@ class ServerStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
- ServerWriter<ResponseType> writer(param.call, param.server_context);
- return func_(service_, param.server_context,
- dynamic_cast<const RequestType*>(param.request), &writer);
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(
+ param.request, &req, param.max_message_size);
+
+ if (status.ok()) {
+ ServerWriter<ResponseType> writer(param.call, param.server_context);
+ status = func_(service_, param.server_context, &req, &writer);
+ }
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -147,10 +188,18 @@ class BidiStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReaderWriter<ResponseType, RequestType> stream(param.call,
param.server_context);
- return func_(service_, param.server_context, &stream);
+ Status status = func_(service_, param.server_context, &stream);
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -162,29 +211,15 @@ class BidiStreamingHandler : public MethodHandler {
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
- // Takes ownership of the handler and two prototype objects.
+ // Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
- MethodHandler* handler,
- grpc::protobuf::Message* request_prototype,
- grpc::protobuf::Message* response_prototype)
- : RpcMethod(name, type, nullptr),
- handler_(handler),
- request_prototype_(request_prototype),
- response_prototype_(response_prototype) {}
+ MethodHandler* handler)
+ : RpcMethod(name, type, nullptr), handler_(handler) {}
MethodHandler* handler() { return handler_.get(); }
- grpc::protobuf::Message* AllocateRequestProto() {
- return request_prototype_->New();
- }
- grpc::protobuf::Message* AllocateResponseProto() {
- return response_prototype_->New();
- }
-
private:
std::unique_ptr<MethodHandler> handler_;
- std::unique_ptr<grpc::protobuf::Message> request_prototype_;
- std::unique_ptr<grpc::protobuf::Message> response_prototype_;
};
// This class contains all the method information for an rpc service. It is
diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h
new file mode 100644
index 0000000000..1f5c674e4c
--- /dev/null
+++ b/include/grpc++/impl/serialization_traits.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H
+#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H
+
+namespace grpc {
+
+/// Defines how to serialize and deserialize some type.
+///
+/// Used for hooking different message serialization API's into GRPC.
+/// Each SerializationTraits implementation must provide the following
+/// functions:
+/// static Status Serialize(const Message& msg,
+/// grpc_byte_buffer** buffer,
+// bool* own_buffer);
+/// static Status Deserialize(grpc_byte_buffer* buffer,
+/// Message* msg,
+/// int max_message_size);
+///
+/// Serialize is required to convert message to a grpc_byte_buffer, and
+/// to store a pointer to that byte buffer at *buffer. *own_buffer should
+/// be set to true if the caller owns said byte buffer, or false if
+/// ownership is retained elsewhere.
+///
+/// Deserialize is required to convert buffer into the message stored at
+/// msg. max_message_size is passed in as a bound on the maximum number of
+/// message bytes Deserialize should accept.
+///
+/// Both functions return a Status, allowing them to explain what went
+/// wrong if required.
+template <class Message,
+ class UnusedButHereForPartialTemplateSpecialization = void>
+class SerializationTraits;
+
+} // namespace grpc
+
+#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index 25e437edad..c33a278f5b 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -35,6 +35,8 @@
#define GRPCXX_IMPL_SERVICE_TYPE_H
#include <grpc++/config.h>
+#include <grpc++/impl/serialization_traits.h>
+#include <grpc++/server.h>
#include <grpc++/status.h>
namespace grpc {
@@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface {
class AsynchronousService {
public:
- // this is Server, but in disguise to avoid a link dependency
- class DispatchImpl {
- public:
- virtual void RequestAsyncCall(void* registered_method,
- ServerContext* context,
- ::grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) = 0;
- };
-
AsynchronousService(const char** method_names, size_t method_count)
- : dispatch_impl_(nullptr),
+ : server_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
@@ -86,42 +76,43 @@ class AsynchronousService {
~AsynchronousService() { delete[] request_args_; }
protected:
- void RequestAsyncUnary(int index, ServerContext* context,
- grpc::protobuf::Message* request,
+ template <class Message>
+ void RequestAsyncUnary(int index, ServerContext* context, Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ notification_cq, tag, request);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ notification_cq, tag);
}
+ template <class Message>
void RequestServerStreaming(int index, ServerContext* context,
- grpc::protobuf::Message* request,
+ Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ notification_cq, tag, request);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
- dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, call_cq, notification_cq, tag);
+ server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
+ notification_cq, tag);
}
private:
friend class Server;
- DispatchImpl* dispatch_impl_;
+ Server* server_;
const char** const method_names_;
size_t method_count_;
void** request_args_;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 2cfeb359fc..6a9e757e77 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -41,25 +41,24 @@
#include <grpc++/config.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
-#include <grpc++/impl/service_type.h>
#include <grpc++/impl/sync.h>
#include <grpc++/status.h>
struct grpc_server;
namespace grpc {
+
class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
class RpcService;
class RpcServiceMethod;
+class ServerAsyncStreamingInterface;
class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
-class Server GRPC_FINAL : public GrpcLibrary,
- private CallHook,
- private AsynchronousService::DispatchImpl {
+class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
public:
~Server();
@@ -73,6 +72,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
private:
friend class AsyncGenericService;
+ friend class AsynchronousService;
friend class ServerBuilder;
class SyncRequest;
@@ -96,21 +96,123 @@ class Server GRPC_FINAL : public GrpcLibrary,
void RunRpc();
void ScheduleCallback();
- void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+ void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
+
+ class BaseAsyncRequest : public CompletionQueueTag {
+ public:
+ BaseAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq, void* tag);
+ virtual ~BaseAsyncRequest();
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+ protected:
+ Server* const server_;
+ ServerContext* const context_;
+ ServerAsyncStreamingInterface* const stream_;
+ CompletionQueue* const call_cq_;
+ void* const tag_;
+ grpc_call* call_;
+ grpc_metadata_array initial_metadata_array_;
+ };
+
+ class RegisteredAsyncRequest : public BaseAsyncRequest {
+ public:
+ RegisteredAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq, void* tag);
+
+ // uses BaseAsyncRequest::FinalizeResult
+
+ protected:
+ void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
+ ServerCompletionQueue* notification_cq);
+ };
+
+ class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
+ public:
+ NoPayloadAsyncRequest(void* registered_method, Server* server,
+ ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
+ : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
+ IssueRequest(registered_method, nullptr, notification_cq);
+ }
+
+ // uses RegisteredAsyncRequest::FinalizeResult
+ };
+
+ template <class Message>
+ class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
+ public:
+ PayloadAsyncRequest(void* registered_method, Server* server,
+ ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag,
+ Message* request)
+ : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
+ request_(request) {
+ IssueRequest(registered_method, &payload_, notification_cq);
+ }
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ bool serialization_status =
+ *status && payload_ &&
+ SerializationTraits<Message>::Deserialize(payload_, request_,
+ server_->max_message_size_)
+ .ok();
+ bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
+ *status = serialization_status && *status;
+ return ret;
+ }
+
+ private:
+ grpc_byte_buffer* payload_;
+ Message* const request_;
+ };
+
+ class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
+ public:
+ GenericAsyncRequest(Server* server, GenericServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag);
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+ private:
+ grpc_call_details call_details_;
+ };
+
+ template <class Message>
+ void RequestAsyncCall(void* registered_method, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag,
+ Message* message) {
+ new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
+ call_cq, notification_cq, tag, message);
+ }
- // DispatchImpl
void RequestAsyncCall(void* registered_method, ServerContext* context,
- grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) GRPC_OVERRIDE;
+ ServerCompletionQueue* notification_cq, void* tag) {
+ new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
+ notification_cq, tag);
+ }
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq,
+ CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
- void* tag);
+ void* tag) {
+ new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
+ tag);
+ }
const int max_message_size_;
@@ -133,8 +235,6 @@ class Server GRPC_FINAL : public GrpcLibrary,
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.
bool thread_pool_owned_;
- private:
- Server() : max_message_size_(-1), server_(NULL) { abort(); }
};
} // namespace grpc
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 5a6af299e3..3bf21e02bf 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -62,6 +62,14 @@ template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
+template <class ServiceType, class RequestType, class ResponseType>
+class RpcMethodHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ClientStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class ServerStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class BidiStreamingHandler;
class Call;
class CallOpBuffer;
@@ -109,6 +117,14 @@ class ServerContext {
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class RpcMethodHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ClientStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ServerStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class BidiStreamingHandler;
// Prevent copying.
ServerContext(const ServerContext&);
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 472911e62b..dd5e52d6d3 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -93,15 +93,18 @@ template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
+ template <class W>
ClientReader(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, const grpc::protobuf::Message& request)
+ ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- buf.AddSendMessage(request);
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(ops.SendMessage(request).ok());
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@@ -111,28 +114,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
- CallOpBuffer buf;
- buf.AddRecvInitialMetadata(context_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); // status ignored
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
- buf.AddRecvInitialMetadata(context_);
+ ops.RecvInitialMetadata(context_);
}
- buf.AddRecvMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf) && buf.got_message;
+ ops.RecvMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpClientRecvStatus> ops;
Status status;
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ ops.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&ops);
+ GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@@ -150,48 +153,49 @@ class ClientWriterInterface : public ClientStreamingInterface,
};
template <class W>
-class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> {
+class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
+ template <class R>
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, grpc::protobuf::Message* response)
- : context_(context),
- response_(response),
- call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ ClientContext* context, R* response)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ finish_ops_.RecvMessage(response);
+
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddSendMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg).ok()) {
+ return false;
+ }
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpClientSendClose> ops;
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
Status status;
- buf.AddRecvMessage(response_);
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ finish_ops_.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&finish_ops_);
+ GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
- grpc::protobuf::Message* const response_;
+ CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
@@ -213,10 +217,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@@ -226,42 +230,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
- CallOpBuffer buf;
- buf.AddRecvInitialMetadata(context_);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); // status ignored
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
- buf.AddRecvInitialMetadata(context_);
+ ops.RecvInitialMetadata(context_);
}
- buf.AddRecvMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf) && buf.got_message;
+ ops.RecvMessage(msg);
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddSendMessage(msg);
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg).ok()) return false;
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
- return cq_.Pluck(&buf);
+ CallOpSet<CallOpClientSendClose> ops;
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpClientRecvStatus> ops;
Status status;
- buf.AddClientRecvStatus(context_, &status);
- call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ ops.ClientRecvStatus(context_, &status);
+ call_.PerformOps(&ops);
+ GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@@ -279,18 +283,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddRecvMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf) && buf.got_message;
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.RecvMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
@@ -306,22 +310,24 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg).ok()) {
+ return false;
+ }
if (!ctx_->sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- buf.AddSendMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops);
}
private:
@@ -339,29 +345,31 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpBuffer buf;
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata> ops;
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_->PerformOps(&buf);
- call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
- buf.AddRecvMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf) && buf.got_message;
+ CallOpSet<CallOpRecvMessage<R>> ops;
+ ops.RecvMessage(msg);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
- CallOpBuffer buf;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
+ if (!ops.SendMessage(msg).ok()) {
+ return false;
+ }
if (!ctx_->sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- buf.AddSendMessage(msg);
- call_->PerformOps(&buf);
- return call_->cq()->Pluck(&buf);
+ call_->PerformOps(&ops);
+ return call_->cq()->Pluck(&ops);
}
private:
@@ -400,57 +408,59 @@ class AsyncWriterInterface {
template <class R>
class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
- public AsyncReaderInterface<R> {
-};
+ public AsyncReaderInterface<R> {};
template <class R>
class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
public:
// Create a stream and write the first request out.
+ template <class W>
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- const grpc::protobuf::Message& request, void* tag)
+ const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- init_buf_.AddSendMessage(request);
- init_buf_.AddClientSendClose();
- call_.PerformOps(&init_buf_);
+ init_ops_.set_output_tag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(init_ops_.SendMessage(request).ok());
+ init_ops_.ClientSendClose();
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
+ read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- read_buf_.AddRecvInitialMetadata(context_);
+ read_ops_.RecvInitialMetadata(context_);
}
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W>
@@ -463,56 +473,57 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
public:
+ template <class R>
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
- grpc::protobuf::Message* response, void* tag)
- : context_(context),
- response_(response),
- call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&init_buf_);
+ R* response, void* tag)
+ : context_(context), call_(channel->CreateCall(method, context, cq)) {
+ finish_ops_.RecvMessage(response);
+
+ init_ops_.set_output_tag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ write_ops_.set_output_tag(tag);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_buf_.Reset(tag);
- writes_done_buf_.AddClientSendClose();
- call_.PerformOps(&writes_done_buf_);
+ writes_done_ops_.set_output_tag(tag);
+ writes_done_ops_.ClientSendClose();
+ call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddRecvMessage(response_);
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
- grpc::protobuf::Message* const response_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer writes_done_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
+ CallOpClientRecvStatus> finish_ops_;
};
// Client-side interface for bi-directional streaming.
@@ -532,58 +543,59 @@ class ClientAsyncReaderWriter GRPC_FINAL
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.Reset(tag);
- init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
- call_.PerformOps(&init_buf_);
+ init_ops_.set_output_tag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
+ read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- read_buf_.AddRecvInitialMetadata(context_);
+ read_ops_.RecvInitialMetadata(context_);
}
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ write_ops_.set_output_tag(tag);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_buf_.Reset(tag);
- writes_done_buf_.AddClientSendClose();
- call_.PerformOps(&writes_done_buf_);
+ writes_done_ops_.set_output_tag(tag);
+ writes_done_ops_.ClientSendClose();
+ call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(context_);
+ finish_ops_.RecvInitialMetadata(context_);
}
- finish_buf_.AddClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpBuffer init_buf_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer writes_done_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W, class R>
@@ -596,41 +608,44 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.set_output_tag(tag);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.ok()) {
- finish_buf_.AddSendMessage(msg);
+ finish_ops_.ServerSendStatus(
+ ctx_->trailing_metadata_,
+ finish_ops_.SendMessage(msg));
+ } else {
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ call_.PerformOps(&finish_ops_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -638,9 +653,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> finish_ops_;
};
template <class W>
@@ -653,30 +669,31 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
+ write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -684,9 +701,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
// Server-side interface for bi-directional streaming.
@@ -701,36 +718,37 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.Reset(tag);
- meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ meta_ops_.set_output_tag(tag);
+ meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_buf_.Reset(tag);
- read_buf_.AddRecvMessage(msg);
- call_.PerformOps(&read_buf_);
+ read_ops_.set_output_tag(tag);
+ read_ops_.RecvMessage(msg);
+ call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_buf_.Reset(tag);
+ write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- write_buf_.AddSendMessage(msg);
- call_.PerformOps(&write_buf_);
+ // TODO(ctiller): don't assert
+ GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
- finish_buf_.Reset(tag);
+ finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
+ finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&finish_ops_);
}
private:
@@ -738,10 +756,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpBuffer meta_buf_;
- CallOpBuffer read_buf_;
- CallOpBuffer write_buf_;
- CallOpBuffer finish_buf_;
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
} // namespace grpc