diff options
author | Yang Gao <yangg@google.com> | 2015-03-06 16:11:16 -0800 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-03-06 16:11:16 -0800 |
commit | 5f4539f4e8be43c477bad8fba84c6ce6c125a120 (patch) | |
tree | 397c321e5bc0b944c2e684f89916624ea1fbf3bb /src | |
parent | df6e45c52a40aca45485cc087f11134d4714954e (diff) |
Service side should be done, it builds and existing tests pass
Diffstat (limited to 'src')
-rw-r--r-- | src/cpp/common/call.cc | 86 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 7 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 3 |
3 files changed, 64 insertions, 32 deletions
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 6ce1e8a7d5..ebe7deec70 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -31,8 +31,10 @@ * */ -#include <grpc/support/alloc.h> #include <grpc++/impl/call.h> + +#include <grpc/support/alloc.h> +#include <grpc++/byte_buffer.h> #include <grpc++/client_context.h> #include <grpc++/channel_interface.h> @@ -48,9 +50,11 @@ CallOpBuffer::CallOpBuffer() recv_initial_metadata_(nullptr), recv_initial_metadata_arr_{0, 0, nullptr}, send_message_(nullptr), - send_message_buf_(nullptr), + send_message_buffer_(nullptr), + send_buf_(nullptr), recv_message_(nullptr), - recv_message_buf_(nullptr), + recv_message_buffer_(nullptr), + recv_buf_(nullptr), client_send_close_(false), recv_trailing_metadata_(nullptr), recv_status_(nullptr), @@ -74,18 +78,20 @@ void CallOpBuffer::Reset(void* next_return_tag) { recv_initial_metadata_ = nullptr; recv_initial_metadata_arr_.count = 0; - send_message_ = nullptr; - if (send_message_buf_) { - grpc_byte_buffer_destroy(send_message_buf_); - send_message_buf_ = nullptr; + if (send_buf_ && send_message_) { + grpc_byte_buffer_destroy(send_buf_); } + send_message_ = nullptr; + send_message_buffer_ = nullptr; + send_buf_ = nullptr; - recv_message_ = nullptr; got_message = false; - if (recv_message_buf_) { - grpc_byte_buffer_destroy(recv_message_buf_); - recv_message_buf_ = nullptr; + if (recv_buf_ && recv_message_) { + grpc_byte_buffer_destroy(recv_buf_); } + recv_message_ = nullptr; + recv_message_buffer_ = nullptr; + recv_buf_ = nullptr; client_send_close_ = false; @@ -106,11 +112,11 @@ CallOpBuffer::~CallOpBuffer() { gpr_free(status_details_); gpr_free(recv_initial_metadata_arr_.metadata); gpr_free(recv_trailing_metadata_arr_.metadata); - if (recv_message_buf_) { - grpc_byte_buffer_destroy(recv_message_buf_); + if (recv_buf_ && recv_message_) { + grpc_byte_buffer_destroy(recv_buf_); } - if (send_message_buf_) { - grpc_byte_buffer_destroy(send_message_buf_); + if (send_buf_ && send_message_) { + grpc_byte_buffer_destroy(send_buf_); } } @@ -166,11 +172,19 @@ void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) { send_message_ = &message; } +void CallOpBuffer::AddSendMessage(const ByteBuffer& message) { + send_message_buffer_ = &message; +} + void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) { recv_message_ = message; recv_message_->Clear(); } +void CallOpBuffer::AddRecvMessage(ByteBuffer* message) { + recv_message_buffer_ = message; +} + void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; } void CallOpBuffer::AddServerRecvClose(bool* cancelled) { @@ -206,19 +220,23 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_; (*nops)++; } - if (send_message_) { - bool success = SerializeProto(*send_message_, &send_message_buf_); - if (!success) { - abort(); - // TODO handle parse failure + if (send_message_ || send_message_buffer_) { + if (send_message_) { + bool success = SerializeProto(*send_message_, &send_buf_); + if (!success) { + abort(); + // TODO handle parse failure + } + } else { + send_buf_ = send_message_buffer_->buffer(); } ops[*nops].op = GRPC_OP_SEND_MESSAGE; - ops[*nops].data.send_message = send_message_buf_; + ops[*nops].data.send_message = send_buf_; (*nops)++; } - if (recv_message_) { + if (recv_message_ || recv_message_buffer_) { ops[*nops].op = GRPC_OP_RECV_MESSAGE; - ops[*nops].data.recv_message = &recv_message_buf_; + ops[*nops].data.recv_message = &recv_buf_; (*nops)++; } if (client_send_close_) { @@ -256,9 +274,11 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { // Release send buffers. - if (send_message_buf_) { - grpc_byte_buffer_destroy(send_message_buf_); - send_message_buf_ = nullptr; + if (send_buf_ && send_message_) { + if (send_message_) { + grpc_byte_buffer_destroy(send_buf_); + } + send_buf_ = nullptr; } if (initial_metadata_) { gpr_free(initial_metadata_); @@ -275,12 +295,16 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); } // Parse received message if any. - if (recv_message_) { - if (recv_message_buf_) { + if (recv_message_ || recv_message_buffer_) { + if (recv_buf_) { got_message = *status; - *status = *status && DeserializeProto(recv_message_buf_, recv_message_); - grpc_byte_buffer_destroy(recv_message_buf_); - recv_message_buf_ = nullptr; + if (recv_message_) { + *status = *status && DeserializeProto(recv_buf_, recv_message_); + grpc_byte_buffer_destroy(recv_buf_); + } else { + recv_message_buffer_->set_buffer(recv_buf_); + } + recv_buf_ = nullptr; } else { // Read failed got_message = false; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 4de1ad290e..6e66cd476c 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -371,6 +371,12 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { array_.metadata[i].value, array_.metadata[i].value + array_.metadata[i].value_length))); } + if (anonymous_ctx_) { + anonymous_ctx_->method_.assign(call_details_.method, + call_details_.method_capacity); + anonymous_ctx_->host_.assign(call_details_.host, + call_details_.host_capacity); + } } ctx->call_ = call_; Call call(call_, server_, cq_); @@ -403,6 +409,7 @@ void Server::RequestAsyncCall(void* registered_method, ServerContext* context, CompletionQueue* cq, void* tag) { new AsyncRequest(this, registered_method, context, request, stream, cq, tag); } + void Server::RequestAsyncAnonymousCall(AnonymousServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* cq, void* tag) { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index cc6f8ca9e5..b7e5c84ef6 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -41,7 +41,8 @@ namespace grpc { -ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {} +ServerBuilder::ServerBuilder() + : anonymous_service_(nullptr), thread_pool_(nullptr) {} void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); |