From 73f9524f6c25a55edcb7881b6793eeb5e0ed315e Mon Sep 17 00:00:00 2001 From: Ayush Dubey Date: Wed, 27 Jun 2018 14:31:14 -0700 Subject: Do not capture variables that may be destroyed before callback finishes. PiperOrigin-RevId: 202370201 --- .../collective_param_resolver_distributed.cc | 25 +++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) (limited to 'tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc') diff --git a/tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc b/tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc index 422d142f04..1dd10d309b 100644 --- a/tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc +++ b/tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc @@ -150,21 +150,23 @@ void CollectiveParamResolverDistributed::CompleteInstanceAsync( for (int32 offset : request->subdiv_offset()) { cp->instance.impl_details.subdiv_offsets.push_back(offset); } - VLOG(1) << "New cp " << cp << " for device " << request->device() << " : " + string* device = new string(request->device()); + VLOG(1) << "New cp " << cp << " for device " << *device << " : " << cp->ToString(); - StatusCallback done_and_cleanup = [this, cp, done](const Status& s) { + StatusCallback done_and_cleanup = [this, cp, device, done](const Status& s) { done(s); delete cp; + delete device; }; // Start by completing the group. CompleteGroupDistributed( - request->device(), cp, cancel_mgr, - [this, cp, request, response, cancel_mgr, done_and_cleanup]( + *device, cp, cancel_mgr, + [this, cp, device, response, cancel_mgr, done_and_cleanup]( const Status& cg_status, const GroupRec* gr) { if (cg_status.ok()) { // Then complete the instance. CompleteInstanceDistributed( - request->device(), gr, cp, cancel_mgr, + *device, gr, cp, cancel_mgr, [this, gr, cp, response, done_and_cleanup](const Status& ci_status) { if (ci_status.ok()) { @@ -278,16 +280,18 @@ bool CollectiveParamResolverDistributed::InstanceIsCached(int32 instance_key) { void CollectiveParamResolverDistributed::UpdateInstanceCache( const GroupRec* gr, CollectiveParams* cp, const CompleteInstanceResponse& resp, const StatusCallback& done) { - Notification note; - InstanceRec* ir = nullptr; + using InstanceRecPointer = InstanceRec*; + InstanceRecPointer* irp = new InstanceRecPointer(nullptr); int32 source_rank = resp.source_rank(); - auto continue_with_ir = [this, cp, &ir, source_rank, done](const Status& s) { + auto continue_with_ir = [this, cp, irp, source_rank, done](const Status& s) { if (!s.ok()) { done(s); + delete irp; return; } Status status; + InstanceRec* ir = *irp; do { mutex_lock l(ir->out_mu); ir->WaitForOutMu(l); @@ -320,11 +324,12 @@ void CollectiveParamResolverDistributed::UpdateInstanceCache( } while (false); // Callback outside of lock. done(status); + delete irp; }; FindInstanceRec( - gr, cp, [this, &ir, continue_with_ir](const Status s, InstanceRec* irec) { - ir = irec; + gr, cp, [this, irp, continue_with_ir](const Status s, InstanceRec* irec) { + *irp = irec; continue_with_ir(s); }); } -- cgit v1.2.3