aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/call.c19
-rw-r--r--src/cpp/client/client_unary_call.cc4
-rw-r--r--src/cpp/common/call.cc4
-rw-r--r--src/cpp/proto/proto_utils.cc4
-rw-r--r--src/cpp/proto/proto_utils.h11
-rw-r--r--src/cpp/server/async_server_context.cc4
-rw-r--r--src/cpp/server/server.cc15
7 files changed, 35 insertions, 26 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 7cf3c0e4fd..b2033f3dc0 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -140,6 +140,8 @@ struct grpc_call {
gpr_uint8 have_alarm;
/* are we currently performing a send operation */
gpr_uint8 sending;
+ /* are we currently completing requests */
+ gpr_uint8 completing;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
/* flag that we need to request more data */
@@ -357,7 +359,7 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static void unlock(grpc_call *call) {
send_action sa = SEND_NOTHING;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
- int num_completed_requests = call->num_completed_requests;
+ int completing_requests = 0;
int need_more_data =
call->need_more_data &&
(call->write_state >= WRITE_STATE_STARTED || !call->is_client);
@@ -367,10 +369,12 @@ static void unlock(grpc_call *call) {
call->need_more_data = 0;
}
- if (num_completed_requests != 0) {
+ if (!call->completing && call->num_completed_requests != 0) {
+ completing_requests = call->num_completed_requests;
memcpy(completed_requests, call->completed_requests,
sizeof(completed_requests));
call->num_completed_requests = 0;
+ call->completing = 1;
}
if (!call->sending) {
@@ -391,9 +395,14 @@ static void unlock(grpc_call *call) {
enact_send_action(call, sa);
}
- for (i = 0; i < num_completed_requests; i++) {
- completed_requests[i].on_complete(call, completed_requests[i].status,
- completed_requests[i].user_data);
+ if (completing_requests > 0) {
+ for (i = 0; i < completing_requests; i++) {
+ completed_requests[i].on_complete(call, completed_requests[i].status,
+ completed_requests[i].user_data);
+ }
+ lock(call);
+ call->completing = 0;
+ unlock(call);
}
}
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
index 684b3cbadb..5c179de9d8 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/src/cpp/client/client_unary_call.cc
@@ -44,8 +44,8 @@ namespace grpc {
// Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
- const google::protobuf::Message &request,
- google::protobuf::Message *result) {
+ const grpc::protobuf::Message &request,
+ grpc::protobuf::Message *result) {
CompletionQueue cq;
Call call(channel->CreateCall(method, context, &cq));
CallOpBuffer buf;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index f3a691114d..b2b6c62785 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -163,11 +163,11 @@ void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) {
AddSendInitialMetadata(&ctx->send_initial_metadata_);
}
-void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) {
+void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
send_message_ = &message;
}
-void CallOpBuffer::AddRecvMessage(google::protobuf::Message* message) {
+void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
recv_message_ = message;
recv_message_->Clear();
}
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 69a6bb080e..e6badd5d6e 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -40,7 +40,7 @@
namespace grpc {
-bool SerializeProto(const google::protobuf::Message &msg,
+bool SerializeProto(const grpc::protobuf::Message &msg,
grpc_byte_buffer **bp) {
grpc::string msg_str;
bool success = msg.SerializeToString(&msg_str);
@@ -54,7 +54,7 @@ bool SerializeProto(const google::protobuf::Message &msg,
}
bool DeserializeProto(grpc_byte_buffer *buffer,
- google::protobuf::Message *msg) {
+ grpc::protobuf::Message *msg) {
grpc::string msg_string;
grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer);
gpr_slice slice;
diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h
index a0af4d6465..7a1b1f8b7c 100644
--- a/src/cpp/proto/proto_utils.h
+++ b/src/cpp/proto/proto_utils.h
@@ -34,23 +34,20 @@
#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
+#include <grpc++/config.h>
+
struct grpc_byte_buffer;
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
namespace grpc {
// Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization fails,
// false is returned and buffer is left unchanged.
-bool SerializeProto(const google::protobuf::Message &msg,
+bool SerializeProto(const grpc::protobuf::Message &msg,
grpc_byte_buffer **buffer);
// The caller keeps ownership of buffer and msg.
-bool DeserializeProto(grpc_byte_buffer *buffer, google::protobuf::Message *msg);
+bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg);
} // namespace grpc
diff --git a/src/cpp/server/async_server_context.cc b/src/cpp/server/async_server_context.cc
index 5f8c2ba10f..bee75497b8 100644
--- a/src/cpp/server/async_server_context.cc
+++ b/src/cpp/server/async_server_context.cc
@@ -58,14 +58,14 @@ void AsyncServerContext::Accept(grpc_completion_queue *cq) {
call_, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
}
-bool AsyncServerContext::StartRead(google::protobuf::Message *request) {
+bool AsyncServerContext::StartRead(grpc::protobuf::Message *request) {
GPR_ASSERT(request);
request_ = request;
grpc_call_error err = grpc_call_start_read_old(call_, this);
return err == GRPC_CALL_OK;
}
-bool AsyncServerContext::StartWrite(const google::protobuf::Message &response,
+bool AsyncServerContext::StartWrite(const grpc::protobuf::Message &response,
int flags) {
grpc_byte_buffer *buffer = nullptr;
if (!SerializeProto(response, &buffer)) {
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index ca2e62c5da..e69032a657 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -117,8 +117,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
void Run() {
- std::unique_ptr<google::protobuf::Message> req;
- std::unique_ptr<google::protobuf::Message> res;
+ std::unique_ptr<grpc::protobuf::Message> req;
+ std::unique_ptr<grpc::protobuf::Message> res;
if (has_request_payload_) {
req.reset(method_->AllocateRequestProto());
if (!DeserializeProto(request_payload_, req.get())) {
@@ -281,7 +281,7 @@ void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
- ::google::protobuf::Message* request,
+ grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
void* tag)
: tag_(tag),
@@ -307,6 +307,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
*tag = tag_;
+ bool orig_status = *status;
if (*status && request_) {
if (payload_) {
*status = DeserializeProto(payload_, request_);
@@ -326,7 +327,9 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
}
ctx_->call_ = call_;
Call call(call_, server_, cq_);
- ctx_->BeginCompletionOp(&call);
+ if (orig_status && call_) {
+ ctx_->BeginCompletionOp(&call);
+ }
// just the pointers inside call are copied here
stream_->BindCall(&call);
delete this;
@@ -335,7 +338,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
private:
void* const tag_;
- ::google::protobuf::Message* const request_;
+ grpc::protobuf::Message* const request_;
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const cq_;
ServerContext* const ctx_;
@@ -347,7 +350,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
};
void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
- ::google::protobuf::Message* request,
+ grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) {
new AsyncRequest(this, registered_method, context, request, stream, cq, tag);