aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-03-06 16:11:16 -0800
committerGravatar Yang Gao <yangg@google.com>2015-03-06 16:11:16 -0800
commit5f4539f4e8be43c477bad8fba84c6ce6c125a120 (patch)
tree397c321e5bc0b944c2e684f89916624ea1fbf3bb /src
parentdf6e45c52a40aca45485cc087f11134d4714954e (diff)
Service side should be done, it builds and existing tests pass
Diffstat (limited to 'src')
-rw-r--r--src/cpp/common/call.cc86
-rw-r--r--src/cpp/server/server.cc7
-rw-r--r--src/cpp/server/server_builder.cc3
3 files changed, 64 insertions, 32 deletions
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 6ce1e8a7d5..ebe7deec70 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -31,8 +31,10 @@
*
*/
-#include <grpc/support/alloc.h>
#include <grpc++/impl/call.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc++/byte_buffer.h>
#include <grpc++/client_context.h>
#include <grpc++/channel_interface.h>
@@ -48,9 +50,11 @@ CallOpBuffer::CallOpBuffer()
recv_initial_metadata_(nullptr),
recv_initial_metadata_arr_{0, 0, nullptr},
send_message_(nullptr),
- send_message_buf_(nullptr),
+ send_message_buffer_(nullptr),
+ send_buf_(nullptr),
recv_message_(nullptr),
- recv_message_buf_(nullptr),
+ recv_message_buffer_(nullptr),
+ recv_buf_(nullptr),
client_send_close_(false),
recv_trailing_metadata_(nullptr),
recv_status_(nullptr),
@@ -74,18 +78,20 @@ void CallOpBuffer::Reset(void* next_return_tag) {
recv_initial_metadata_ = nullptr;
recv_initial_metadata_arr_.count = 0;
- send_message_ = nullptr;
- if (send_message_buf_) {
- grpc_byte_buffer_destroy(send_message_buf_);
- send_message_buf_ = nullptr;
+ if (send_buf_ && send_message_) {
+ grpc_byte_buffer_destroy(send_buf_);
}
+ send_message_ = nullptr;
+ send_message_buffer_ = nullptr;
+ send_buf_ = nullptr;
- recv_message_ = nullptr;
got_message = false;
- if (recv_message_buf_) {
- grpc_byte_buffer_destroy(recv_message_buf_);
- recv_message_buf_ = nullptr;
+ if (recv_buf_ && recv_message_) {
+ grpc_byte_buffer_destroy(recv_buf_);
}
+ recv_message_ = nullptr;
+ recv_message_buffer_ = nullptr;
+ recv_buf_ = nullptr;
client_send_close_ = false;
@@ -106,11 +112,11 @@ CallOpBuffer::~CallOpBuffer() {
gpr_free(status_details_);
gpr_free(recv_initial_metadata_arr_.metadata);
gpr_free(recv_trailing_metadata_arr_.metadata);
- if (recv_message_buf_) {
- grpc_byte_buffer_destroy(recv_message_buf_);
+ if (recv_buf_ && recv_message_) {
+ grpc_byte_buffer_destroy(recv_buf_);
}
- if (send_message_buf_) {
- grpc_byte_buffer_destroy(send_message_buf_);
+ if (send_buf_ && send_message_) {
+ grpc_byte_buffer_destroy(send_buf_);
}
}
@@ -166,11 +172,19 @@ void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
send_message_ = &message;
}
+void CallOpBuffer::AddSendMessage(const ByteBuffer& message) {
+ send_message_buffer_ = &message;
+}
+
void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
recv_message_ = message;
recv_message_->Clear();
}
+void CallOpBuffer::AddRecvMessage(ByteBuffer* message) {
+ recv_message_buffer_ = message;
+}
+
void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; }
void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
@@ -206,19 +220,23 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
(*nops)++;
}
- if (send_message_) {
- bool success = SerializeProto(*send_message_, &send_message_buf_);
- if (!success) {
- abort();
- // TODO handle parse failure
+ if (send_message_ || send_message_buffer_) {
+ if (send_message_) {
+ bool success = SerializeProto(*send_message_, &send_buf_);
+ if (!success) {
+ abort();
+ // TODO handle parse failure
+ }
+ } else {
+ send_buf_ = send_message_buffer_->buffer();
}
ops[*nops].op = GRPC_OP_SEND_MESSAGE;
- ops[*nops].data.send_message = send_message_buf_;
+ ops[*nops].data.send_message = send_buf_;
(*nops)++;
}
- if (recv_message_) {
+ if (recv_message_ || recv_message_buffer_) {
ops[*nops].op = GRPC_OP_RECV_MESSAGE;
- ops[*nops].data.recv_message = &recv_message_buf_;
+ ops[*nops].data.recv_message = &recv_buf_;
(*nops)++;
}
if (client_send_close_) {
@@ -256,9 +274,11 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
// Release send buffers.
- if (send_message_buf_) {
- grpc_byte_buffer_destroy(send_message_buf_);
- send_message_buf_ = nullptr;
+ if (send_buf_ && send_message_) {
+ if (send_message_) {
+ grpc_byte_buffer_destroy(send_buf_);
+ }
+ send_buf_ = nullptr;
}
if (initial_metadata_) {
gpr_free(initial_metadata_);
@@ -275,12 +295,16 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
}
// Parse received message if any.
- if (recv_message_) {
- if (recv_message_buf_) {
+ if (recv_message_ || recv_message_buffer_) {
+ if (recv_buf_) {
got_message = *status;
- *status = *status && DeserializeProto(recv_message_buf_, recv_message_);
- grpc_byte_buffer_destroy(recv_message_buf_);
- recv_message_buf_ = nullptr;
+ if (recv_message_) {
+ *status = *status && DeserializeProto(recv_buf_, recv_message_);
+ grpc_byte_buffer_destroy(recv_buf_);
+ } else {
+ recv_message_buffer_->set_buffer(recv_buf_);
+ }
+ recv_buf_ = nullptr;
} else {
// Read failed
got_message = false;
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 4de1ad290e..6e66cd476c 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -371,6 +371,12 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
array_.metadata[i].value,
array_.metadata[i].value + array_.metadata[i].value_length)));
}
+ if (anonymous_ctx_) {
+ anonymous_ctx_->method_.assign(call_details_.method,
+ call_details_.method_capacity);
+ anonymous_ctx_->host_.assign(call_details_.host,
+ call_details_.host_capacity);
+ }
}
ctx->call_ = call_;
Call call(call_, server_, cq_);
@@ -403,6 +409,7 @@ void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
CompletionQueue* cq, void* tag) {
new AsyncRequest(this, registered_method, context, request, stream, cq, tag);
}
+
void Server::RequestAsyncAnonymousCall(AnonymousServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) {
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index cc6f8ca9e5..b7e5c84ef6 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -41,7 +41,8 @@
namespace grpc {
-ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {}
+ServerBuilder::ServerBuilder()
+ : anonymous_service_(nullptr), thread_pool_(nullptr) {}
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service());