diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/surface/call.c | 19 | ||||
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 4 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 4 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.cc | 4 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.h | 11 | ||||
-rw-r--r-- | src/cpp/server/async_server_context.cc | 4 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 15 |
7 files changed, 35 insertions, 26 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 7cf3c0e4fd..b2033f3dc0 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -140,6 +140,8 @@ struct grpc_call { gpr_uint8 have_alarm; /* are we currently performing a send operation */ gpr_uint8 sending; + /* are we currently completing requests */ + gpr_uint8 completing; /* pairs with completed_requests */ gpr_uint8 num_completed_requests; /* flag that we need to request more data */ @@ -357,7 +359,7 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static void unlock(grpc_call *call) { send_action sa = SEND_NOTHING; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; - int num_completed_requests = call->num_completed_requests; + int completing_requests = 0; int need_more_data = call->need_more_data && (call->write_state >= WRITE_STATE_STARTED || !call->is_client); @@ -367,10 +369,12 @@ static void unlock(grpc_call *call) { call->need_more_data = 0; } - if (num_completed_requests != 0) { + if (!call->completing && call->num_completed_requests != 0) { + completing_requests = call->num_completed_requests; memcpy(completed_requests, call->completed_requests, sizeof(completed_requests)); call->num_completed_requests = 0; + call->completing = 1; } if (!call->sending) { @@ -391,9 +395,14 @@ static void unlock(grpc_call *call) { enact_send_action(call, sa); } - for (i = 0; i < num_completed_requests; i++) { - completed_requests[i].on_complete(call, completed_requests[i].status, - completed_requests[i].user_data); + if (completing_requests > 0) { + for (i = 0; i < completing_requests; i++) { + completed_requests[i].on_complete(call, completed_requests[i].status, + completed_requests[i].user_data); + } + lock(call); + call->completing = 0; + unlock(call); } } diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 684b3cbadb..5c179de9d8 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -44,8 +44,8 @@ namespace grpc { // Wrapper that performs a blocking unary call Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result) { + const grpc::protobuf::Message &request, + grpc::protobuf::Message *result) { CompletionQueue cq; Call call(channel->CreateCall(method, context, &cq)); CallOpBuffer buf; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index f3a691114d..b2b6c62785 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -163,11 +163,11 @@ void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) { AddSendInitialMetadata(&ctx->send_initial_metadata_); } -void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { +void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) { send_message_ = &message; } -void CallOpBuffer::AddRecvMessage(google::protobuf::Message* message) { +void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) { recv_message_ = message; recv_message_->Clear(); } diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 69a6bb080e..e6badd5d6e 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -40,7 +40,7 @@ namespace grpc { -bool SerializeProto(const google::protobuf::Message &msg, +bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) { grpc::string msg_str; bool success = msg.SerializeToString(&msg_str); @@ -54,7 +54,7 @@ bool SerializeProto(const google::protobuf::Message &msg, } bool DeserializeProto(grpc_byte_buffer *buffer, - google::protobuf::Message *msg) { + grpc::protobuf::Message *msg) { grpc::string msg_string; grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer); gpr_slice slice; diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h index a0af4d6465..7a1b1f8b7c 100644 --- a/src/cpp/proto/proto_utils.h +++ b/src/cpp/proto/proto_utils.h @@ -34,23 +34,20 @@ #ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H #define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H +#include <grpc++/config.h> + struct grpc_byte_buffer; -namespace google { -namespace protobuf { -class Message; -} -} namespace grpc { // Serialize the msg into a buffer created inside the function. The caller // should destroy the returned buffer when done with it. If serialization fails, // false is returned and buffer is left unchanged. -bool SerializeProto(const google::protobuf::Message &msg, +bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **buffer); // The caller keeps ownership of buffer and msg. -bool DeserializeProto(grpc_byte_buffer *buffer, google::protobuf::Message *msg); +bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg); } // namespace grpc diff --git a/src/cpp/server/async_server_context.cc b/src/cpp/server/async_server_context.cc index 5f8c2ba10f..bee75497b8 100644 --- a/src/cpp/server/async_server_context.cc +++ b/src/cpp/server/async_server_context.cc @@ -58,14 +58,14 @@ void AsyncServerContext::Accept(grpc_completion_queue *cq) { call_, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); } -bool AsyncServerContext::StartRead(google::protobuf::Message *request) { +bool AsyncServerContext::StartRead(grpc::protobuf::Message *request) { GPR_ASSERT(request); request_ = request; grpc_call_error err = grpc_call_start_read_old(call_, this); return err == GRPC_CALL_OK; } -bool AsyncServerContext::StartWrite(const google::protobuf::Message &response, +bool AsyncServerContext::StartWrite(const grpc::protobuf::Message &response, int flags) { grpc_byte_buffer *buffer = nullptr; if (!SerializeProto(response, &buffer)) { diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index ca2e62c5da..e69032a657 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -117,8 +117,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } void Run() { - std::unique_ptr<google::protobuf::Message> req; - std::unique_ptr<google::protobuf::Message> res; + std::unique_ptr<grpc::protobuf::Message> req; + std::unique_ptr<grpc::protobuf::Message> res; if (has_request_payload_) { req.reset(method_->AllocateRequestProto()); if (!DeserializeProto(request_payload_, req.get())) { @@ -281,7 +281,7 @@ void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { public: AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, - ::google::protobuf::Message* request, + grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* cq, void* tag) : tag_(tag), @@ -307,6 +307,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { *tag = tag_; + bool orig_status = *status; if (*status && request_) { if (payload_) { *status = DeserializeProto(payload_, request_); @@ -326,7 +327,9 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } ctx_->call_ = call_; Call call(call_, server_, cq_); - ctx_->BeginCompletionOp(&call); + if (orig_status && call_) { + ctx_->BeginCompletionOp(&call); + } // just the pointers inside call are copied here stream_->BindCall(&call); delete this; @@ -335,7 +338,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { private: void* const tag_; - ::google::protobuf::Message* const request_; + grpc::protobuf::Message* const request_; ServerAsyncStreamingInterface* const stream_; CompletionQueue* const cq_; ServerContext* const ctx_; @@ -347,7 +350,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { }; void Server::RequestAsyncCall(void* registered_method, ServerContext* context, - ::google::protobuf::Message* request, + grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* cq, void* tag) { new AsyncRequest(this, registered_method, context, request, stream, cq, tag); |