diff options
author | ncteisen <ncteisen@gmail.com> | 2017-06-26 12:14:02 -0700 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2017-07-05 10:33:21 -0700 |
commit | 95f7a517464f62a61684a45879697ad22e3aea82 (patch) | |
tree | 6206af57991e6e92390bd1e392513cfefe709e7e | |
parent | be7b82ba5ec17d33e1e7b2ebba4c5b2bb2c430ba (diff) |
Fix writelast bug
-rw-r--r-- | include/grpc++/impl/codegen/method_handler_impl.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/server_context.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 21 |
3 files changed, 20 insertions, 7 deletions
diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 64c91bee22..5afa4e9f7e 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -141,6 +141,9 @@ class ServerStreamingHandler : public MethodHandler { } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); + if (param.server_context->has_hanging_ops_) { + param.call->cq()->Pluck(¶m.server_context->hanging_ops_); + } param.call->cq()->Pluck(&ops); } diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 96fe645a15..d7bd7323f2 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -25,6 +25,7 @@ #include <grpc/impl/codegen/compression_types.h> +#include <grpc++/impl/codegen/call.h> #include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/codegen/config.h> #include <grpc++/impl/codegen/create_auth_context.h> @@ -272,6 +273,8 @@ class ServerContext { uint32_t initial_metadata_flags() const { return 0; } + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> hanging_ops_; + bool has_hanging_ops_; CompletionOp* completion_op_; bool has_notify_when_done_tag_; void* async_notify_when_done_tag_; diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 09836f340b..b7d81b3382 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -589,20 +589,27 @@ class ServerWriter final : public ServerWriterInterface<W> { if (options.is_last_message()) { options.set_buffer_hint(); } - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { + if (!ctx_->hanging_ops_.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); + ctx_->hanging_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - ops.set_compression_level(ctx_->compression_level()); + ctx_->hanging_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); + call_->PerformOps(&ctx_->hanging_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_hanging_ops_ = true; + return true; + } + ctx_->has_hanging_ops_ = false; + return call_->cq()->Pluck(&ctx_->hanging_ops_); } private: |