diff options
Diffstat (limited to 'include/grpcpp/impl')
-rw-r--r-- | include/grpcpp/impl/codegen/call_op_set.h | 108 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_callback.h | 12 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_unary_call.h | 2 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/interceptor.h | 31 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/interceptor_common.h | 88 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/method_handler_impl.h | 4 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_callback.h | 12 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_interface.h | 2 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/sync_stream.h | 10 |
9 files changed, 222 insertions, 47 deletions
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index b2100c68b7..c0de5ed602 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -303,9 +303,29 @@ class CallOpSendMessage { template <class M> Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; + /// Send \a message using \a options for the write. The \a options are cleared + /// after use. This form of SendMessage allows gRPC to reference \a message + /// beyond the lifetime of SendMessage. + template <class M> + Status SendMessagePtr(const M* message, + WriteOptions options) GRPC_MUST_USE_RESULT; + + /// This form of SendMessage allows gRPC to reference \a message beyond the + /// lifetime of SendMessage. + template <class M> + Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT; + protected: void AddOp(grpc_op* ops, size_t* nops) { - if (!send_buf_.Valid() || hijacked_) return; + if (msg_ == nullptr && !send_buf_.Valid()) return; + if (hijacked_) { + serializer_ = nullptr; + return; + } + if (msg_ != nullptr) { + GPR_CODEGEN_ASSERT(serializer_(msg_).ok()); + } + serializer_ = nullptr; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_MESSAGE; op->flags = write_options_.flags(); @@ -314,21 +334,38 @@ class CallOpSendMessage { // Flags are per-message: clear them after use. write_options_.Clear(); } - void FinishOp(bool* status) { send_buf_.Clear(); } + void FinishOp(bool* status) { + if (msg_ == nullptr && !send_buf_.Valid()) return; + if (hijacked_ && failed_send_) { + // Hijacking interceptor failed this Op + *status = false; + } else if (!*status) { + // This Op was passed down to core and the Op failed + failed_send_ = true; + } + } void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - if (!send_buf_.Valid()) return; + if (msg_ == nullptr && !send_buf_.Valid()) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); - interceptor_methods->SetSendMessage(&send_buf_); + interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_, + serializer_); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { + if (msg_ != nullptr || send_buf_.Valid()) { + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_SEND_MESSAGE); + } + send_buf_.Clear(); + msg_ = nullptr; // The contents of the SendMessage value that was previously set // has had its references stolen by core's operations - interceptor_methods->SetSendMessage(nullptr); + interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_, + nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { @@ -336,25 +373,38 @@ class CallOpSendMessage { } private: + const void* msg_ = nullptr; // The original non-serialized message bool hijacked_ = false; + bool failed_send_ = false; ByteBuffer send_buf_; WriteOptions write_options_; + std::function<Status(const void*)> serializer_; }; template <class M> Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { write_options_ = options; - bool own_buf; - // TODO(vjpai): Remove the void below when possible - // The void in the template parameter below should not be needed - // (since it should be implicit) but is needed due to an observed - // difference in behavior between clang and gcc for certain internal users - Status result = SerializationTraits<M, void>::Serialize( - message, send_buf_.bbuf_ptr(), &own_buf); - if (!own_buf) { - send_buf_.Duplicate(); - } - return result; + serializer_ = [this](const void* message) { + bool own_buf; + send_buf_.Clear(); + // TODO(vjpai): Remove the void below when possible + // The void in the template parameter below should not be needed + // (since it should be implicit) but is needed due to an observed + // difference in behavior between clang and gcc for certain internal users + Status result = SerializationTraits<M, void>::Serialize( + *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf); + if (!own_buf) { + send_buf_.Duplicate(); + } + return result; + }; + // Serialize immediately only if we do not have access to the message pointer + if (msg_ == nullptr) { + Status result = serializer_(&message); + serializer_ = nullptr; + return result; + } + return Status(); } template <class M> @@ -362,6 +412,19 @@ Status CallOpSendMessage::SendMessage(const M& message) { return SendMessage(message, WriteOptions()); } +template <class M> +Status CallOpSendMessage::SendMessagePtr(const M* message, + WriteOptions options) { + msg_ = message; + return SendMessage(*message, options); +} + +template <class M> +Status CallOpSendMessage::SendMessagePtr(const M* message) { + msg_ = message; + return SendMessage(*message, WriteOptions()); +} + template <class R> class CallOpRecvMessage { public: @@ -410,14 +473,16 @@ class CallOpRecvMessage { void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - interceptor_methods->SetRecvMessage(message_); + if (message_ == nullptr) return; + interceptor_methods->SetRecvMessage(message_, &got_message); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - if (!got_message) return; + if (message_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; @@ -505,20 +570,23 @@ class CallOpGenericRecvMessage { void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - interceptor_methods->SetRecvMessage(message_); + if (!deserialize_) return; + interceptor_methods->SetRecvMessage(message_, &got_message); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - if (!got_message) return; + if (!deserialize_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (!deserialize_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_RECV_MESSAGE); + got_message = true; } private: diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 66cf9b7754..52bcea9970 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -73,7 +73,7 @@ class CallbackUnaryCallImpl { CallbackWithStatusTag(call.call(), on_completion, ops); // TODO(vjpai): Unify code with sync API as much as possible - Status s = ops->SendMessage(*request); + Status s = ops->SendMessagePtr(request); if (!s.ok()) { tag->force_run(s); return; @@ -340,13 +340,13 @@ class ClientCallbackReaderWriterImpl context_->initial_metadata_flags()); start_corked_ = false; } - // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok()); if (options.is_last_message()) { options.set_buffer_hint(); write_ops_.ClientSendClose(); } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); callbacks_outstanding_++; if (started_) { call_.PerformOps(&write_ops_); @@ -524,7 +524,7 @@ class ClientCallbackReaderImpl : context_(context), call_(call), reactor_(reactor) { this->BindReactor(reactor); // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok()); + GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok()); start_ops_.ClientSendClose(); } @@ -649,13 +649,13 @@ class ClientCallbackWriterImpl context_->initial_metadata_flags()); start_corked_ = false; } - // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok()); if (options.is_last_message()) { options.set_buffer_hint(); write_ops_.ClientSendClose(); } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); callbacks_outstanding_++; if (started_) { call_.PerformOps(&write_ops_); diff --git a/include/grpcpp/impl/codegen/client_unary_call.h b/include/grpcpp/impl/codegen/client_unary_call.h index 5151839412..b9f8e1663f 100644 --- a/include/grpcpp/impl/codegen/client_unary_call.h +++ b/include/grpcpp/impl/codegen/client_unary_call.h @@ -57,7 +57,7 @@ class BlockingUnaryCallImpl { CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, CallOpClientSendClose, CallOpClientRecvStatus> ops; - status_ = ops.SendMessage(request); + status_ = ops.SendMessagePtr(&request); if (!status_.ok()) { return; } diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index 46175cd73b..5dea796a3b 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -46,9 +46,10 @@ namespace experimental { /// operation has been requested and it is available. POST_RECV means that a /// result is available but has not yet been passed back to the application. enum class InterceptionHookPoints { - /// The first two in this list are for clients and servers + /// The first three in this list are for clients and servers PRE_SEND_INITIAL_METADATA, PRE_SEND_MESSAGE, + POST_SEND_MESSAGE, PRE_SEND_STATUS, // server only PRE_SEND_CLOSE, // client only: WritesDone for stream; after write in unary /// The following three are for hijacked clients only and can only be @@ -109,7 +110,24 @@ class InterceptorBatchMethods { /// Returns a modifable ByteBuffer holding the serialized form of the message /// that is going to be sent. Valid for PRE_SEND_MESSAGE interceptions. /// A return value of nullptr indicates that this ByteBuffer is not valid. - virtual ByteBuffer* GetSendMessage() = 0; + virtual ByteBuffer* GetSerializedSendMessage() = 0; + + /// Returns a non-modifiable pointer to the non-serialized form of the message + /// to be sent. Valid for PRE_SEND_MESSAGE interceptions. A return value of + /// nullptr indicates that this field is not valid. Also note that this is + /// only supported for sync and callback APIs at the present moment. + virtual const void* GetSendMessage() = 0; + + /// Overwrites the message to be sent with \a message. \a message should be in + /// the non-serialized form expected by the method. Valid for PRE_SEND_MESSAGE + /// interceptions. Note that the interceptor is responsible for maintaining + /// the life of the message for the duration on the send operation, i.e., till + /// POST_SEND_MESSAGE. + virtual void ModifySendMessage(const void* message) = 0; + + /// Checks whether the SEND MESSAGE op succeeded. Valid for POST_SEND_MESSAGE + /// interceptions. + virtual bool GetSendMessageStatus() = 0; /// Returns a modifiable multimap of the initial metadata to be sent. Valid /// for PRE_SEND_INITIAL_METADATA interceptions. A value of nullptr indicates @@ -156,6 +174,15 @@ class InterceptorBatchMethods { /// started from interceptors without infinite regress through the interceptor /// list. virtual std::unique_ptr<ChannelInterface> GetInterceptedChannel() = 0; + + /// On a hijacked RPC, an interceptor can decide to fail a PRE_RECV_MESSAGE + /// op. This would be a signal to the reader that there will be no more + /// messages, or the stream has failed or been cancelled. + virtual void FailHijackedRecvMessage() = 0; + + /// On a hijacked RPC/ to-be hijacked RPC, this can be called to fail a SEND + /// MESSAGE op + virtual void FailHijackedSendMessage() = 0; }; /// Interface for an interceptor. Interceptor authors must create a class diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h index d0aa23cb0a..09721343ff 100644 --- a/include/grpcpp/impl/codegen/interceptor_common.h +++ b/include/grpcpp/impl/codegen/interceptor_common.h @@ -79,7 +79,26 @@ class InterceptorBatchMethodsImpl hooks_[static_cast<size_t>(type)] = true; } - ByteBuffer* GetSendMessage() override { return send_message_; } + ByteBuffer* GetSerializedSendMessage() override { + GPR_CODEGEN_ASSERT(orig_send_message_ != nullptr); + if (*orig_send_message_ != nullptr) { + GPR_CODEGEN_ASSERT(serializer_(*orig_send_message_).ok()); + *orig_send_message_ = nullptr; + } + return send_message_; + } + + const void* GetSendMessage() override { + GPR_CODEGEN_ASSERT(orig_send_message_ != nullptr); + return *orig_send_message_; + } + + void ModifySendMessage(const void* message) override { + GPR_CODEGEN_ASSERT(orig_send_message_ != nullptr); + *orig_send_message_ = message; + } + + bool GetSendMessageStatus() override { return !*fail_send_message_; } std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override { return send_initial_metadata_; @@ -110,12 +129,25 @@ class InterceptorBatchMethodsImpl Status* GetRecvStatus() override { return recv_status_; } + void FailHijackedSendMessage() override { + GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>( + experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)]); + *fail_send_message_ = true; + } + std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata() override { return recv_trailing_metadata_->map(); } - void SetSendMessage(ByteBuffer* buf) { send_message_ = buf; } + void SetSendMessage(ByteBuffer* buf, const void** msg, + bool* fail_send_message, + std::function<Status(const void*)> serializer) { + send_message_ = buf; + orig_send_message_ = msg; + fail_send_message_ = fail_send_message; + serializer_ = serializer; + } void SetSendInitialMetadata( std::multimap<grpc::string, grpc::string>* metadata) { @@ -134,7 +166,10 @@ class InterceptorBatchMethodsImpl send_trailing_metadata_ = metadata; } - void SetRecvMessage(void* message) { recv_message_ = message; } + void SetRecvMessage(void* message, bool* got_message) { + recv_message_ = message; + got_message_ = got_message; + } void SetRecvInitialMetadata(MetadataMap* map) { recv_initial_metadata_ = map; @@ -157,6 +192,12 @@ class InterceptorBatchMethodsImpl info->channel(), current_interceptor_index_ + 1)); } + void FailHijackedRecvMessage() override { + GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>( + experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)]); + *got_message_ = false; + } + // Clears all state void ClearState() { reverse_ = false; @@ -334,6 +375,9 @@ class InterceptorBatchMethodsImpl std::function<void(void)> callback_; ByteBuffer* send_message_ = nullptr; + bool* fail_send_message_ = nullptr; + const void** orig_send_message_ = nullptr; + std::function<Status(const void*)> serializer_; std::multimap<grpc::string, grpc::string>* send_initial_metadata_; @@ -345,6 +389,7 @@ class InterceptorBatchMethodsImpl std::multimap<grpc::string, grpc::string>* send_trailing_metadata_ = nullptr; void* recv_message_ = nullptr; + bool* got_message_ = nullptr; MetadataMap* recv_initial_metadata_ = nullptr; @@ -379,13 +424,36 @@ class CancelInterceptorBatchMethods "Cancel notification"); } - ByteBuffer* GetSendMessage() override { + ByteBuffer* GetSerializedSendMessage() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetSendMessage on a method which " "has a Cancel notification"); return nullptr; } + bool GetSendMessageStatus() override { + GPR_CODEGEN_ASSERT( + false && + "It is illegal to call GetSendMessageStatus on a method which " + "has a Cancel notification"); + return false; + } + + const void* GetSendMessage() override { + GPR_CODEGEN_ASSERT( + false && + "It is illegal to call GetOriginalSendMessage on a method which " + "has a Cancel notification"); + return nullptr; + } + + void ModifySendMessage(const void* message) override { + GPR_CODEGEN_ASSERT( + false && + "It is illegal to call ModifySendMessage on a method which " + "has a Cancel notification"); + } + std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetSendInitialMetadata on a " @@ -451,6 +519,18 @@ class CancelInterceptorBatchMethods "method which has a Cancel notification"); return std::unique_ptr<ChannelInterface>(nullptr); } + + void FailHijackedRecvMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call FailHijackedRecvMessage on a " + "method which has a Cancel notification"); + } + + void FailHijackedSendMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call FailHijackedSendMessage on a " + "method which has a Cancel notification"); + } }; } // namespace internal } // namespace grpc diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index dd53f975f6..094286294c 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -79,7 +79,7 @@ class RpcMethodHandler : public MethodHandler { ops.set_compression_level(param.server_context->compression_level()); } if (status.ok()) { - status = ops.SendMessage(rsp); + status = ops.SendMessagePtr(&rsp); } ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -139,7 +139,7 @@ class ClientStreamingHandler : public MethodHandler { } } if (status.ok()) { - status = ops.SendMessage(rsp); + status = ops.SendMessagePtr(&rsp); } ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index 1854f6ef2f..a0e59215dd 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -320,7 +320,7 @@ class CallbackUnaryHandler : public MethodHandler { // The response is dropped if the status is not OK. if (s.ok()) { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessage(resp_)); + finish_ops_.SendMessagePtr(&resp_)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } @@ -449,7 +449,7 @@ class CallbackClientStreamingHandler : public MethodHandler { // The response is dropped if the status is not OK. if (s.ok()) { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessage(resp_)); + finish_ops_.SendMessagePtr(&resp_)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } @@ -642,7 +642,7 @@ class CallbackServerStreamingHandler : public MethodHandler { ctx_->sent_initial_metadata_ = true; } // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); call_.PerformOps(&write_ops_); } @@ -652,7 +652,7 @@ class CallbackServerStreamingHandler : public MethodHandler { // Don't send any message if the status is bad if (s.ok()) { // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok()); + GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); } Finish(std::move(s)); } @@ -804,7 +804,7 @@ class CallbackBidiHandler : public MethodHandler { ctx_->sent_initial_metadata_ = true; } // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); call_.PerformOps(&write_ops_); } @@ -813,7 +813,7 @@ class CallbackBidiHandler : public MethodHandler { // Don't send any message if the status is bad if (s.ok()) { // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok()); + GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); } Finish(std::move(s)); } diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index e0e2629827..890a5650d0 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -272,7 +272,7 @@ class ServerInterface : public internal::CallHook { /* Set interception point for recv message */ interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - interceptor_methods_.SetRecvMessage(request_); + interceptor_methods_.SetRecvMessage(request_, nullptr); return RegisteredAsyncRequest::FinalizeResult(tag, status); } diff --git a/include/grpcpp/impl/codegen/sync_stream.h b/include/grpcpp/impl/codegen/sync_stream.h index 6981076f04..d9edad4215 100644 --- a/include/grpcpp/impl/codegen/sync_stream.h +++ b/include/grpcpp/impl/codegen/sync_stream.h @@ -253,7 +253,7 @@ class ClientReader final : public ClientReaderInterface<R> { ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags()); // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); + GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok()); ops.ClientSendClose(); call_.PerformOps(&ops); cq_.Pluck(&ops); @@ -331,7 +331,7 @@ class ClientWriter : public ClientWriterInterface<W> { context_->initial_metadata_flags()); context_->set_initial_metadata_corked(false); } - if (!ops.SendMessage(msg, options).ok()) { + if (!ops.SendMessagePtr(&msg, options).ok()) { return false; } @@ -502,7 +502,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { context_->initial_metadata_flags()); context_->set_initial_metadata_corked(false); } - if (!ops.SendMessage(msg, options).ok()) { + if (!ops.SendMessagePtr(&msg, options).ok()) { return false; } @@ -656,7 +656,7 @@ class ServerWriter final : public ServerWriterInterface<W> { options.set_buffer_hint(); } - if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { + if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { @@ -734,7 +734,7 @@ class ServerReaderWriterBody final { if (options.is_last_message()) { options.set_buffer_hint(); } - if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { + if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { |