aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/distributed_runtime
diff options
context:
space:
mode:
authorGravatar Ayush Dubey <ayushd@google.com>2018-06-27 14:31:14 -0700
committerGravatar Gunhan Gulsoy <gunan@google.com>2018-06-28 21:37:43 -0700
commit73f9524f6c25a55edcb7881b6793eeb5e0ed315e (patch)
tree62bceaa7c8da253ab9e477ef43d74172235c3012 /tensorflow/core/distributed_runtime
parentac1619586c8fe7809c611824be93f7a10f86ac1f (diff)
Do not capture variables that may be destroyed before callback finishes.
PiperOrigin-RevId: 202370201
Diffstat (limited to 'tensorflow/core/distributed_runtime')
-rw-r--r--tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc25
1 files changed, 15 insertions, 10 deletions
diff --git a/tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc b/tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc
index 422d142f04..1dd10d309b 100644
--- a/tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc
+++ b/tensorflow/core/distributed_runtime/collective_param_resolver_distributed.cc
@@ -150,21 +150,23 @@ void CollectiveParamResolverDistributed::CompleteInstanceAsync(
for (int32 offset : request->subdiv_offset()) {
cp->instance.impl_details.subdiv_offsets.push_back(offset);
}
- VLOG(1) << "New cp " << cp << " for device " << request->device() << " : "
+ string* device = new string(request->device());
+ VLOG(1) << "New cp " << cp << " for device " << *device << " : "
<< cp->ToString();
- StatusCallback done_and_cleanup = [this, cp, done](const Status& s) {
+ StatusCallback done_and_cleanup = [this, cp, device, done](const Status& s) {
done(s);
delete cp;
+ delete device;
};
// Start by completing the group.
CompleteGroupDistributed(
- request->device(), cp, cancel_mgr,
- [this, cp, request, response, cancel_mgr, done_and_cleanup](
+ *device, cp, cancel_mgr,
+ [this, cp, device, response, cancel_mgr, done_and_cleanup](
const Status& cg_status, const GroupRec* gr) {
if (cg_status.ok()) {
// Then complete the instance.
CompleteInstanceDistributed(
- request->device(), gr, cp, cancel_mgr,
+ *device, gr, cp, cancel_mgr,
[this, gr, cp, response,
done_and_cleanup](const Status& ci_status) {
if (ci_status.ok()) {
@@ -278,16 +280,18 @@ bool CollectiveParamResolverDistributed::InstanceIsCached(int32 instance_key) {
void CollectiveParamResolverDistributed::UpdateInstanceCache(
const GroupRec* gr, CollectiveParams* cp,
const CompleteInstanceResponse& resp, const StatusCallback& done) {
- Notification note;
- InstanceRec* ir = nullptr;
+ using InstanceRecPointer = InstanceRec*;
+ InstanceRecPointer* irp = new InstanceRecPointer(nullptr);
int32 source_rank = resp.source_rank();
- auto continue_with_ir = [this, cp, &ir, source_rank, done](const Status& s) {
+ auto continue_with_ir = [this, cp, irp, source_rank, done](const Status& s) {
if (!s.ok()) {
done(s);
+ delete irp;
return;
}
Status status;
+ InstanceRec* ir = *irp;
do {
mutex_lock l(ir->out_mu);
ir->WaitForOutMu(l);
@@ -320,11 +324,12 @@ void CollectiveParamResolverDistributed::UpdateInstanceCache(
} while (false);
// Callback outside of lock.
done(status);
+ delete irp;
};
FindInstanceRec(
- gr, cp, [this, &ir, continue_with_ir](const Status s, InstanceRec* irec) {
- ir = irec;
+ gr, cp, [this, irp, continue_with_ir](const Status s, InstanceRec* irec) {
+ *irp = irec;
continue_with_ir(s);
});
}