diff options
author | 2018-01-03 16:36:42 -0800 | |
---|---|---|
committer | 2018-01-03 16:40:17 -0800 | |
commit | 758174b2dc11268496cd97cdbae517ebdc61e65c (patch) | |
tree | 4c2abd32c058c4d64f10c443d0304e83a3af231c /tensorflow/contrib/framework | |
parent | 0ad0dfef6953b38aea212e3ab0ca069344419c66 (diff) |
Add critical section resource and op that allows execution within the critical
section.
This op (not in the public API yet) allows exclusive access to certain resources
by bottlenecking subgraph access to run within a serialized critical section.
PiperOrigin-RevId: 180733901
Diffstat (limited to 'tensorflow/contrib/framework')
5 files changed, 107 insertions, 1 deletions
diff --git a/tensorflow/contrib/framework/BUILD b/tensorflow/contrib/framework/BUILD index 5b659ddaa1..e66f4d0201 100644 --- a/tensorflow/contrib/framework/BUILD +++ b/tensorflow/contrib/framework/BUILD @@ -11,11 +11,12 @@ package(default_visibility = [ ]) load("//tensorflow:tensorflow.bzl", "py_test") -load("//tensorflow:tensorflow.bzl", "tf_custom_op_py_library") load("//tensorflow:tensorflow.bzl", "tf_custom_op_library") load("//tensorflow:tensorflow.bzl", "tf_gen_op_wrapper_py") load("//tensorflow:tensorflow.bzl", "tf_gen_op_libs") load("//tensorflow:tensorflow.bzl", "tf_kernel_library") +load("//tensorflow:tensorflow.bzl", "cuda_py_test") +load("//tensorflow:tensorflow.bzl", "tf_custom_op_py_library") tf_custom_op_py_library( name = "framework_py", @@ -31,6 +32,7 @@ tf_custom_op_py_library( "python/ops/arg_scope.py", "python/ops/audio_ops.py", "python/ops/checkpoint_ops.py", + "python/ops/critical_section_ops.py", "python/ops/ops.py", "python/ops/prettyprint_ops.py", "python/ops/sort_ops.py", @@ -70,6 +72,7 @@ tf_custom_op_py_library( "//tensorflow/python:variable_scope", "//tensorflow/python:variables", "//tensorflow/python/eager:context", + "//tensorflow/python/eager:function", "//third_party/py/numpy", "@six_archive//:six", ], @@ -173,6 +176,21 @@ py_test( ], ) +cuda_py_test( + name = "critical_section_test", + size = "medium", + srcs = ["python/ops/critical_section_test.py"], + additional_deps = [ + "//tensorflow/python:client_testlib", + ":framework_py", + "//tensorflow/python:framework_for_generated_wrappers", + "//tensorflow/python:framework_test_lib", + "//tensorflow/python:gradients", + "//tensorflow/python:platform_test", + "//tensorflow/python:resource_variable_ops", + ], +) + py_test( name = "accumulate_n_v2_eager_test", size = "small", diff --git a/tensorflow/contrib/framework/__init__.py b/tensorflow/contrib/framework/__init__.py index 4edc77f86b..de3fd9eacb 100644 --- a/tensorflow/contrib/framework/__init__.py +++ b/tensorflow/contrib/framework/__init__.py @@ -82,6 +82,8 @@ See the @{$python/contrib.framework} guide. @@load_variable_slot_initializer @@sort + +@@CriticalSection """ from __future__ import absolute_import diff --git a/tensorflow/contrib/framework/python/ops/__init__.py b/tensorflow/contrib/framework/python/ops/__init__.py index 685bb94779..3763448860 100644 --- a/tensorflow/contrib/framework/python/ops/__init__.py +++ b/tensorflow/contrib/framework/python/ops/__init__.py @@ -22,6 +22,7 @@ from __future__ import print_function # pylint: disable=wildcard-import from tensorflow.contrib.framework.python.ops.arg_scope import * from tensorflow.contrib.framework.python.ops.checkpoint_ops import * +from tensorflow.contrib.framework.python.ops.critical_section_ops import * from tensorflow.contrib.framework.python.ops.ops import * from tensorflow.contrib.framework.python.ops.prettyprint_ops import * from tensorflow.contrib.framework.python.ops.sort_ops import * diff --git a/tensorflow/contrib/framework/python/ops/critical_section_ops.py b/tensorflow/contrib/framework/python/ops/critical_section_ops.py new file mode 100644 index 0000000000..6857b792a9 --- /dev/null +++ b/tensorflow/contrib/framework/python/ops/critical_section_ops.py @@ -0,0 +1,22 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Critical Section object and execution logic.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + + +from tensorflow.python.ops.gen_resource_variable_ops import * # pylint: disable=wildcard-import diff --git a/tensorflow/contrib/framework/python/ops/critical_section_test.py b/tensorflow/contrib/framework/python/ops/critical_section_test.py new file mode 100644 index 0000000000..8781946ce6 --- /dev/null +++ b/tensorflow/contrib/framework/python/ops/critical_section_test.py @@ -0,0 +1,63 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""critical section tests.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tensorflow.contrib.framework.python.ops import critical_section_ops +from tensorflow.python.framework import dtypes +from tensorflow.python.framework import function +from tensorflow.python.framework import ops +from tensorflow.python.framework import tensor_shape +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import resource_variable_ops +from tensorflow.python.platform import test + + +class CriticalSectionTest(test.TestCase): + + def testCreateCriticalSectionRaw(self): + handle = critical_section_ops.critical_section_op("cs") + v = resource_variable_ops.ResourceVariable(0.0, name="v") + + @function.Defun(dtypes.float32, dtypes.float32) + def fn(a, b): + c = v.read_value() + with ops.control_dependencies([c]): + nv = v.assign_add(a * b) + with ops.control_dependencies([nv]): + return array_ops.identity(c) + + def execute(fn, *args): + output_args = fn.definition.signature.output_arg + return resource_variable_ops.execute_in_critical_section( + critical_section=handle, + arguments=list(args) + fn.captured_inputs, + f=fn, + output_types=[out.type for out in output_args], + output_shapes=[tensor_shape.TensorShape(None) for _ in output_args]) + + num_concurrent = 1000 + r = [execute(fn, 1.0, 2.0)[0] for _ in range(num_concurrent)] + self.evaluate(v.initializer) + r_value = self.evaluate(r) + self.assertAllClose([2.0 * i for i in range(num_concurrent)], + sorted(r_value)) + + +if __name__ == "__main__": + test.main() |