69 files changed, 6028 insertions(+), 336 deletions(-)
diff --git a/RELEASE.md b/RELEASE.md index 7e6325af14..4b03394427 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -6,7 +6,7 @@ * Update `tf.keras` to the Keras 2.1.6 API. * Added [`tf.keras.layers.CuDNNGRU`](https://www.tensorflow.org/versions/r1.9/api_docs/python/tf/keras/layers/CuDNNGRU) and [`tf.keras.layers.CuDNNLSTM`](https://www.tensorflow.org/versions/r1.9/api_docs/python/tf/keras/layers/CuDNNLSTM) layers. [Try it](https://colab.sandbox.google.com/github/tensorflow/tensorflow/blob/master/tensorflow/contrib/eager/python/examples/nmt_with_attention/nmt_with_attention.ipynb?linkId=53292082). * Adding support of core [feature columns](https://www.tensorflow.org/get_started/feature_columns) and [losses](https://www.tensorflow.org/api_docs/python/tf/losses) to [gradient boosted trees estimators](https://github.com/tensorflow/models/tree/master/official/boosted_trees). -* The [python interface](https://tensorflow-dot-devsite.googleplex.com/versions/r1.9/api_docs/python/tf/contrib/lite) +* The [python interface](https://www.tensorflow.org/versions/r1.9/api_docs/python/tf/contrib/lite) for the [TFLite Optimizing Converter](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/contrib/lite/toco/README.md) has been expanded, and the command line interface (AKA: `toco`, `tflite_convert`) is once again included in the standard `pip` installation. diff --git a/tensorflow/BUILD b/tensorflow/BUILD index 6129fd0434..51eea94847 100644 --- a/tensorflow/BUILD +++ b/tensorflow/BUILD @@ -438,6 +438,22 @@ filegroup( data = glob(["docs_src/**/*.md"]), ) +cc_library( + name = "grpc", + deps = select({ + ":linux_s390x": ["@grpc//:grpc_unsecure"], + "//conditions:default": ["@grpc"], + }), +) + +cc_library( + name = "grpc++", + deps = select({ + ":linux_s390x": ["@grpc//:grpc++_unsecure"], + "//conditions:default": ["@grpc//:grpc++"], + }), +) + # A shared object which includes registration mechanisms for ops and # kernels. Does not include the implementations of any ops or kernels. Instead, # the library which loads libtensorflow_framework.so @@ -587,19 +603,3 @@ py_library( visibility = ["//visibility:public"], deps = ["//tensorflow/python:no_contrib"], ) - -cc_library( - name = "grpc", - deps = select({ - ":linux_s390x": ["@grpc//:grpc_unsecure"], - "//conditions:default": ["@grpc"], - }), -) - -cc_library( - name = "grpc++", - deps = select({ - ":linux_s390x": ["@grpc//:grpc++_unsecure"], - "//conditions:default": ["@grpc//:grpc++"], - }), -) diff --git a/tensorflow/compiler/xla/service/cpu/BUILD b/tensorflow/compiler/xla/service/cpu/BUILD index 2c3eb1ae36..3479240610 100644 --- a/tensorflow/compiler/xla/service/cpu/BUILD +++ b/tensorflow/compiler/xla/service/cpu/BUILD @@ -129,7 +129,7 @@ cc_library( "@llvm//:x86_code_gen", # fixdeps: keep "@llvm//:x86_disassembler", # fixdeps: keep ] + select({ - "@org_tensorflow//tensorflow:linux_ppc64le": [ + "//tensorflow:linux_ppc64le": [ "@llvm//:powerpc_disassembler", "@llvm//:powerpc_code_gen", ], diff --git a/tensorflow/compiler/xla/util.h b/tensorflow/compiler/xla/util.h index 6041fae159..b23b968aae 100644 --- a/tensorflow/compiler/xla/util.h +++ b/tensorflow/compiler/xla/util.h @@ -534,6 +534,13 @@ c_count_if(const C& c, Pred&& pred) { return std::count_if(std::begin(c), std::end(c), std::forward<Pred>(pred)); } +// Determines whether `value` is present in `c`. 
+template <typename C, typename T> +bool c_linear_search(const C& c, T&& value) { + auto last = std::end(c); + return std::find(std::begin(c), last, std::forward<T>(value)) != last; +} + template <typename C, typename Value> int64 FindIndex(const C& c, Value&& value) { auto it = c_find(c, std::forward<Value>(value)); diff --git a/tensorflow/contrib/data/python/kernel_tests/BUILD b/tensorflow/contrib/data/python/kernel_tests/BUILD index d81654e039..c9435eadcd 100644 --- a/tensorflow/contrib/data/python/kernel_tests/BUILD +++ b/tensorflow/contrib/data/python/kernel_tests/BUILD @@ -188,6 +188,7 @@ py_test( "optonly", ], deps = [ + "//tensorflow/contrib/data/python/ops:batching", "//tensorflow/contrib/data/python/ops:error_ops", "//tensorflow/python:array_ops", "//tensorflow/python:client_testlib", diff --git a/tensorflow/contrib/data/python/kernel_tests/map_dataset_op_test.py b/tensorflow/contrib/data/python/kernel_tests/map_dataset_op_test.py index 270a2297b4..a075dfd8b5 100644 --- a/tensorflow/contrib/data/python/kernel_tests/map_dataset_op_test.py +++ b/tensorflow/contrib/data/python/kernel_tests/map_dataset_op_test.py @@ -17,11 +17,16 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import itertools import os +import time import numpy as np +from tensorflow.contrib.data.python.ops import batching from tensorflow.contrib.data.python.ops import error_ops +from tensorflow.core.protobuf import config_pb2 +from tensorflow.python.client import session from tensorflow.python.data.ops import dataset_ops from tensorflow.python.framework import errors from tensorflow.python.framework import ops @@ -135,5 +140,82 @@ class MapDatasetTest(test.TestCase): sess.run(get_next) +class MapDatasetBenchmark(test.Benchmark): + + def benchmarkMapAndBatch(self): + small = itertools.product([1, 4], [1, 4], [1, 4], [16, 64], [100]) + large = itertools.product([16, 64], [16, 64], [16, 64], [256, 1024], [10]) + + num_iters = 100 + + def benchmark(series): + + for num_calls, inter_op, element_size, batch_size, num_steps in series: + dataset = dataset_ops.Dataset.from_tensors( + np.random.randint(100, size=element_size)).repeat().map( + lambda x: x, + num_parallel_calls=num_calls).batch(batch_size=batch_size) + iterator = dataset.make_one_shot_iterator() + get_next = iterator.get_next() + + fused_dataset = dataset_ops.Dataset.from_tensors( + np.random.randint(100, size=element_size)).repeat(None).apply( + batching.map_and_batch( + lambda x: x, + num_parallel_calls=num_calls, + batch_size=batch_size)) + fused_iterator = fused_dataset.make_one_shot_iterator() + fused_get_next = fused_iterator.get_next() + + fused_deltas = [] + with session.Session( + config=config_pb2.ConfigProto( + inter_op_parallelism_threads=inter_op)) as sess: + + for _ in range(5): + sess.run(fused_get_next) + for _ in range(num_iters): + start = time.time() + for _ in range(num_steps): + sess.run(fused_get_next) + end = time.time() + fused_deltas.append(end - start) + + chained_deltas = [] + with session.Session( + config=config_pb2.ConfigProto( + inter_op_parallelism_threads=inter_op)) as sess: + for _ in range(5): + sess.run(get_next) + for _ in range(num_iters): + start = time.time() + for _ in range(num_steps): + sess.run(get_next) + end = time.time() + chained_deltas.append(end - start) + + chained_wall_time = np.median(chained_deltas) / num_iters + fused_wall_time = np.median(fused_deltas) / num_iters + print( + "batch size: %d, num parallel calls: %d, inter-op parallelism: %d, 
" + "element size: %d, chained wall time: %f, fused wall time: %f" % + (batch_size, num_calls, inter_op, element_size, chained_wall_time, + fused_wall_time)) + + self.report_benchmark( + iters=num_iters, + wall_time=chained_wall_time, + name="chained_batch_size_%d_num_calls_%d_inter_op_%d_elem_size_%d" + % (batch_size, num_calls, inter_op, element_size)) + + self.report_benchmark( + iters=num_iters, + wall_time=fused_wall_time, + name="fused_batch_size_%d_num_calls_%d_inter_op_%d_elem_size_%d" + % (batch_size, num_calls, inter_op, element_size)) + + benchmark(small) + benchmark(large) + if __name__ == "__main__": test.main() diff --git a/tensorflow/contrib/estimator/python/estimator/dnn.py b/tensorflow/contrib/estimator/python/estimator/dnn.py index 4bb90cf81b..9efa8f474d 100644 --- a/tensorflow/contrib/estimator/python/estimator/dnn.py +++ b/tensorflow/contrib/estimator/python/estimator/dnn.py @@ -112,7 +112,8 @@ class DNNEstimator(estimator.Estimator): dropout=None, input_layer_partitioner=None, config=None, - warm_start_from=None): + warm_start_from=None, + batch_norm=False): """Initializes a `DNNEstimator` instance. Args: @@ -142,6 +143,7 @@ class DNNEstimator(estimator.Estimator): string filepath is provided instead of a `WarmStartSettings`, then all weights are warm-started, and it is assumed that vocabularies and Tensor names are unchanged. + batch_norm: Whether to use batch normalization after each hidden layer. """ def _model_fn(features, labels, mode, config): return dnn_lib._dnn_model_fn( # pylint: disable=protected-access @@ -155,7 +157,8 @@ class DNNEstimator(estimator.Estimator): activation_fn=activation_fn, dropout=dropout, input_layer_partitioner=input_layer_partitioner, - config=config) + config=config, + batch_norm=batch_norm) super(DNNEstimator, self).__init__( model_fn=_model_fn, model_dir=model_dir, config=config, warm_start_from=warm_start_from) diff --git a/tensorflow/contrib/gdr/gdr_memory_manager.cc b/tensorflow/contrib/gdr/gdr_memory_manager.cc index 81e70ae30a..1435e19109 100644 --- a/tensorflow/contrib/gdr/gdr_memory_manager.cc +++ b/tensorflow/contrib/gdr/gdr_memory_manager.cc @@ -34,8 +34,9 @@ limitations under the License. 
#include "tensorflow/core/common_runtime/device.h" #include "tensorflow/core/common_runtime/dma_helper.h" #if GOOGLE_CUDA +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include "tensorflow/core/common_runtime/gpu/gpu_util.h" -#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/common_runtime/process_state.h" #endif // GOOGLE_CUDA #include "tensorflow/core/framework/allocator_registry.h" #include "tensorflow/core/lib/core/status.h" @@ -274,7 +275,7 @@ Status GdrMemoryManager::Init() { Allocator* allocators[] = { #if GOOGLE_CUDA - ProcessState::singleton()->GetCUDAHostAllocator(0), + GPUProcessState::singleton()->GetCUDAHostAllocator(0), ProcessState::singleton()->GetCPUAllocator(0), #endif // GOOGLE_CUDA cpu_allocator(), @@ -308,7 +309,8 @@ Status GdrMemoryManager::Init() { if (IsGDRAvailable()) { // Note we don't free allocated GPU memory so there is no free visitor int32_t bus_id = TryToReadNumaNode(listening_->verbs->device) + 1; - ProcessState::singleton()->AddGPUAllocVisitor(bus_id, cuda_alloc_visitor); + GPUProcessState::singleton()->AddGPUAllocVisitor(bus_id, + cuda_alloc_visitor); LOG(INFO) << "Instrumenting GPU allocator with bus_id " << bus_id; } #endif // GOOGLE_CUDA @@ -430,7 +432,7 @@ void GdrMemoryManager::TransportOptionsFromTensor( #if GOOGLE_CUDA if (!on_host) { - Allocator* alloc = ProcessState::singleton()->GetCUDAHostAllocator(0); + Allocator* alloc = GPUProcessState::singleton()->GetCUDAHostAllocator(0); Tensor* host_copy = new Tensor(alloc, tensor.dtype(), tensor.shape()); GPUUtil::CopyGPUTensorToCPU( device, device_context, &tensor, host_copy, @@ -532,7 +534,7 @@ void GdrMemoryManager::TensorFromTransportOptions( Tensor host_copy; #if GOOGLE_CUDA if (mr == nullptr && !on_host) { - Allocator* alloc = ProcessState::singleton()->GetCUDAHostAllocator(0); + Allocator* alloc = GPUProcessState::singleton()->GetCUDAHostAllocator(0); host_copy = Tensor(alloc, tensor->dtype(), tensor->shape()); buffer = DMAHelper::buffer(&host_copy); addr = buffer->data(); diff --git a/tensorflow/contrib/lite/java/demo/app/build.gradle b/tensorflow/contrib/lite/java/demo/app/build.gradle index 288a5f73c5..49868c5a75 100644 --- a/tensorflow/contrib/lite/java/demo/app/build.gradle +++ b/tensorflow/contrib/lite/java/demo/app/build.gradle @@ -92,4 +92,4 @@ class DownloadUrlTask extends DefaultTask { void download() { ant.get(src: sourceUrl, dest: target) } -}
\ No newline at end of file +} diff --git a/tensorflow/contrib/lite/java/src/main/java/org/tensorflow/lite/Interpreter.java b/tensorflow/contrib/lite/java/src/main/java/org/tensorflow/lite/Interpreter.java index fd1f0ffa68..4e22a68bf2 100644 --- a/tensorflow/contrib/lite/java/src/main/java/org/tensorflow/lite/Interpreter.java +++ b/tensorflow/contrib/lite/java/src/main/java/org/tensorflow/lite/Interpreter.java @@ -135,7 +135,8 @@ public final class Interpreter implements AutoCloseable { * including int, float, long, and byte. {@link ByteBuffer} is the preferred way to pass large * input data. When {@link ByteBuffer} is used, its content should remain unchanged until * model inference is done. - * @param output a multidimensional array of output data. + * @param output a multidimensional array of output data, or a {@link ByteBuffer} of primitive + * types including int, float, long, and byte. */ public void run(@NonNull Object input, @NonNull Object output) { Object[] inputs = {input}; @@ -155,8 +156,9 @@ public final class Interpreter implements AutoCloseable { * primitive types including int, float, long, and byte. {@link ByteBuffer} is the preferred * way to pass large input data. When {@link ByteBuffer} is used, its content should remain * unchanged until model inference is done. - * @param outputs a map mapping output indices to multidimensional arrays of output data. It only - * needs to keep entries for the outputs to be used. + * @param outputs a map mapping output indices to multidimensional arrays of output data or {@link + * ByteBuffer}s of primitive types including int, float, long, and byte. It only needs to keep + * entries for the outputs to be used. */ public void runForMultipleInputsOutputs( @NonNull Object[] inputs, @NonNull Map<Integer, Object> outputs) { diff --git a/tensorflow/contrib/lite/java/src/main/java/org/tensorflow/lite/Tensor.java b/tensorflow/contrib/lite/java/src/main/java/org/tensorflow/lite/Tensor.java index 09e887aae3..b2a3e04c55 100644 --- a/tensorflow/contrib/lite/java/src/main/java/org/tensorflow/lite/Tensor.java +++ b/tensorflow/contrib/lite/java/src/main/java/org/tensorflow/lite/Tensor.java @@ -15,6 +15,8 @@ limitations under the License. package org.tensorflow.lite; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Arrays; /** @@ -29,8 +31,21 @@ final class Tensor { return new Tensor(nativeHandle); } - /** Reads Tensor content into an array. */ + /** + * Copies the contents of the tensor to {@code dst} and returns {@code dst}. + * + * @param dst the destination buffer, either an explicitly-typed array or a {@link ByteBuffer}. + * @throws IllegalArgumentException if {@code dst} is not compatible with the tensor (for example, + * mismatched data types or shapes). + * @throws BufferOverflowException If {@code dst} is a ByteBuffer with insufficient space for the + * data in this tensor. 
+ */ <T> T copyTo(T dst) { + if (dst instanceof ByteBuffer) { + ByteBuffer dstByteBuffer = (ByteBuffer) dst; + dstByteBuffer.put(buffer()); + return dst; + } if (NativeInterpreterWrapper.dataTypeOf(dst) != dtype) { throw new IllegalArgumentException( String.format( @@ -60,6 +75,12 @@ final class Tensor { this.shapeCopy = shape(nativeHandle); } + private ByteBuffer buffer() { + return buffer(nativeHandle).order(ByteOrder.nativeOrder()); + } + + private static native ByteBuffer buffer(long handle); + private static native int dtype(long handle); private static native int[] shape(long handle); diff --git a/tensorflow/contrib/lite/java/src/main/native/tensor_jni.cc b/tensorflow/contrib/lite/java/src/main/native/tensor_jni.cc index 9e9387da86..08b4d04280 100644 --- a/tensorflow/contrib/lite/java/src/main/native/tensor_jni.cc +++ b/tensorflow/contrib/lite/java/src/main/native/tensor_jni.cc @@ -203,6 +203,16 @@ size_t writeMultiDimensionalArray(JNIEnv* env, jobject src, TfLiteType type, } } +JNIEXPORT jobject JNICALL Java_org_tensorflow_lite_Tensor_buffer(JNIEnv* env, + jclass clazz, + jlong handle) { + TfLiteTensor* tensor = convertLongToTensor(env, handle); + if (tensor == nullptr) return nullptr; + + return env->NewDirectByteBuffer(static_cast<void*>(tensor->data.raw), + static_cast<jlong>(tensor->bytes)); +} + JNIEXPORT void JNICALL Java_org_tensorflow_lite_Tensor_readMultiDimensionalArray(JNIEnv* env, jclass clazz, diff --git a/tensorflow/contrib/lite/java/src/main/native/tensor_jni.h b/tensorflow/contrib/lite/java/src/main/native/tensor_jni.h index 3a4910dcc3..9ba95d9ac4 100644 --- a/tensorflow/contrib/lite/java/src/main/native/tensor_jni.h +++ b/tensorflow/contrib/lite/java/src/main/native/tensor_jni.h @@ -24,8 +24,17 @@ extern "C" { #endif // __cplusplus /* - * Class: org_tensorflow_lite_TfLiteTensor - * Method: + * Class: org_tensorflow_lite_Tensor + * Method: buffer + * Signature: (J)Ljava/nio/ByteBuffer; + */ +JNIEXPORT jobject JNICALL Java_org_tensorflow_lite_Tensor_buffer(JNIEnv* env, + jclass clazz, + jlong handle); + +/* + * Class: org_tensorflow_lite_Tensor + * Method: dtype * Signature: (J)I */ JNIEXPORT jint JNICALL Java_org_tensorflow_lite_Tensor_dtype(JNIEnv* env, @@ -33,8 +42,8 @@ JNIEXPORT jint JNICALL Java_org_tensorflow_lite_Tensor_dtype(JNIEnv* env, jlong handle); /* - * Class: org_tensorflow_lite_TfLiteTensor - * Method: + * Class: org_tensorflow_lite_Tensor + * Method: shape * Signature: (J)[I */ JNIEXPORT jintArray JNICALL Java_org_tensorflow_lite_Tensor_shape(JNIEnv* env, @@ -42,8 +51,8 @@ JNIEXPORT jintArray JNICALL Java_org_tensorflow_lite_Tensor_shape(JNIEnv* env, jlong handle); /* - * Class: org_tensorflow_lite_TfLiteTensor - * Method: + * Class: org_tensorflow_lite_Tensor + * Method: readMultiDimensionalArray * Signature: (JLjava/lang/Object;) */ JNIEXPORT void JNICALL diff --git a/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/InterpreterTest.java b/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/InterpreterTest.java index 82007a6ab5..e6deadffe2 100644 --- a/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/InterpreterTest.java +++ b/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/InterpreterTest.java @@ -165,6 +165,24 @@ public final class InterpreterTest { } @Test + public void testRunWithByteBufferOutput() { + float[] oneD = {1.23f, 6.54f, 7.81f}; + float[][] twoD = {oneD, oneD, oneD, oneD, oneD, oneD, oneD, oneD}; + float[][][] threeD = {twoD, twoD, twoD, twoD, twoD, twoD, twoD, twoD}; + 
float[][][][] fourD = {threeD, threeD}; + ByteBuffer parsedOutput = + ByteBuffer.allocateDirect(2 * 8 * 8 * 3 * 4).order(ByteOrder.nativeOrder()); + try (Interpreter interpreter = new Interpreter(MODEL_FILE)) { + interpreter.run(fourD, parsedOutput); + } + float[] outputOneD = { + parsedOutput.getFloat(0), parsedOutput.getFloat(4), parsedOutput.getFloat(8) + }; + float[] expected = {3.69f, 19.62f, 23.43f}; + assertThat(outputOneD).usingTolerance(0.1f).containsExactly(expected).inOrder(); + } + + @Test public void testMobilenetRun() { // Create a gray image. float[][][][] img = new float[1][224][224][3]; diff --git a/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/NativeInterpreterWrapperTest.java b/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/NativeInterpreterWrapperTest.java index 9e41cb132d..029e5853e2 100644 --- a/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/NativeInterpreterWrapperTest.java +++ b/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/NativeInterpreterWrapperTest.java @@ -112,6 +112,27 @@ public final class NativeInterpreterWrapperTest { } @Test + public void testRunWithBufferOutput() { + try (NativeInterpreterWrapper wrapper = new NativeInterpreterWrapper(FLOAT_MODEL_PATH)) { + float[] oneD = {1.23f, -6.54f, 7.81f}; + float[][] twoD = {oneD, oneD, oneD, oneD, oneD, oneD, oneD, oneD}; + float[][][] threeD = {twoD, twoD, twoD, twoD, twoD, twoD, twoD, twoD}; + float[][][][] fourD = {threeD, threeD}; + Object[] inputs = {fourD}; + Tensor[] outputs = wrapper.run(inputs); + assertThat(outputs).hasLength(1); + ByteBuffer parsedOutput = + ByteBuffer.allocateDirect(2 * 8 * 8 * 3 * 4).order(ByteOrder.nativeOrder()); + outputs[0].copyTo(parsedOutput); + float[] outputOneD = { + parsedOutput.getFloat(0), parsedOutput.getFloat(4), parsedOutput.getFloat(8) + }; + float[] expected = {3.69f, -19.62f, 23.43f}; + assertThat(outputOneD).usingTolerance(0.1f).containsExactly(expected).inOrder(); + } + } + + @Test public void testRunWithInputsOfSameDims() { NativeInterpreterWrapper wrapper = new NativeInterpreterWrapper(FLOAT_MODEL_PATH); float[] oneD = {1.23f, -6.54f, 7.81f}; diff --git a/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/TensorTest.java b/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/TensorTest.java index 94b6632bb8..dd9d37eeda 100644 --- a/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/TensorTest.java +++ b/tensorflow/contrib/lite/java/src/test/java/org/tensorflow/lite/TensorTest.java @@ -18,6 +18,9 @@ package org.tensorflow.lite; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -71,6 +74,32 @@ public final class TensorTest { } @Test + public void testCopyToByteBuffer() { + Tensor tensor = Tensor.fromHandle(nativeHandle); + ByteBuffer parsedOutput = + ByteBuffer.allocateDirect(2 * 8 * 8 * 3 * 4).order(ByteOrder.nativeOrder()); + tensor.copyTo(parsedOutput); + assertThat(parsedOutput.position()).isEqualTo(2 * 8 * 8 * 3 * 4); + float[] outputOneD = { + parsedOutput.getFloat(0), parsedOutput.getFloat(4), parsedOutput.getFloat(8) + }; + float[] expected = {3.69f, 19.62f, 23.43f}; + assertThat(outputOneD).usingTolerance(0.1f).containsExactly(expected).inOrder(); + } + + @Test + public void testCopyToInvalidByteBuffer() { + Tensor tensor = 
Tensor.fromHandle(nativeHandle); + ByteBuffer parsedOutput = ByteBuffer.allocateDirect(3 * 4).order(ByteOrder.nativeOrder()); + try { + tensor.copyTo(parsedOutput); + fail(); + } catch (BufferOverflowException e) { + // Expected. + } + } + + @Test public void testCopyToWrongType() { Tensor tensor = Tensor.fromHandle(nativeHandle); int[][][][] parsedOutputs = new int[2][8][8][3]; diff --git a/tensorflow/contrib/lite/kernels/gather.cc b/tensorflow/contrib/lite/kernels/gather.cc index 6a2341461f..2b2a9e6620 100644 --- a/tensorflow/contrib/lite/kernels/gather.cc +++ b/tensorflow/contrib/lite/kernels/gather.cc @@ -40,10 +40,8 @@ TfLiteStatus Prepare(TfLiteContext* context, TfLiteNode* node) { TfLiteTensor* output = GetOutput(context, node, kOutputTensor); // Only INT32 positions are supported. TF_LITE_ENSURE_EQ(context, positions->type, kTfLiteInt32); - // Check that input and output types match. - TF_LITE_ENSURE_EQ(context, input->type, output->type); - // TODO(mgubin): only 0D or 1D positions are currently supported. - TF_LITE_ENSURE(context, NumDimensions(positions) <= 1); + // Assign to output the input type. + output->type = input->type; // TODO(mgubin): Only default axis == 0 is supported. TF_LITE_ENSURE_EQ(context, params->axis, 0); // Check conditions for different types. @@ -102,6 +100,7 @@ TfLiteStatus Eval(TfLiteContext* context, TfLiteNode* node) { TF_LITE_GATHER(int32_t, int32_t); break; case kTfLiteString: { + // TODO(mgubin): Currently support only for 1D output tensors. DynamicBuffer buffer; const int32* indexes = positions->data.i32; const int num_strings = GetStringCount(input); diff --git a/tensorflow/contrib/lite/kernels/gather_test.cc b/tensorflow/contrib/lite/kernels/gather_test.cc index cdadbeda18..1d4292955c 100644 --- a/tensorflow/contrib/lite/kernels/gather_test.cc +++ b/tensorflow/contrib/lite/kernels/gather_test.cc @@ -96,6 +96,15 @@ TEST(GatherOpTest, Test0DIndexWith0DResult) { EXPECT_TRUE(m.GetOutputShape().empty()); } +TEST(GatherOpTest, Test2DIndexWith2DResult) { + GatherOpModel m({3}, TensorType_FLOAT32, {1, 2}); + m.SetInputFloat({1.0, 2.0, 3.0}); + m.SetPositions({1, 0}); + m.Invoke(); + EXPECT_THAT(m.GetOutputFloat(), ElementsAreArray(ArrayFloatNear({2.0, 1.0}))); + EXPECT_THAT(m.GetOutputShape(), ElementsAreArray({1, 2})); +} + TEST(FloatGatherOpTest, Duplicate) { GatherOpModel m({1, 2, 2}, TensorType_FLOAT32, {2}); m.SetInputFloat({-2.0, 0.2, 0.7, 0.8}); diff --git a/tensorflow/contrib/lite/kernels/topk_v2.cc b/tensorflow/contrib/lite/kernels/topk_v2.cc index fb0e49c90c..2dd760bbfe 100644 --- a/tensorflow/contrib/lite/kernels/topk_v2.cc +++ b/tensorflow/contrib/lite/kernels/topk_v2.cc @@ -56,11 +56,13 @@ TfLiteStatus ResizeOutput(TfLiteContext* context, TfLiteNode* node) { output_values_shape->data[num_dimensions - 1] = k; TfLiteTensor* output_indexes = GetOutput(context, node, kOutputIndexes); TfLiteTensor* output_values = GetOutput(context, node, kOutputValues); + // Force output types. 
+ output_indexes->type = kTfLiteInt32; + output_values->type = input->type; auto resize_tensor = [context](TfLiteTensor* tensor, TfLiteIntArray* new_size, TfLiteIntArray* delete_on_error) { TfLiteStatus status = context->ResizeTensor(context, tensor, new_size); if (status != kTfLiteOk) { - TfLiteIntArrayFree(new_size); if (delete_on_error != nullptr) { TfLiteIntArrayFree(delete_on_error); } diff --git a/tensorflow/contrib/lite/nnapi_delegate.cc b/tensorflow/contrib/lite/nnapi_delegate.cc index 44cef80ac3..905c0919cb 100644 --- a/tensorflow/contrib/lite/nnapi_delegate.cc +++ b/tensorflow/contrib/lite/nnapi_delegate.cc @@ -534,10 +534,14 @@ TfLiteStatus AddOpsAndParams( case tflite::BuiltinOperator_DIV: nnapi_version = 11; // require NNAPI 1.1 nn_op_type = ANEURALNETWORKS_DIV; + check_and_add_activation( + reinterpret_cast<TfLiteDivParams*>(node.builtin_data)->activation); break; case tflite::BuiltinOperator_SUB: nnapi_version = 11; // require NNAPI 1.1 nn_op_type = ANEURALNETWORKS_SUB; + check_and_add_activation( + reinterpret_cast<TfLiteSubParams*>(node.builtin_data)->activation); break; case tflite::BuiltinOperator_SQUEEZE: nnapi_version = 11; // requires NNAPI 1.1 diff --git a/tensorflow/contrib/lite/toco/graph_transformations/propagate_fixed_sizes.cc b/tensorflow/contrib/lite/toco/graph_transformations/propagate_fixed_sizes.cc index 82b3ab96fe..8eb0423283 100644 --- a/tensorflow/contrib/lite/toco/graph_transformations/propagate_fixed_sizes.cc +++ b/tensorflow/contrib/lite/toco/graph_transformations/propagate_fixed_sizes.cc @@ -1039,9 +1039,6 @@ void ProcessGatherOperator(Model* model, GatherOperator* op) { QCHECK_GE(input_shape.dimensions_count(), 1); op->input_rank = input_shape.dimensions_count(); - // We only support 1-D indices. - QCHECK_EQ(indices_shape.dimensions_count(), 1); - // Copy the input dimensions to the output except for dimension 0, // where the dimension of indices_shape is used. // TODO(mgubin): if axis != 0 this is not true, change when it's supported. diff --git a/tensorflow/contrib/lite/toco/tooling_util.cc b/tensorflow/contrib/lite/toco/tooling_util.cc index 7dc1af9f1d..01113506d0 100644 --- a/tensorflow/contrib/lite/toco/tooling_util.cc +++ b/tensorflow/contrib/lite/toco/tooling_util.cc @@ -447,8 +447,12 @@ void LogSummary(int log_level, const Model& model) { } void LogArray(int log_level, const Model& model, const string& name) { - const auto& array = model.GetArray(name); VLOG(log_level) << "Array: " << name; + if (!model.HasArray(name)) { + VLOG(log_level) << " DOES NOT EXIST"; + return; + } + const auto& array = model.GetArray(name); VLOG(log_level) << " Data type: " << ArrayDataTypeName(array.data_type); VLOG(log_level) << " Final type: " << ArrayDataTypeName(array.final_data_type); diff --git a/tensorflow/contrib/tensorrt/convert/convert_graph.cc b/tensorflow/contrib/tensorrt/convert/convert_graph.cc index 0474d243d3..17b32c0e30 100644 --- a/tensorflow/contrib/tensorrt/convert/convert_graph.cc +++ b/tensorflow/contrib/tensorrt/convert/convert_graph.cc @@ -31,7 +31,7 @@ limitations under the License. 
#include "tensorflow/contrib/tensorrt/segment/segment.h" #include "tensorflow/core/common_runtime/gpu/gpu_id.h" #include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h" -#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include "tensorflow/core/framework/function.h" #include "tensorflow/core/framework/graph_to_functiondef.h" #include "tensorflow/core/framework/node_def_builder.h" @@ -651,7 +651,7 @@ std::pair<int, tensorflow::Allocator*> GetDeviceAndAllocator( // to allocators. // TODO(sami): when grappler devices become available else path will not be // necessary - auto pm = tensorflow::ProcessState::singleton(); + auto pm = tensorflow::GPUProcessState::singleton(); if (params.cluster) { // get allocator tensorflow::Device* device = nullptr; if (params.cluster->GetDeviceSet()) { diff --git a/tensorflow/contrib/tpu/python/tpu/tpu_estimator.py b/tensorflow/contrib/tpu/python/tpu/tpu_estimator.py index 14e025973e..49cd318b89 100644 --- a/tensorflow/contrib/tpu/python/tpu/tpu_estimator.py +++ b/tensorflow/contrib/tpu/python/tpu/tpu_estimator.py @@ -216,8 +216,8 @@ class _SIGNAL(object): class TPUEstimatorSpec(model_fn_lib._TPUEstimatorSpec): # pylint: disable=protected-access """Ops and objects returned from a `model_fn` and passed to `TPUEstimator`. - See `EstimatorSpec` for `mode`, 'predictions, 'loss', 'train_op', and - 'export_outputs`. + See `EstimatorSpec` for `mode`, `predictions`, `loss`, `train_op`, and + `export_outputs`. For evaluation, `eval_metrics `is a tuple of `metric_fn` and `tensors`, where `metric_fn` runs on CPU to generate metrics and `tensors` represents the @@ -231,7 +231,7 @@ class TPUEstimatorSpec(model_fn_lib._TPUEstimatorSpec): # pylint: disable=prote size is the first dimension. Once all tensors are available at CPU host from all shards, they are concatenated (on CPU) and passed as positional arguments to the `metric_fn` if `tensors` is list or keyword arguments if `tensors` is - dict. `metric_fn` takes the `tensors` and returns a dict from metric string + a dict. `metric_fn` takes the `tensors` and returns a dict from metric string name to the result of calling a metric function, namely a `(metric_tensor, update_op)` tuple. See `TPUEstimator` for MNIST example how to specify the `eval_metrics`. diff --git a/tensorflow/contrib/verbs/rdma.cc b/tensorflow/contrib/verbs/rdma.cc index 86350a08e5..f7c979e863 100644 --- a/tensorflow/contrib/verbs/rdma.cc +++ b/tensorflow/contrib/verbs/rdma.cc @@ -24,8 +24,8 @@ limitations under the License. #include "tensorflow/core/common_runtime/dma_helper.h" #include "tensorflow/core/common_runtime/process_util.h" #if GOOGLE_CUDA +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include "tensorflow/core/common_runtime/gpu/gpu_util.h" -#include "tensorflow/core/common_runtime/gpu/process_state.h" #endif #include "tensorflow/core/distributed_runtime/rendezvous_mgr_interface.h" #include "tensorflow/core/distributed_runtime/rpc/grpc_util.h" @@ -1084,7 +1084,7 @@ void RdmaTensorResponse::RecvHandler(Rendezvous::ParsedKey parsed, // The tensor must be copied from GPU to CPU, because either: // 1. The tensor is located on a non GDR compatible GPU. // 2. The tensor's meta-data has changed. 
- Allocator* alloc = ProcessState::singleton()->GetCUDAHostAllocator(0); + Allocator* alloc = GPUProcessState::singleton()->GetCUDAHostAllocator(0); copy = Tensor(alloc, in.dtype(), in.shape()); CountCopies(rm_.name_, (void*)DMAHelper::base(&in), (void*)DMAHelper::base(©), in.TotalBytes(), true); @@ -1541,7 +1541,7 @@ bool RdmaTensorRequest::AllocateTensors() { if (mr_ == nullptr) { // Can't RDMA directly to result. Use a proxy. proxy_tensor_ = - new Tensor(ProcessState::singleton()->GetCUDAHostAllocator(0), + new Tensor(GPUProcessState::singleton()->GetCUDAHostAllocator(0), result_tensor_->dtype(), result_tensor_->shape()); rdma_addr_ = DMAHelper::base(proxy_tensor_); mr_ = diff --git a/tensorflow/contrib/verbs/rdma_mgr.cc b/tensorflow/contrib/verbs/rdma_mgr.cc index 369bd986df..9cb3d1fbbf 100644 --- a/tensorflow/contrib/verbs/rdma_mgr.cc +++ b/tensorflow/contrib/verbs/rdma_mgr.cc @@ -21,8 +21,9 @@ limitations under the License. #include "tensorflow/contrib/verbs/grpc_verbs_client.h" #include "tensorflow/contrib/verbs/verbs_service.pb.h" #include "tensorflow/core/common_runtime/bfc_allocator.h" +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include "tensorflow/core/common_runtime/gpu/gpu_util.h" -#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/common_runtime/process_state.h" #include "tensorflow/core/distributed_runtime/rpc/grpc_worker_cache.h" #include "tensorflow/core/distributed_runtime/session_mgr.h" #include "tensorflow/core/framework/allocator_registry.h" @@ -282,7 +283,7 @@ void RdmaMgr::InitAllocators() { Allocator* allocators[] = { #if GOOGLE_CUDA - ProcessState::singleton()->GetCUDAHostAllocator(0), + GPUProcessState::singleton()->GetCUDAHostAllocator(0), ProcessState::singleton()->GetCPUAllocator(0), #endif // GOOGLE_CUDA cpu_allocator(), @@ -323,7 +324,8 @@ void RdmaMgr::InitAllocators() { std::bind(&RdmaMemoryMgr::InsertMemoryRegion, &RdmaMemoryMgr::Singleton(), _1, _2, std::string(buf)); - ProcessState::singleton()->AddGPUAllocVisitor(bus_id, cuda_alloc_visitor); + GPUProcessState::singleton()->AddGPUAllocVisitor(bus_id, + cuda_alloc_visitor); LOG(INFO) << "Instrumenting GPU allocator with bus_id " << bus_id; } #endif // GOOGLE_CUDA diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD index 1c6111a748..97880219b8 100644 --- a/tensorflow/core/BUILD +++ b/tensorflow/core/BUILD @@ -2670,6 +2670,8 @@ CORE_CPU_LIB_HEADERS = CORE_CPU_BASE_HDRS + [ "common_runtime/step_stats_collector.h", "common_runtime/threadpool_device.h", "common_runtime/visitable_allocator.h", + "common_runtime/process_state.h", + "common_runtime/pool_allocator.h", "graph/gradients.h", "graph/quantize_training.h", ] + if_mkl(["graph/mkl_graph_util.h"]) @@ -2708,7 +2710,9 @@ tf_cuda_library( "common_runtime/optimization_registry.cc", "common_runtime/parallel_concat_optimizer.cc", "common_runtime/placer.cc", + "common_runtime/pool_allocator.cc", "common_runtime/process_function_library_runtime.cc", + "common_runtime/process_state.cc", "common_runtime/process_util.cc", "common_runtime/renamed_device.cc", "common_runtime/rendezvous_mgr.cc", @@ -2895,6 +2899,7 @@ cc_library( ) GPU_RUNTIME_HEADERS = [ + "common_runtime/gpu/cuda_host_allocator.h", "common_runtime/gpu/gpu_bfc_allocator.h", "common_runtime/gpu/gpu_cudamalloc_allocator.h", "common_runtime/gpu/gpu_debug_allocator.h", @@ -2904,10 +2909,9 @@ GPU_RUNTIME_HEADERS = [ "common_runtime/gpu/gpu_id_utils.h", "common_runtime/gpu/gpu_init.h", "common_runtime/gpu/gpu_managed_allocator.h", + 
"common_runtime/gpu/gpu_process_state.h", "common_runtime/gpu/gpu_stream_util.h", "common_runtime/gpu/gpu_util.h", - "common_runtime/gpu/pool_allocator.h", - "common_runtime/gpu/process_state.h", "common_runtime/gpu_device_context.h", ] @@ -2920,11 +2924,10 @@ tf_cuda_library( "common_runtime/gpu/gpu_device.cc", "common_runtime/gpu/gpu_device_factory.cc", "common_runtime/gpu/gpu_managed_allocator.cc", + "common_runtime/gpu/gpu_process_state.cc", "common_runtime/gpu/gpu_stream_util.cc", "common_runtime/gpu/gpu_util.cc", "common_runtime/gpu/gpu_util_platform_specific.cc", - "common_runtime/gpu/pool_allocator.cc", - "common_runtime/gpu/process_state.cc", ], hdrs = GPU_RUNTIME_HEADERS, copts = tf_copts(), diff --git a/tensorflow/core/api_def/api_test.cc b/tensorflow/core/api_def/api_test.cc index 6149e5fca8..ae03a61ae6 100644 --- a/tensorflow/core/api_def/api_test.cc +++ b/tensorflow/core/api_def/api_test.cc @@ -149,6 +149,33 @@ void TestAllApiDefAttributeNamesAreValid( } } } + +void TestDeprecatedAttributesSetCorrectly( + const std::unordered_map<string, ApiDef>& api_defs_map) { + for (const auto& name_and_api_def : api_defs_map) { + int num_deprecated_endpoints = 0; + const auto& api_def = name_and_api_def.second; + for (const auto& endpoint : api_def.endpoint()) { + if (endpoint.deprecated()) { + ++num_deprecated_endpoints; + } + } + + const auto& name = name_and_api_def.first; + ASSERT_TRUE(api_def.deprecation_message().empty() || + num_deprecated_endpoints == 0) + << "Endpoints are set to 'deprecated' for deprecated op " << name + << ". If an op is deprecated (i.e. deprecation_message is set), " + << "all the endpoints are deprecated implicitly and 'deprecated' " + << "field should not be set."; + if (num_deprecated_endpoints > 0) { + ASSERT_NE(num_deprecated_endpoints, api_def.endpoint_size()) + << "All " << name << " endpoints are deprecated. Please, set " + << "deprecation_message in api_def_" << name << ".pbtxt instead. " + << "to indicate that the op is deprecated."; + } + } +} } // namespace class BaseApiTest : public ::testing::Test { @@ -236,6 +263,11 @@ TEST_F(BaseApiTest, AllApiDefAttributeNamesAreValid) { TestAllApiDefAttributeNamesAreValid(ops_, api_defs_map_); } +// Checks that deprecation is set correctly. +TEST_F(BaseApiTest, DeprecationSetCorrectly) { + TestDeprecatedAttributesSetCorrectly(api_defs_map_); +} + class PythonApiTest : public ::testing::Test { protected: PythonApiTest() { @@ -272,4 +304,9 @@ TEST_F(PythonApiTest, AllApiDefAttributeNamesAreValid) { TestAllApiDefAttributeNamesAreValid(ops_, api_defs_map_); } +// Checks that deprecation is set correctly. +TEST_F(PythonApiTest, DeprecationSetCorrectly) { + TestDeprecatedAttributesSetCorrectly(api_defs_map_); +} + } // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/cuda_host_allocator.h b/tensorflow/core/common_runtime/gpu/cuda_host_allocator.h new file mode 100644 index 0000000000..636cd43575 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/cuda_host_allocator.h @@ -0,0 +1,60 @@ +/* Copyright 2018 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_CUDA_HOST_ALLOCATOR_H_ +#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_CUDA_HOST_ALLOCATOR_H_ + +#include "tensorflow/core/framework/allocator.h" +#include "tensorflow/core/platform/macros.h" +#include "tensorflow/core/platform/stream_executor.h" + +namespace tensorflow { +// Allocator for pinned CPU RAM that is made known to CUDA for the +// purpose of efficient DMA with a GPU. +class CUDAHostAllocator : public SubAllocator { + public: + // Note: stream_exec cannot be null. + explicit CUDAHostAllocator(se::StreamExecutor* stream_exec) + : stream_exec_(stream_exec) { + CHECK(stream_exec_ != nullptr); + } + ~CUDAHostAllocator() override {} + + void* Alloc(size_t alignment, size_t num_bytes) override { + void* ptr = nullptr; + if (num_bytes > 0) { + ptr = stream_exec_->HostMemoryAllocate(num_bytes); + if (ptr == nullptr) { + LOG(WARNING) << "could not allocate pinned host memory of size: " + << num_bytes; + } + } + return ptr; + } + + void Free(void* ptr, size_t num_bytes) override { + if (ptr != nullptr) { + stream_exec_->HostMemoryDeallocate(ptr); + } + } + + private: + se::StreamExecutor* stream_exec_; // not owned, non-null + + TF_DISALLOW_COPY_AND_ASSIGN(CUDAHostAllocator); +}; + +} // namespace tensorflow +#endif // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_CUDA_HOST_ALLOCATOR_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_device.cc b/tensorflow/core/common_runtime/gpu/gpu_device.cc index 520c2f9c34..3cb51b0dbc 100644 --- a/tensorflow/core/common_runtime/gpu/gpu_device.cc +++ b/tensorflow/core/common_runtime/gpu/gpu_device.cc @@ -36,9 +36,9 @@ limitations under the License. 
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h" #include "tensorflow/core/common_runtime/gpu/gpu_id_utils.h" #include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include "tensorflow/core/common_runtime/gpu/gpu_stream_util.h" #include "tensorflow/core/common_runtime/gpu/gpu_util.h" -#include "tensorflow/core/common_runtime/gpu/process_state.h" #include "tensorflow/core/common_runtime/gpu_device_context.h" #include "tensorflow/core/common_runtime/local_device.h" #include "tensorflow/core/framework/allocator.h" @@ -274,7 +274,7 @@ BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name, tf_gpu_id_(tf_gpu_id), sync_every_op_(sync_every_op), max_streams_(max_streams) { - ProcessState::singleton()->EnableGPUDevice(); + GPUProcessState::singleton()->EnableGPUDevice(); } BaseGPUDevice::~BaseGPUDevice() { @@ -1072,7 +1072,7 @@ Status BaseGPUDeviceFactory::CreateGPUDevice(const SessionOptions& options, se::StreamExecutor* se = GpuIdUtil::ExecutorForCudaGpuId(cuda_gpu_id).ValueOrDie(); const se::DeviceDescription& desc = se->GetDeviceDescription(); - ProcessState* process_state = ProcessState::singleton(); + GPUProcessState* process_state = GPUProcessState::singleton(); Allocator* gpu_allocator = process_state->GetGPUAllocator( options.config.gpu_options(), tf_gpu_id, memory_limit); if (gpu_allocator == nullptr) { @@ -1092,7 +1092,7 @@ Status BaseGPUDeviceFactory::CreateGPUDevice(const SessionOptions& options, BaseGPUDevice* gpu_device = CreateGPUDevice( options, device_name, static_cast<Bytes>(stats.bytes_limit), dev_locality, tf_gpu_id, GetShortDeviceDescription(cuda_gpu_id, desc), gpu_allocator, - process_state->GetCPUAllocator(numa_node)); + ProcessState::singleton()->GetCPUAllocator(numa_node)); LOG(INFO) << "Created TensorFlow device (" << device_name << " with " << (stats.bytes_limit >> 20) << " MB memory) -> physical GPU (" << GetShortDeviceDescription(cuda_gpu_id, desc) << ")"; diff --git a/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc b/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc index 9a000749c6..e1aaf95df6 100644 --- a/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc +++ b/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc @@ -19,7 +19,7 @@ limitations under the License. 
#include "tensorflow/core/common_runtime/gpu/gpu_device.h" #include "tensorflow/core/common_runtime/gpu/gpu_id.h" -#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include "tensorflow/core/common_runtime/threadpool_device.h" namespace tensorflow { @@ -40,9 +40,10 @@ class GPUDevice : public BaseGPUDevice { } Allocator* GetAllocator(AllocatorAttributes attr) override { + CHECK(cpu_allocator_) << "bad place 1"; if (attr.on_host()) { if (attr.gpu_compatible() || force_gpu_compatible_) { - ProcessState* ps = ProcessState::singleton(); + GPUProcessState* ps = GPUProcessState::singleton(); return ps->GetCUDAHostAllocator(0); } else { return cpu_allocator_; @@ -90,7 +91,7 @@ class GPUCompatibleCPUDevice : public ThreadPoolDevice { ~GPUCompatibleCPUDevice() override {} Allocator* GetAllocator(AllocatorAttributes attr) override { - ProcessState* ps = ProcessState::singleton(); + GPUProcessState* ps = GPUProcessState::singleton(); if (attr.gpu_compatible() || force_gpu_compatible_) { return ps->GetCUDAHostAllocator(0); } else { diff --git a/tensorflow/core/common_runtime/gpu/gpu_device_test.cc b/tensorflow/core/common_runtime/gpu/gpu_device_test.cc index 5c6cb43eff..daf59f0560 100644 --- a/tensorflow/core/common_runtime/gpu/gpu_device_test.cc +++ b/tensorflow/core/common_runtime/gpu/gpu_device_test.cc @@ -19,7 +19,7 @@ limitations under the License. #include "tensorflow/core/common_runtime/gpu/gpu_id_utils.h" #include "tensorflow/core/common_runtime/gpu/gpu_init.h" -#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include "tensorflow/core/lib/core/errors.h" #include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/lib/core/status_test_util.h" @@ -58,7 +58,7 @@ void ExpectErrorMessageSubstr(const Status& s, StringPiece substr) { class GPUDeviceTest : public ::testing::Test { public: - void TearDown() override { ProcessState::singleton()->TestOnlyReset(); } + void TearDown() override { GPUProcessState::singleton()->TestOnlyReset(); } protected: static SessionOptions MakeSessionOptions( diff --git a/tensorflow/core/common_runtime/gpu/process_state.cc b/tensorflow/core/common_runtime/gpu/gpu_process_state.cc index 2b442071e2..607cb33b80 100644 --- a/tensorflow/core/common_runtime/gpu/process_state.cc +++ b/tensorflow/core/common_runtime/gpu/gpu_process_state.cc @@ -13,11 +13,12 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include <cstring> #include <vector> +#include "tensorflow/core/common_runtime/gpu/cuda_host_allocator.h" #include "tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.h" #include "tensorflow/core/common_runtime/gpu/gpu_cudamalloc_allocator.h" #include "tensorflow/core/common_runtime/gpu/gpu_debug_allocator.h" @@ -25,7 +26,7 @@ limitations under the License. 
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h" #include "tensorflow/core/common_runtime/gpu/gpu_id_utils.h" #include "tensorflow/core/common_runtime/gpu/gpu_init.h" -#include "tensorflow/core/common_runtime/gpu/pool_allocator.h" +#include "tensorflow/core/common_runtime/pool_allocator.h" #include "tensorflow/core/framework/allocator.h" #include "tensorflow/core/framework/log_memory.h" #include "tensorflow/core/framework/tracking_allocator.h" @@ -37,19 +38,6 @@ limitations under the License. #include "tensorflow/core/platform/types.h" #include "tensorflow/core/util/env_var.h" -// If these flags need to be runtime configurable, consider adding -// options to ConfigProto. - -// If true, register CPU RAM used to copy to/from GPU RAM with the -// CUDA driver. -const bool FLAGS_brain_mem_reg_cuda_dma = true; - -// If true, record attributes of memory allocations and -// dynamically check for appropriate use of registered memory. -// Should only be true for debugging or diagnosis of -// performance issues. -const bool FLAGS_brain_gpu_record_mem_types = false; - namespace tensorflow { namespace { @@ -67,46 +55,36 @@ bool useCudaMemoryGuardAllocator() { } // namespace -ProcessState* ProcessState::instance_ = nullptr; +GPUProcessState* GPUProcessState::instance_ = nullptr; -/*static*/ ProcessState* ProcessState::singleton() { +/*static*/ GPUProcessState* GPUProcessState::singleton() { if (instance_ == nullptr) { - instance_ = new ProcessState; + instance_ = new GPUProcessState; } + CHECK(instance_->process_state_); return instance_; } -ProcessState::ProcessState() : gpu_device_enabled_(false) { +GPUProcessState::GPUProcessState() : gpu_device_enabled_(false) { CHECK(instance_ == nullptr); - instance_ = this; + process_state_ = ProcessState::singleton(); } -ProcessState::~ProcessState() { +// Normally the GPUProcessState singleton is never explicitly deleted. +// This function is defined for debugging problems with the allocators. +GPUProcessState::~GPUProcessState() { + CHECK_EQ(this, instance_); for (auto p : gpu_allocators_) { delete p; } instance_ = nullptr; } -string ProcessState::MemDesc::DebugString() { - return strings::StrCat((loc == CPU ? 
"CPU " : "GPU "), dev_index, - ", dma: ", gpu_registered, ", nic: ", nic_registered); -} - -ProcessState::MemDesc ProcessState::PtrType(const void* ptr) { - if (FLAGS_brain_gpu_record_mem_types) { - auto iter = mem_desc_map_.find(ptr); - if (iter != mem_desc_map_.end()) { - return iter->second; - } - } - return MemDesc(); -} - -Allocator* ProcessState::GetGPUAllocator(const GPUOptions& options, - TfGpuId tf_gpu_id, - size_t total_bytes) { +Allocator* GPUProcessState::GetGPUAllocator(const GPUOptions& options, + TfGpuId tf_gpu_id, + size_t total_bytes) { + CHECK(process_state_); #if GOOGLE_CUDA const string& allocator_type = options.allocator_type(); mutex_lock lock(mu_); @@ -114,7 +92,8 @@ Allocator* ProcessState::GetGPUAllocator(const GPUOptions& options, if (tf_gpu_id.value() >= static_cast<int64>(gpu_allocators_.size())) { gpu_allocators_.resize(tf_gpu_id.value() + 1); - if (FLAGS_brain_gpu_record_mem_types) gpu_al_.resize(tf_gpu_id.value() + 1); + if (process_state_->ProcessState::FLAGS_brain_gpu_record_mem_types) + gpu_al_.resize(tf_gpu_id.value() + 1); } if (gpu_allocators_[tf_gpu_id.value()] == nullptr) { @@ -155,9 +134,9 @@ Allocator* ProcessState::GetGPUAllocator(const GPUOptions& options, gpu_allocator->AddAllocVisitor(v); } } - if (FLAGS_brain_gpu_record_mem_types) { - MemDesc md; - md.loc = MemDesc::GPU; + if (process_state_->ProcessState::FLAGS_brain_gpu_record_mem_types) { + ProcessState::MemDesc md; + md.loc = ProcessState::MemDesc::GPU; md.dev_index = cuda_gpu_id.value(); md.gpu_registered = false; md.nic_registered = true; @@ -165,10 +144,11 @@ Allocator* ProcessState::GetGPUAllocator(const GPUOptions& options, gpu_al_.resize(tf_gpu_id.value() + 1); } gpu_al_[tf_gpu_id.value()] = new internal::RecordingAllocator( - &mem_desc_map_, gpu_allocator, md, &mu_); + &process_state_->mem_desc_map_, gpu_allocator, md, &mu_); } } - if (FLAGS_brain_gpu_record_mem_types) return gpu_al_[tf_gpu_id.value()]; + if (process_state_->ProcessState::FLAGS_brain_gpu_record_mem_types) + return gpu_al_[tf_gpu_id.value()]; return gpu_allocators_[tf_gpu_id.value()]; #else LOG(FATAL) << "GPUAllocator unavailable. Not compiled with --config=cuda."; @@ -176,64 +156,13 @@ Allocator* ProcessState::GetGPUAllocator(const GPUOptions& options, #endif // GOOGLE_CUDA } -Allocator* ProcessState::GetCPUAllocator(int numa_node) { - // Although we're temporarily ignoring numa_node, check for legality. - CHECK_GE(numa_node, 0); - // TODO(tucker): actually maintain separate CPUAllocators for - // different numa_nodes. For now, just one. - numa_node = 0; - mutex_lock lock(mu_); - while (cpu_allocators_.size() <= static_cast<size_t>(numa_node)) { - bool use_bfc_allocator = false; - // TODO(reedwm): Switch default to BGFAllocator if it's at least as fast and - // efficient. - Status status = ReadBoolFromEnvVar("TF_CPU_ALLOCATOR_USE_BFC", false, - &use_bfc_allocator); - if (!status.ok()) { - LOG(ERROR) << "GetCPUAllocator: " << status.error_message(); - } - VisitableAllocator* allocator; - if (use_bfc_allocator) { - // TODO(reedwm): evaluate whether 64GB by default is the best choice. 
- int64 cpu_mem_limit_in_mb = -1; - Status status = ReadInt64FromEnvVar("TF_CPU_BFC_MEM_LIMIT_IN_MB", - 1LL << 16 /*64GB max by default*/, - &cpu_mem_limit_in_mb); - if (!status.ok()) { - LOG(ERROR) << "GetCPUAllocator: " << status.error_message(); - } - int64 cpu_mem_limit = cpu_mem_limit_in_mb * (1LL << 20); - allocator = new BFCAllocator(new BasicCPUAllocator(), cpu_mem_limit, - true /*allow_growth*/, - "bfc_cpu_allocator_for_gpu" /*name*/); - VLOG(2) << "Using BFCAllocator with memory limit of " - << cpu_mem_limit_in_mb << " MB for ProcessState CPU allocator"; - } else { - allocator = new PoolAllocator( - 100 /*pool_size_limit*/, true /*auto_resize*/, - new BasicCPUAllocator(), new NoopRounder, "cpu_pool"); - VLOG(2) << "Using PoolAllocator for ProcessState CPU allocator"; - } - if (LogMemory::IsEnabled()) { - // Wrap the allocator to track allocation ids for better logging - // at the cost of performance. - allocator = new TrackingVisitableAllocator(allocator, true); - } - cpu_allocators_.push_back(allocator); +Allocator* GPUProcessState::GetCUDAHostAllocator(int numa_node) { + CHECK(process_state_); + if (!HasGPUDevice() || + !process_state_->ProcessState::FLAGS_brain_mem_reg_cuda_dma) { + return process_state_->GetCPUAllocator(numa_node); } - return cpu_allocators_[0]; -} - -Allocator* ProcessState::GetCUDAHostAllocator(int numa_node) { - if (!HasGPUDevice() || !FLAGS_brain_mem_reg_cuda_dma) { - return cpu_allocator(); - } - // Although we're temporarily ignoring numa_node, check for legality. CHECK_GE(numa_node, 0); - // TODO(tucker): actually maintain separate CPUAllocators for - // different numa_nodes. For now, just one. - numa_node = 0; - { // Here we optimize the most common use case where cuda_host_allocators_ // and cuda_al_ have already been populated and since we're only reading @@ -241,7 +170,7 @@ Allocator* ProcessState::GetCUDAHostAllocator(int numa_node) { // we take a unique lock and populate these vectors. 
tf_shared_lock lock(mu_); - if (FLAGS_brain_gpu_record_mem_types && + if (process_state_->ProcessState::FLAGS_brain_gpu_record_mem_types && static_cast<int>(cuda_al_.size()) > 0) { return cuda_al_[0]; } @@ -288,21 +217,25 @@ Allocator* ProcessState::GetCUDAHostAllocator(int numa_node) { allocator = new TrackingVisitableAllocator(allocator, true); } cuda_host_allocators_.push_back(allocator); - if (FLAGS_brain_gpu_record_mem_types) { - MemDesc md; - md.loc = MemDesc::CPU; + if (process_state_->ProcessState::FLAGS_brain_gpu_record_mem_types) { + ProcessState::MemDesc md; + md.loc = ProcessState::MemDesc::CPU; md.dev_index = 0; md.gpu_registered = true; md.nic_registered = false; cuda_al_.push_back(new internal::RecordingAllocator( - &mem_desc_map_, cuda_host_allocators_.back(), md, &mu_)); + &process_state_->mem_desc_map_, cuda_host_allocators_.back(), md, + &mu_)); } } - if (FLAGS_brain_gpu_record_mem_types) return cuda_al_[0]; + if (process_state_->ProcessState::FLAGS_brain_gpu_record_mem_types) + return cuda_al_[0]; return cuda_host_allocators_[0]; } -void ProcessState::AddGPUAllocVisitor(int bus_id, AllocVisitor visitor) { +void GPUProcessState::AddGPUAllocVisitor(int bus_id, + const AllocVisitor& visitor) { + CHECK(process_state_); #if GOOGLE_CUDA mutex_lock lock(mu_); for (int i = 0; i < static_cast<int64>(gpu_allocators_.size()); ++i) { @@ -320,17 +253,17 @@ void ProcessState::AddGPUAllocVisitor(int bus_id, AllocVisitor visitor) { #endif // GOOGLE_CUDA } -void ProcessState::TestOnlyReset() { - mutex_lock lock(mu_); - gpu_device_enabled_ = false; - gpu_visitors_.clear(); - mem_desc_map_.clear(); - gtl::STLDeleteElements(&cpu_allocators_); - gtl::STLDeleteElements(&gpu_allocators_); - gtl::STLDeleteElements(&cuda_host_allocators_); - gtl::STLDeleteElements(&cpu_al_); - gtl::STLDeleteElements(&gpu_al_); - gtl::STLDeleteElements(&cuda_al_); +void GPUProcessState::TestOnlyReset() { + process_state_->ProcessState::TestOnlyReset(); + { + mutex_lock lock(mu_); + gpu_device_enabled_ = false; + gpu_visitors_.clear(); + gtl::STLDeleteElements(&gpu_allocators_); + gtl::STLDeleteElements(&cuda_host_allocators_); + gtl::STLDeleteElements(&gpu_al_); + gtl::STLDeleteElements(&cuda_al_); + } } } // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/process_state.h b/tensorflow/core/common_runtime/gpu/gpu_process_state.h index bc2c4182d7..cb41c3c6bd 100644 --- a/tensorflow/core/common_runtime/gpu/process_state.h +++ b/tensorflow/core/common_runtime/gpu/gpu_process_state.h @@ -1,4 +1,4 @@ -/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. +/* Copyright 2018 The TensorFlow Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,8 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_PROCESS_STATE_H_ -#define TENSORFLOW_COMMON_RUNTIME_GPU_PROCESS_STATE_H_ +#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_PROCESS_STATE_H_ +#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_PROCESS_STATE_H_ #include <functional> #include <map> @@ -22,6 +22,7 @@ limitations under the License. 
#include <vector> #include "tensorflow/core/common_runtime/gpu/gpu_id.h" +#include "tensorflow/core/common_runtime/process_state.h" #include "tensorflow/core/framework/allocator.h" #include "tensorflow/core/platform/mutex.h" #include "tensorflow/core/platform/thread_annotations.h" @@ -34,27 +35,10 @@ class Allocator; class VisitableAllocator; class PoolAllocator; -// Singleton that manages per-process state, e.g. allocation -// of shared resources. -class ProcessState { +// Singleton that manages per-process state when GPUs are present. +class GPUProcessState { public: - static ProcessState* singleton(); - - // Descriptor for memory allocation attributes, used by optional - // runtime correctness analysis logic. - struct MemDesc { - enum MemLoc { CPU, GPU }; - MemLoc loc; - int dev_index; - bool gpu_registered; - bool nic_registered; - MemDesc() - : loc(CPU), - dev_index(0), - gpu_registered(false), - nic_registered(false) {} - string DebugString(); - }; + static GPUProcessState* singleton(); // Query whether any GPU device has been created so far. // Disable thread safety analysis since a race is benign here. @@ -68,14 +52,6 @@ class ProcessState { gpu_device_enabled_ = true; } - // Returns what we know about the memory at ptr. - // If we know nothing, it's called CPU 0 with no other attributes. - MemDesc PtrType(const void* ptr); - - // Returns the one CPUAllocator used for the given numa_node. - // TEMPORARY: ignores numa_node. - Allocator* GetCPUAllocator(int numa_node); - // Returns the one GPU allocator used for the indexed GPU. // Note that this is a system GPU index, not (necessarily) a brain // device index. @@ -107,69 +83,39 @@ class ProcessState { // the index of one of the PCIe buses. If the bus_id is invalid, // results are undefined. typedef std::function<void(void*, size_t)> AllocVisitor; - virtual void AddGPUAllocVisitor(int bus_id, AllocVisitor visitor); - - typedef std::unordered_map<const void*, MemDesc> MDMap; + virtual void AddGPUAllocVisitor(int bus_id, const AllocVisitor& visitor); protected: - ProcessState(); + GPUProcessState(); // Helper method for unit tests to reset the ProcessState singleton by // cleaning up everything. Never use in production. virtual void TestOnlyReset(); - static ProcessState* instance_; + ProcessState::MDMap* mem_desc_map() { + if (process_state_) return &process_state_->mem_desc_map_; + return nullptr; + } + + static GPUProcessState* instance_; + ProcessState* process_state_; // Not owned. bool gpu_device_enabled_; mutex mu_; - std::vector<Allocator*> cpu_allocators_ GUARDED_BY(mu_); std::vector<VisitableAllocator*> gpu_allocators_ GUARDED_BY(mu_); std::vector<std::vector<AllocVisitor>> gpu_visitors_ GUARDED_BY(mu_); std::vector<Allocator*> cuda_host_allocators_ GUARDED_BY(mu_); - virtual ~ProcessState(); + virtual ~GPUProcessState(); // Optional RecordingAllocators that wrap the corresponding // Allocators for runtime attribute use analysis. 
- MDMap mem_desc_map_; - std::vector<Allocator*> cpu_al_ GUARDED_BY(mu_); std::vector<Allocator*> gpu_al_ GUARDED_BY(mu_); std::vector<Allocator*> cuda_al_ GUARDED_BY(mu_); friend class GPUDeviceTest; }; -namespace internal { -class RecordingAllocator : public Allocator { - public: - RecordingAllocator(ProcessState::MDMap* mm, Allocator* a, - ProcessState::MemDesc md, mutex* mu) - : mm_(mm), a_(a), md_(md), mu_(mu) {} - - string Name() override { return a_->Name(); } - void* AllocateRaw(size_t alignment, size_t num_bytes) override { - void* p = a_->AllocateRaw(alignment, num_bytes); - mutex_lock l(*mu_); - (*mm_)[p] = md_; - return p; - } - void DeallocateRaw(void* p) override { - mutex_lock l(*mu_); - auto iter = mm_->find(p); - mm_->erase(iter); - a_->DeallocateRaw(p); - } - bool TracksAllocationSizes() override { return a_->TracksAllocationSizes(); } - size_t RequestedSize(const void* p) override { return a_->RequestedSize(p); } - size_t AllocatedSize(const void* p) override { return a_->AllocatedSize(p); } - void GetStats(AllocatorStats* stats) override { a_->GetStats(stats); } - void ClearStats() override { a_->ClearStats(); } - ProcessState::MDMap* mm_; // not owned - Allocator* a_; // not owned - ProcessState::MemDesc md_; - mutex* mu_; -}; -} // namespace internal } // namespace tensorflow -#endif // TENSORFLOW_COMMON_RUNTIME_GPU_PROCESS_STATE_H_ +#endif // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_PROCESS_STATE_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_util.cc b/tensorflow/core/common_runtime/gpu/gpu_util.cc index 042a1c0fe0..5851360cab 100644 --- a/tensorflow/core/common_runtime/gpu/gpu_util.cc +++ b/tensorflow/core/common_runtime/gpu/gpu_util.cc @@ -19,7 +19,7 @@ limitations under the License. #include "tensorflow/core/common_runtime/device.h" #include "tensorflow/core/common_runtime/dma_helper.h" #include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h" -#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" #include "tensorflow/core/common_runtime/gpu_device_context.h" #include "tensorflow/core/framework/tensor.h" #include "tensorflow/core/framework/tensor.pb.h" @@ -150,7 +150,7 @@ void GPUUtil::SetProtoFromGPU(const Tensor& tensor, Device* dev, const int64 total_bytes = is_dead ? 0 : tensor.TotalBytes(); if (total_bytes > 0) { tracing::ScopedAnnotation annotation("SetProtoFromGPU"); - alloc = ProcessState::singleton()->GetCUDAHostAllocator(0); + alloc = GPUProcessState::singleton()->GetCUDAHostAllocator(0); buf = alloc->Allocate<char>(total_bytes); if (LogMemory::IsEnabled()) { LogMemory::RecordRawAllocation("SetProtoFromGPU", diff --git a/tensorflow/core/common_runtime/gpu/pool_allocator_test.cc b/tensorflow/core/common_runtime/gpu/pool_allocator_test.cc index a4c8d5fe86..583bff2c07 100644 --- a/tensorflow/core/common_runtime/gpu/pool_allocator_test.cc +++ b/tensorflow/core/common_runtime/gpu/pool_allocator_test.cc @@ -15,8 +15,9 @@ limitations under the License. 
#if GOOGLE_CUDA -#include "tensorflow/core/common_runtime/gpu/pool_allocator.h" +#include "tensorflow/core/common_runtime/pool_allocator.h" +#include "tensorflow/core/common_runtime/gpu/cuda_host_allocator.h" #include "tensorflow/core/platform/stream_executor.h" #include "tensorflow/core/platform/test.h" @@ -96,7 +97,8 @@ TEST(PoolAllocatorTest, Alignment) { TEST(PoolAllocatorTest, AutoResize) { PoolAllocator pool(2 /*pool_size_limit*/, true /*auto_resize*/, - new BasicCPUAllocator, new NoopRounder, "pool"); + new BasicCPUAllocator(0 /*numa_node*/), new NoopRounder, + "pool"); // Alloc/dealloc 10 sizes just a few times, confirming pool size // stays at 2. diff --git a/tensorflow/core/common_runtime/gpu/pool_allocator.cc b/tensorflow/core/common_runtime/pool_allocator.cc index 66fff16e8f..10a24ed14c 100644 --- a/tensorflow/core/common_runtime/gpu/pool_allocator.cc +++ b/tensorflow/core/common_runtime/pool_allocator.cc @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "tensorflow/core/common_runtime/gpu/pool_allocator.h" +#include "tensorflow/core/common_runtime/pool_allocator.h" #include <errno.h> #ifndef _MSC_VER @@ -284,4 +284,12 @@ void PoolAllocator::AddFreeVisitor(Visitor visitor) { free_visitors_.push_back(visitor); } +void* BasicCPUAllocator::Alloc(size_t alignment, size_t num_bytes) { + return port::AlignedMalloc(num_bytes, static_cast<int>(alignment)); +} + +void BasicCPUAllocator::Free(void* ptr, size_t num_bytes) { + port::AlignedFree(ptr); +} + } // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/pool_allocator.h b/tensorflow/core/common_runtime/pool_allocator.h index 310158aba1..607734445b 100644 --- a/tensorflow/core/common_runtime/gpu/pool_allocator.h +++ b/tensorflow/core/common_runtime/pool_allocator.h @@ -13,12 +13,11 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_POOL_ALLOCATOR_H_ -#define TENSORFLOW_COMMON_RUNTIME_GPU_POOL_ALLOCATOR_H_ +#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_POOL_ALLOCATOR_H_ +#define TENSORFLOW_CORE_COMMON_RUNTIME_POOL_ALLOCATOR_H_ // Simple LRU pool allocators for various flavors of CPU RAM that -// implement the VisitableAllocator interface. GPU memory is managed -// by GPURegionAllocator. +// implement the VisitableAllocator interface. #include <atomic> #include <map> @@ -28,9 +27,7 @@ limitations under the License. #include "tensorflow/core/lib/core/bits.h" #include "tensorflow/core/platform/logging.h" #include "tensorflow/core/platform/macros.h" -#include "tensorflow/core/platform/mem.h" #include "tensorflow/core/platform/mutex.h" -#include "tensorflow/core/platform/stream_executor.h" #include "tensorflow/core/platform/types.h" namespace tensorflow { @@ -168,48 +165,18 @@ class Pow2Rounder : public RoundUpInterface { class BasicCPUAllocator : public SubAllocator { public: + // Argument numa_node is currently ignored. 
+ explicit BasicCPUAllocator(int numa_node) : numa_node_(numa_node) {} + ~BasicCPUAllocator() override {} - void* Alloc(size_t alignment, size_t num_bytes) override { - return port::AlignedMalloc(num_bytes, alignment); - } - void Free(void* ptr, size_t num_bytes) override { port::AlignedFree(ptr); } -}; + void* Alloc(size_t alignment, size_t num_bytes) override; -// Allocator for pinned CPU RAM that is made known to CUDA for the -// purpose of efficient DMA with a GPU. -class CUDAHostAllocator : public SubAllocator { - public: - // Note: stream_exec cannot be null. - explicit CUDAHostAllocator(se::StreamExecutor* stream_exec) - : stream_exec_(stream_exec) { - CHECK(stream_exec_ != nullptr); - } - ~CUDAHostAllocator() override {} - - void* Alloc(size_t alignment, size_t num_bytes) override { - void* ptr = nullptr; - if (num_bytes > 0) { - ptr = stream_exec_->HostMemoryAllocate(num_bytes); - if (ptr == nullptr) { - LOG(WARNING) << "could not allocate pinned host memory of size: " - << num_bytes; - } - } - return ptr; - } - - void Free(void* ptr, size_t num_bytes) override { - if (ptr != nullptr) { - stream_exec_->HostMemoryDeallocate(ptr); - } - } + void Free(void* ptr, size_t num_bytes) override; private: - se::StreamExecutor* stream_exec_; // not owned, non-null - - TF_DISALLOW_COPY_AND_ASSIGN(CUDAHostAllocator); + int numa_node_; }; } // namespace tensorflow -#endif // TENSORFLOW_COMMON_RUNTIME_GPU_POOL_ALLOCATOR_H_ +#endif // TENSORFLOW_CORE_COMMON_RUNTIME_POOL_ALLOCATOR_H_ diff --git a/tensorflow/core/common_runtime/process_state.cc b/tensorflow/core/common_runtime/process_state.cc new file mode 100644 index 0000000000..4d83b25ce6 --- /dev/null +++ b/tensorflow/core/common_runtime/process_state.cc @@ -0,0 +1,129 @@ +/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/core/common_runtime/process_state.h" + +#include <cstring> +#include <vector> + +#include "tensorflow/core/common_runtime/bfc_allocator.h" +#include "tensorflow/core/common_runtime/pool_allocator.h" +#include "tensorflow/core/framework/allocator.h" +#include "tensorflow/core/framework/log_memory.h" +#include "tensorflow/core/framework/tracking_allocator.h" +#include "tensorflow/core/lib/gtl/stl_util.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/mutex.h" +#include "tensorflow/core/platform/types.h" +#include "tensorflow/core/util/env_var.h" + +namespace tensorflow { + +ProcessState* ProcessState::instance_ = nullptr; + +/*static*/ ProcessState* ProcessState::singleton() { + if (instance_ == nullptr) { + instance_ = new ProcessState; + } + + return instance_; +} + +ProcessState::ProcessState() : numa_enabled_(false) { + CHECK(instance_ == nullptr); +} + +// Normally the ProcessState singleton is never explicitly deleted. +// This function is defined for debugging problems with the allocators. 
+ProcessState::~ProcessState() { + CHECK_EQ(this, instance_); + instance_ = nullptr; + for (Allocator* a : cpu_allocators_) { + delete a; + } +} + +string ProcessState::MemDesc::DebugString() { + return strings::StrCat((loc == CPU ? "CPU " : "GPU "), dev_index, + ", dma: ", gpu_registered, ", nic: ", nic_registered); +} + +ProcessState::MemDesc ProcessState::PtrType(const void* ptr) { + if (FLAGS_brain_gpu_record_mem_types) { + auto iter = mem_desc_map_.find(ptr); + if (iter != mem_desc_map_.end()) { + return iter->second; + } + } + return MemDesc(); +} + +Allocator* ProcessState::GetCPUAllocator(int numa_node) { + CHECK_GE(numa_node, 0); + if (!numa_enabled_) numa_node = 0; + mutex_lock lock(mu_); + while (cpu_allocators_.size() <= static_cast<size_t>(numa_node)) { + bool use_bfc_allocator = false; + // TODO(reedwm): Switch default to BGFAllocator if it's at least as fast and + // efficient. + Status status = ReadBoolFromEnvVar("TF_CPU_ALLOCATOR_USE_BFC", false, + &use_bfc_allocator); + if (!status.ok()) { + LOG(ERROR) << "GetCPUAllocator: " << status.error_message(); + } + VisitableAllocator* allocator; + if (use_bfc_allocator) { + // TODO(reedwm): evaluate whether 64GB by default is the best choice. + int64 cpu_mem_limit_in_mb = -1; + Status status = ReadInt64FromEnvVar("TF_CPU_BFC_MEM_LIMIT_IN_MB", + 1LL << 16 /*64GB max by default*/, + &cpu_mem_limit_in_mb); + if (!status.ok()) { + LOG(ERROR) << "GetCPUAllocator: " << status.error_message(); + } + int64 cpu_mem_limit = cpu_mem_limit_in_mb * (1LL << 20); + allocator = new BFCAllocator( + new BasicCPUAllocator(numa_enabled_ ? numa_node : -1), cpu_mem_limit, + true /*allow_growth*/, "bfc_cpu_allocator_for_gpu" /*name*/); + VLOG(2) << "Using BFCAllocator with memory limit of " + << cpu_mem_limit_in_mb << " MB for ProcessState CPU allocator"; + } else { + allocator = new PoolAllocator( + 100 /*pool_size_limit*/, true /*auto_resize*/, + new BasicCPUAllocator(numa_enabled_ ? numa_node : -1), + new NoopRounder, "cpu_pool"); + VLOG(2) << "Using PoolAllocator for ProcessState CPU allocator " + << "numa_enabled_=" << numa_enabled_ + << " numa_node=" << numa_node; + } + if (LogMemory::IsEnabled()) { + // Wrap the allocator to track allocation ids for better logging + // at the cost of performance. + allocator = new TrackingVisitableAllocator(allocator, true); + } + cpu_allocators_.push_back(allocator); + } + return cpu_allocators_[numa_node]; +} + +void ProcessState::TestOnlyReset() { + mutex_lock lock(mu_); + mem_desc_map_.clear(); + gtl::STLDeleteElements(&cpu_allocators_); + gtl::STLDeleteElements(&cpu_al_); +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/process_state.h b/tensorflow/core/common_runtime/process_state.h new file mode 100644 index 0000000000..0f4ae230bb --- /dev/null +++ b/tensorflow/core/common_runtime/process_state.h @@ -0,0 +1,132 @@ +/* Copyright 2015 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+==============================================================================*/ + +#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_PROCESS_STATE_H_ +#define TENSORFLOW_CORE_COMMON_RUNTIME_PROCESS_STATE_H_ + +#include <functional> +#include <map> +#include <unordered_map> +#include <vector> + +#include "tensorflow/core/framework/allocator.h" +#include "tensorflow/core/platform/mutex.h" +#include "tensorflow/core/platform/thread_annotations.h" +#include "tensorflow/core/platform/types.h" +#include "tensorflow/core/protobuf/config.pb.h" + +namespace tensorflow { + +class Allocator; +class VisitableAllocator; +class PoolAllocator; + +// Singleton that manages per-process state, e.g. allocation of +// shared resources. +class ProcessState { + public: + static ProcessState* singleton(); + + // Descriptor for memory allocation attributes, used by optional + // runtime correctness analysis logic. + struct MemDesc { + enum MemLoc { CPU, GPU }; + MemLoc loc; + int dev_index; + bool gpu_registered; + bool nic_registered; + MemDesc() + : loc(CPU), + dev_index(0), + gpu_registered(false), + nic_registered(false) {} + string DebugString(); + }; + + // If NUMA Allocators are desired, call this before calling any + // Allocator accessor. + void EnableNUMA() { numa_enabled_ = true; } + + // Returns what we know about the memory at ptr. + // If we know nothing, it's called CPU 0 with no other attributes. + MemDesc PtrType(const void* ptr); + + // Returns the one CPUAllocator used for the given numa_node. + // TEMPORARY: ignores numa_node. + Allocator* GetCPUAllocator(int numa_node); + + typedef std::unordered_map<const void*, MemDesc> MDMap; + + protected: + ProcessState(); + friend class GPUProcessState; + + // If these flags need to be runtime configurable consider adding + // them to ConfigProto. + static const bool FLAGS_brain_mem_reg_cuda_dma = true; + static const bool FLAGS_brain_gpu_record_mem_types = false; + + // Helper method for unit tests to reset the ProcessState singleton by + // cleaning up everything. Never use in production. + virtual void TestOnlyReset(); + + static ProcessState* instance_; + bool numa_enabled_; + + mutex mu_; + + std::vector<Allocator*> cpu_allocators_ GUARDED_BY(mu_); + + virtual ~ProcessState(); + + // Optional RecordingAllocators that wrap the corresponding + // Allocators for runtime attribute use analysis. 
+ MDMap mem_desc_map_; + std::vector<Allocator*> cpu_al_ GUARDED_BY(mu_); +}; + +namespace internal { +class RecordingAllocator : public Allocator { + public: + RecordingAllocator(ProcessState::MDMap* mm, Allocator* a, + ProcessState::MemDesc md, mutex* mu) + : mm_(mm), a_(a), md_(md), mu_(mu) {} + + string Name() override { return a_->Name(); } + void* AllocateRaw(size_t alignment, size_t num_bytes) override { + void* p = a_->AllocateRaw(alignment, num_bytes); + mutex_lock l(*mu_); + (*mm_)[p] = md_; + return p; + } + void DeallocateRaw(void* p) override { + mutex_lock l(*mu_); + auto iter = mm_->find(p); + mm_->erase(iter); + a_->DeallocateRaw(p); + } + bool TracksAllocationSizes() override { return a_->TracksAllocationSizes(); } + size_t RequestedSize(const void* p) override { return a_->RequestedSize(p); } + size_t AllocatedSize(const void* p) override { return a_->AllocatedSize(p); } + void GetStats(AllocatorStats* stats) override { a_->GetStats(stats); } + void ClearStats() override { a_->ClearStats(); } + ProcessState::MDMap* mm_; // not owned + Allocator* a_; // not owned + ProcessState::MemDesc md_; + mutex* mu_; +}; +} // namespace internal +} // namespace tensorflow +#endif // TENSORFLOW_CORE_COMMON_RUNTIME_PROCESS_STATE_H_ diff --git a/tensorflow/core/framework/api_def.proto b/tensorflow/core/framework/api_def.proto index 3f8dd272e7..c6cda06342 100644 --- a/tensorflow/core/framework/api_def.proto +++ b/tensorflow/core/framework/api_def.proto @@ -30,6 +30,10 @@ import "tensorflow/core/framework/attr_value.proto"; message ApiDef { // Name of the op (in the OpDef) to specify the API for. string graph_op_name = 1; + // If this op is deprecated, set deprecation message to the message + // that should be logged when this op is used. + // The message should indicate alternative op to use, if any. + string deprecation_message = 12; enum Visibility { // Normally this is "VISIBLE" unless you are inheriting a @@ -56,6 +60,12 @@ message ApiDef { // use a snake_case convention instead of CamelCase. string name = 1; + // Set if this endpoint is deprecated. If set to true, a message suggesting + // to use a non-deprecated endpoint instead will be printed. If all + // endpoints are deprecated, set deprecation_message in ApiDef instead. + bool deprecated = 3; + // Deprecated: set deprecated to "true" instead. We can auto-generate + // the message. // If this endpoint is deprecated, set deprecation_message to a // message that should be logged when the endpoint is used. // The message should indicate alternative endpoint to use, if any. diff --git a/tensorflow/core/lib/db/sqlite_test.cc b/tensorflow/core/lib/db/sqlite_test.cc index c099160b0c..1590055960 100644 --- a/tensorflow/core/lib/db/sqlite_test.cc +++ b/tensorflow/core/lib/db/sqlite_test.cc @@ -73,6 +73,7 @@ TEST_F(SqliteTest, InsertAndSelectDouble) { EXPECT_EQ(1, stmt.ColumnInt(1)); } +#ifdef DSQLITE_ENABLE_JSON1 TEST_F(SqliteTest, Json1Extension) { string s1 = "{\"key\": 42}"; string s2 = "{\"key\": \"value\"}"; @@ -85,6 +86,7 @@ TEST_F(SqliteTest, Json1Extension) { EXPECT_EQ(42, stmt.ColumnInt(0)); EXPECT_EQ("value", stmt.ColumnString(1)); } +#endif //DSQLITE_ENABLE_JSON1 TEST_F(SqliteTest, NulCharsInString) { string s; // XXX: Want to write {2, '\0'} but not sure why not. 
diff --git a/tensorflow/core/lib/gtl/manual_constructor_test.cc b/tensorflow/core/lib/gtl/manual_constructor_test.cc index 4e832ce8d8..35cbc78b66 100644 --- a/tensorflow/core/lib/gtl/manual_constructor_test.cc +++ b/tensorflow/core/lib/gtl/manual_constructor_test.cc @@ -95,9 +95,6 @@ TEST(ManualConstructorTest, Alignment) { #ifdef ARCH_K8 EXPECT_EQ(reinterpret_cast<intptr_t>(test2.b.get()) % 16, 0); #endif -#ifdef ARCH_PIII - EXPECT_EQ(reinterpret_cast<intptr_t>(test2.b.get()) % 4, 0); -#endif } TEST(ManualConstructorTest, DefaultInitialize) { diff --git a/tensorflow/core/ops/compat/ops_history.v1.pbtxt b/tensorflow/core/ops/compat/ops_history.v1.pbtxt index 5b9fbc2437..8fc87408a2 100644 --- a/tensorflow/core/ops/compat/ops_history.v1.pbtxt +++ b/tensorflow/core/ops/compat/ops_history.v1.pbtxt @@ -66720,6 +66720,54 @@ op { } } op { + name: "SparseSliceGrad" + input_arg { + name: "backprop_val_grad" + type_attr: "T" + } + input_arg { + name: "input_indices" + type: DT_INT64 + } + input_arg { + name: "input_start" + type: DT_INT64 + } + input_arg { + name: "output_indices" + type: DT_INT64 + } + output_arg { + name: "val_grad" + type_attr: "T" + } + attr { + name: "T" + type: "type" + allowed_values { + list { + type: DT_FLOAT + type: DT_DOUBLE + type: DT_INT32 + type: DT_UINT8 + type: DT_INT16 + type: DT_INT8 + type: DT_COMPLEX64 + type: DT_INT64 + type: DT_QINT8 + type: DT_QUINT8 + type: DT_QINT32 + type: DT_BFLOAT16 + type: DT_UINT16 + type: DT_COMPLEX128 + type: DT_HALF + type: DT_UINT32 + type: DT_UINT64 + } + } + } +} +op { name: "SparseSoftmax" input_arg { name: "sp_indices" diff --git a/tensorflow/core/ops/ops.pbtxt b/tensorflow/core/ops/ops.pbtxt index 36ecc0d9db..32bbd4521a 100644 --- a/tensorflow/core/ops/ops.pbtxt +++ b/tensorflow/core/ops/ops.pbtxt @@ -30138,6 +30138,54 @@ op { } } op { + name: "SparseSliceGrad" + input_arg { + name: "backprop_val_grad" + type_attr: "T" + } + input_arg { + name: "input_indices" + type: DT_INT64 + } + input_arg { + name: "input_start" + type: DT_INT64 + } + input_arg { + name: "output_indices" + type: DT_INT64 + } + output_arg { + name: "val_grad" + type_attr: "T" + } + attr { + name: "T" + type: "type" + allowed_values { + list { + type: DT_FLOAT + type: DT_DOUBLE + type: DT_INT32 + type: DT_UINT8 + type: DT_INT16 + type: DT_INT8 + type: DT_COMPLEX64 + type: DT_INT64 + type: DT_QINT8 + type: DT_QUINT8 + type: DT_QINT32 + type: DT_BFLOAT16 + type: DT_UINT16 + type: DT_COMPLEX128 + type: DT_HALF + type: DT_UINT32 + type: DT_UINT64 + } + } + } +} +op { name: "SparseSoftmax" input_arg { name: "sp_indices" diff --git a/tensorflow/python/BUILD b/tensorflow/python/BUILD index f89f080ef8..ebfcfff4a5 100644 --- a/tensorflow/python/BUILD +++ b/tensorflow/python/BUILD @@ -127,12 +127,14 @@ py_library( ":util", ":weights_broadcast_ops", "//tensorflow/core:protos_all_py", + "//tensorflow/python/compat", "//tensorflow/python/data", "//tensorflow/python/feature_column:feature_column_py", "//tensorflow/python/keras", "//tensorflow/python/ops/distributions", "//tensorflow/python/ops/linalg", "//tensorflow/python/ops/losses", + "//tensorflow/python/ops/parallel_for", "//tensorflow/python/profiler", "//tensorflow/python/saved_model", "//third_party/py/numpy", diff --git a/tensorflow/python/compat/BUILD b/tensorflow/python/compat/BUILD index 5f55b22818..58ceafca06 100644 --- a/tensorflow/python/compat/BUILD +++ b/tensorflow/python/compat/BUILD @@ -2,9 +2,21 @@ licenses(["notice"]) # Apache 2.0 exports_files(["LICENSE"]) +load("//tensorflow:tensorflow.bzl", 
"tf_py_test") + py_library( name = "compat", srcs = ["compat.py"], srcs_version = "PY2AND3", visibility = ["//tensorflow:internal"], ) + +tf_py_test( + name = "compat_test", + size = "small", + srcs = ["compat_test.py"], + additional_deps = [ + ":compat", + "//tensorflow/python:client_testlib", + ], +) diff --git a/tensorflow/python/compat/compat.py b/tensorflow/python/compat/compat.py index e05ad55447..68a6421c2c 100644 --- a/tensorflow/python/compat/compat.py +++ b/tensorflow/python/compat/compat.py @@ -23,6 +23,7 @@ from __future__ import division from __future__ import print_function import datetime +from tensorflow.python.util import tf_contextlib _FORWARD_COMPATIBILITY_HORIZON = datetime.date(2018, 8, 1) @@ -79,3 +80,46 @@ def forward_compatible(year, month, day): source code after (year, month, day). """ return _FORWARD_COMPATIBILITY_HORIZON > datetime.date(year, month, day) + + +@tf_contextlib.contextmanager +def forward_compatibility_horizon(year, month, day): + """Context manager for testing forward compatibility of generated graphs. + + To ensure forward compatibility of generated graphs (see `forward_compatible`) + with older binaries, new features can be gated with: + + ```python + if compat.forward_compatible(year=2018, month=08, date=01): + generate_graph_with_new_features() + else: + generate_graph_so_older_binaries_can_consume_it() + ``` + + However, when adding new features, one may want to unittest it before + the forward compatibility window expires. This context manager enables + such tests. For example: + + ```python + from tensorflow.python.compat import compat + + def testMyNewFeature(self): + with compat.forward_compatibility_horizon(2018, 08, 02): + # Test that generate_graph_with_new_features() has an effect + ``` + + Args : + year: A year (e.g. 2018). + month: A month (1 <= month <= 12) in year. + day: A day (1 <= day <= 31, or 30, or 29, or 28) in month. + + Yields: + Nothing. + """ + global _FORWARD_COMPATIBILITY_HORIZON + try: + old_compat_date = _FORWARD_COMPATIBILITY_HORIZON + _FORWARD_COMPATIBILITY_HORIZON = datetime.date(year, month, day) + yield + finally: + _FORWARD_COMPATIBILITY_HORIZON = old_compat_date diff --git a/tensorflow/python/compat/compat_test.py b/tensorflow/python/compat/compat_test.py new file mode 100644 index 0000000000..946abbb300 --- /dev/null +++ b/tensorflow/python/compat/compat_test.py @@ -0,0 +1,70 @@ +# Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Tests for forward and backwards compatibility utilties.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import datetime +from tensorflow.python.compat import compat +from tensorflow.python.platform import test + + +class CompatTest(test.TestCase): + + def _compatibility_date(self): + date = compat._FORWARD_COMPATIBILITY_HORIZON # pylint: disable=protected-access + return (date.year, date.month, date.day) + + def _n_days_after(self, n): + date = compat._FORWARD_COMPATIBILITY_HORIZON + datetime.timedelta(days=n) # pylint: disable=protected-access + return (date.year, date.month, date.day) + + def test_basic(self): + compatibility_date = self._compatibility_date() + one_day_before = self._n_days_after(-1) + self.assertTrue(compat.forward_compatible(*one_day_before)) + self.assertFalse(compat.forward_compatible(*compatibility_date)) + + def test_decorator(self): + compatibility_date = self._compatibility_date() + one_day_after = self._n_days_after(1) + with compat.forward_compatibility_horizon(*one_day_after): + self.assertTrue(compat.forward_compatible(*compatibility_date)) + self.assertFalse(compat.forward_compatible(*one_day_after)) + + # After exiting context manager, value should be reset. + self.assertFalse(compat.forward_compatible(*compatibility_date)) + + def test_decorator_with_failure(self): + compatibility_date = self._compatibility_date() + one_day_after = self._n_days_after(1) + + class DummyError(Exception): + pass + + try: + with compat.forward_compatibility_horizon(*one_day_after): + raise DummyError() + except DummyError: + pass # silence DummyError + + # After exiting context manager, value should be reset. + self.assertFalse(compat.forward_compatible(*compatibility_date)) + + +if __name__ == '__main__': + test.main() diff --git a/tensorflow/python/estimator/canned/dnn.py b/tensorflow/python/estimator/canned/dnn.py index 2c7c4285ca..c08cf61220 100644 --- a/tensorflow/python/estimator/canned/dnn.py +++ b/tensorflow/python/estimator/canned/dnn.py @@ -26,6 +26,7 @@ from tensorflow.python.estimator.canned import head as head_lib from tensorflow.python.estimator.canned import optimizers from tensorflow.python.feature_column import feature_column as feature_column_lib from tensorflow.python.layers import core as core_layers +from tensorflow.python.layers import normalization from tensorflow.python.ops import init_ops from tensorflow.python.ops import nn from tensorflow.python.ops import partitioned_variables @@ -45,7 +46,7 @@ def _add_hidden_layer_summary(value, tag): def _dnn_logit_fn_builder(units, hidden_units, feature_columns, activation_fn, - dropout, input_layer_partitioner): + dropout, input_layer_partitioner, batch_norm): """Function builder for a dnn logit_fn. Args: @@ -58,6 +59,7 @@ def _dnn_logit_fn_builder(units, hidden_units, feature_columns, activation_fn, dropout: When not `None`, the probability we will drop out a given coordinate. input_layer_partitioner: Partitioner for input layer. + batch_norm: Whether to use batch normalization after each hidden layer. Returns: A logit_fn (see below). @@ -83,6 +85,7 @@ def _dnn_logit_fn_builder(units, hidden_units, feature_columns, activation_fn, A `Tensor` representing the logits, or a list of `Tensor`'s representing multiple logits in the MultiHead case. 
""" + is_training = mode == model_fn.ModeKeys.TRAIN with variable_scope.variable_scope( 'input_from_feature_columns', values=tuple(six.itervalues(features)), @@ -98,8 +101,20 @@ def _dnn_logit_fn_builder(units, hidden_units, feature_columns, activation_fn, activation=activation_fn, kernel_initializer=init_ops.glorot_uniform_initializer(), name=hidden_layer_scope) - if dropout is not None and mode == model_fn.ModeKeys.TRAIN: + if dropout is not None and is_training: net = core_layers.dropout(net, rate=dropout, training=True) + if batch_norm: + # TODO(hjm): In future, if this becomes popular, we can enable + # customization of the batch normalization params by accepting a + # list of `BatchNormalization` instances as `batch_norm`. + net = normalization.batch_normalization( + net, + # The default momentum 0.99 actually crashes on certain + # problem, so here we use 0.999, which is the default of + # tf.contrib.layers.batch_norm. + momentum=0.999, + training=is_training, + name='batchnorm_%d' % layer_id) _add_hidden_layer_summary(net, hidden_layer_scope.name) with variable_scope.variable_scope('logits', values=(net,)) as logits_scope: @@ -127,7 +142,8 @@ def _dnn_model_fn(features, dropout=None, input_layer_partitioner=None, config=None, - tpu_estimator_spec=False): + tpu_estimator_spec=False, + batch_norm=False): """Deep Neural Net model_fn. Args: @@ -150,6 +166,7 @@ def _dnn_model_fn(features, config: `RunConfig` object to configure the runtime settings. tpu_estimator_spec: Whether to return a `_TPUEstimatorSpec` or or `model_fn.EstimatorSpec` instance. + batch_norm: Whether to use batch normalization after each hidden layer. Returns: An `EstimatorSpec` instance. @@ -182,7 +199,8 @@ def _dnn_model_fn(features, feature_columns=feature_columns, activation_fn=activation_fn, dropout=dropout, - input_layer_partitioner=input_layer_partitioner) + input_layer_partitioner=input_layer_partitioner, + batch_norm=batch_norm) logits = logit_fn(features=features, mode=mode) if tpu_estimator_spec: @@ -299,6 +317,7 @@ class DNNClassifier(estimator.Estimator): config=None, warm_start_from=None, loss_reduction=losses.Reduction.SUM, + batch_norm=False, ): """Initializes a `DNNClassifier` instance. @@ -345,6 +364,7 @@ class DNNClassifier(estimator.Estimator): names are unchanged. loss_reduction: One of `tf.losses.Reduction` except `NONE`. Describes how to reduce training loss over batch. Defaults to `SUM`. + batch_norm: Whether to use batch normalization after each hidden layer. """ head = head_lib._binary_logistic_or_multi_class_head( # pylint: disable=protected-access n_classes, weight_column, label_vocabulary, loss_reduction) @@ -361,7 +381,8 @@ class DNNClassifier(estimator.Estimator): activation_fn=activation_fn, dropout=dropout, input_layer_partitioner=input_layer_partitioner, - config=config) + config=config, + batch_norm=batch_norm) super(DNNClassifier, self).__init__( model_fn=_model_fn, model_dir=model_dir, config=config, @@ -465,6 +486,7 @@ class DNNRegressor(estimator.Estimator): config=None, warm_start_from=None, loss_reduction=losses.Reduction.SUM, + batch_norm=False, ): """Initializes a `DNNRegressor` instance. @@ -505,6 +527,7 @@ class DNNRegressor(estimator.Estimator): names are unchanged. loss_reduction: One of `tf.losses.Reduction` except `NONE`. Describes how to reduce training loss over batch. Defaults to `SUM`. + batch_norm: Whether to use batch normalization after each hidden layer. 
""" def _model_fn(features, labels, mode, config): @@ -522,7 +545,8 @@ class DNNRegressor(estimator.Estimator): activation_fn=activation_fn, dropout=dropout, input_layer_partitioner=input_layer_partitioner, - config=config) + config=config, + batch_norm=batch_norm) super(DNNRegressor, self).__init__( model_fn=_model_fn, model_dir=model_dir, config=config, diff --git a/tensorflow/python/estimator/canned/dnn_linear_combined.py b/tensorflow/python/estimator/canned/dnn_linear_combined.py index 2f20e4b289..5f453d6fe8 100644 --- a/tensorflow/python/estimator/canned/dnn_linear_combined.py +++ b/tensorflow/python/estimator/canned/dnn_linear_combined.py @@ -88,7 +88,8 @@ def _dnn_linear_combined_model_fn(features, dnn_activation_fn=nn.relu, dnn_dropout=None, input_layer_partitioner=None, - config=None): + config=None, + batch_norm=False): """Deep Neural Net and Linear combined model_fn. Args: @@ -115,6 +116,7 @@ def _dnn_linear_combined_model_fn(features, coordinate. input_layer_partitioner: Partitioner for input layer. config: `RunConfig` object to configure the runtime settings. + batch_norm: Whether to use batch normalization after each hidden layer. Returns: An `EstimatorSpec` instance. @@ -164,7 +166,8 @@ def _dnn_linear_combined_model_fn(features, feature_columns=dnn_feature_columns, activation_fn=dnn_activation_fn, dropout=dnn_dropout, - input_layer_partitioner=input_layer_partitioner) + input_layer_partitioner=input_layer_partitioner, + batch_norm=batch_norm) dnn_logits = dnn_logit_fn(features=features, mode=mode) linear_parent_scope = 'linear' @@ -321,7 +324,8 @@ class DNNLinearCombinedClassifier(estimator.Estimator): input_layer_partitioner=None, config=None, warm_start_from=None, - loss_reduction=losses.Reduction.SUM): + loss_reduction=losses.Reduction.SUM, + batch_norm=False): """Initializes a DNNLinearCombinedClassifier instance. Args: @@ -374,6 +378,7 @@ class DNNLinearCombinedClassifier(estimator.Estimator): names are unchanged. loss_reduction: One of `tf.losses.Reduction` except `NONE`. Describes how to reduce training loss over batch. Defaults to `SUM`. + batch_norm: Whether to use batch normalization after each hidden layer. Raises: ValueError: If both linear_feature_columns and dnn_features_columns are @@ -413,7 +418,8 @@ class DNNLinearCombinedClassifier(estimator.Estimator): dnn_activation_fn=dnn_activation_fn, dnn_dropout=dnn_dropout, input_layer_partitioner=input_layer_partitioner, - config=config) + config=config, + batch_norm=batch_norm) super(DNNLinearCombinedClassifier, self).__init__( model_fn=_model_fn, model_dir=model_dir, config=config, @@ -515,7 +521,8 @@ class DNNLinearCombinedRegressor(estimator.Estimator): input_layer_partitioner=None, config=None, warm_start_from=None, - loss_reduction=losses.Reduction.SUM): + loss_reduction=losses.Reduction.SUM, + batch_norm=False): """Initializes a DNNLinearCombinedRegressor instance. Args: @@ -562,6 +569,7 @@ class DNNLinearCombinedRegressor(estimator.Estimator): names are unchanged. loss_reduction: One of `tf.losses.Reduction` except `NONE`. Describes how to reduce training loss over batch. Defaults to `SUM`. + batch_norm: Whether to use batch normalization after each hidden layer. 
Raises: ValueError: If both linear_feature_columns and dnn_features_columns are @@ -592,7 +600,8 @@ class DNNLinearCombinedRegressor(estimator.Estimator): dnn_activation_fn=dnn_activation_fn, dnn_dropout=dnn_dropout, input_layer_partitioner=input_layer_partitioner, - config=config) + config=config, + batch_norm=batch_norm) super(DNNLinearCombinedRegressor, self).__init__( model_fn=_model_fn, model_dir=model_dir, config=config, diff --git a/tensorflow/python/estimator/canned/dnn_testing_utils.py b/tensorflow/python/estimator/canned/dnn_testing_utils.py index 06a648777f..ba17821259 100644 --- a/tensorflow/python/estimator/canned/dnn_testing_utils.py +++ b/tensorflow/python/estimator/canned/dnn_testing_utils.py @@ -65,6 +65,11 @@ from tensorflow.python.training import training_util LEARNING_RATE_NAME = 'dnn/regression_head/dnn/learning_rate' HIDDEN_WEIGHTS_NAME_PATTERN = 'dnn/hiddenlayer_%d/kernel' HIDDEN_BIASES_NAME_PATTERN = 'dnn/hiddenlayer_%d/bias' +BATCH_NORM_BETA_NAME_PATTERN = 'dnn/hiddenlayer_%d/batchnorm_%d/beta' +BATCH_NORM_GAMMA_NAME_PATTERN = 'dnn/hiddenlayer_%d/batchnorm_%d/gamma' +BATCH_NORM_MEAN_NAME_PATTERN = 'dnn/hiddenlayer_%d/batchnorm_%d/moving_mean' +BATCH_NORM_VARIANCE_NAME_PATTERN = ( + 'dnn/hiddenlayer_%d/batchnorm_%d/moving_variance') LOGITS_WEIGHTS_NAME = 'dnn/logits/kernel' LOGITS_BIASES_NAME = 'dnn/logits/bias' OCCUPATION_EMBEDDING_NAME = ('dnn/input_from_feature_columns/input_layer/' @@ -89,7 +94,10 @@ def assert_close(expected, actual, rtol=1e-04, message='', name='assert_close'): name=scope) -def create_checkpoint(weights_and_biases, global_step, model_dir): +def create_checkpoint(weights_and_biases, + global_step, + model_dir, + batch_norm_vars=None): """Create checkpoint file with provided model weights. Args: @@ -98,12 +106,20 @@ def create_checkpoint(weights_and_biases, global_step, model_dir): model_dir: Directory into which checkpoint is saved. """ weights, biases = zip(*weights_and_biases) + if batch_norm_vars: + assert len(batch_norm_vars) == len(weights_and_biases) - 1 + (bn_betas, bn_gammas, bn_means, bn_variances) = zip(*batch_norm_vars) model_weights = {} # Hidden layer weights. for i in range(0, len(weights) - 1): model_weights[HIDDEN_WEIGHTS_NAME_PATTERN % i] = weights[i] model_weights[HIDDEN_BIASES_NAME_PATTERN % i] = biases[i] + if batch_norm_vars: + model_weights[BATCH_NORM_BETA_NAME_PATTERN % (i, i)] = bn_betas[i] + model_weights[BATCH_NORM_GAMMA_NAME_PATTERN % (i, i)] = bn_gammas[i] + model_weights[BATCH_NORM_MEAN_NAME_PATTERN % (i, i)] = bn_means[i] + model_weights[BATCH_NORM_VARIANCE_NAME_PATTERN % (i, i)] = bn_variances[i] # Output layer weights. 
model_weights[LOGITS_WEIGHTS_NAME] = weights[-1] @@ -503,8 +519,13 @@ class BaseDNNLogitFnTest(object): writer_cache.FileWriterCache.clear() shutil.rmtree(self._model_dir) - def _test_logits(self, mode, hidden_units, logits_dimension, inputs, - expected_logits): + def _test_logits(self, + mode, + hidden_units, + logits_dimension, + inputs, + expected_logits, + batch_norm=False): """Tests that the expected logits are calculated.""" with ops.Graph().as_default(): # Global step needed for MonitoredSession, which is in turn used to @@ -525,7 +546,8 @@ class BaseDNNLogitFnTest(object): ], activation_fn=nn.relu, dropout=None, - input_layer_partitioner=input_layer_partitioner) + input_layer_partitioner=input_layer_partitioner, + batch_norm=batch_norm) logits = logit_fn( features={'age': constant_op.constant(inputs)}, mode=mode) with monitored_session.MonitoredTrainingSession( @@ -556,6 +578,69 @@ class BaseDNNLogitFnTest(object): inputs=[[10.]], expected_logits=[[-2.08]]) + def test_one_dim_logits_with_batch_norm(self): + """Tests one-dimensional logits. + + input_layer = [[10]] + hidden_layer_0 = [[relu(0.6*10 +1), relu(0.5*10 -1)]] = [[7, 4]] + hidden_layer_0 = [[relu(0.6*20 +1), relu(0.5*20 -1)]] = [[13, 9]] + + batch_norm_0, training (epsilon = 0.001): + mean1 = 1/2*(7+13) = 10, + variance1 = 1/2*(3^2+3^2) = 9 + x11 = (7-10)/sqrt(9+0.001) = -0.999944449, + x21 = (13-10)/sqrt(9+0.001) = 0.999944449, + + mean2 = 1/2*(4+9) = 6.5, + variance2 = 1/2*(2.5^2+.2.5^2) = 6.25 + x12 = (4-6.5)/sqrt(6.25+0.001) = -0.99992001, + x22 = (9-6.5)/sqrt(6.25+0.001) = 0.99992001, + + logits = [[-1*(-0.999944449) + 2*(-0.99992001) + 0.3], + [-1*0.999944449 + 2*0.99992001 + 0.3]] + = [[-0.699895571],[1.299895571]] + + batch_norm_0, not training (epsilon = 0.001): + moving_mean1 = 0, moving_variance1 = 1 + x11 = (7-0)/sqrt(1+0.001) = 6.996502623, + x21 = (13-0)/sqrt(1+0.001) = 12.993504871, + moving_mean2 = 0, moving_variance2 = 1 + x12 = (4-0)/sqrt(1+0.001) = 3.998001499, + x22 = (9-0)/sqrt(1+0.001) = 8.995503372, + + logits = [[-1*6.996502623 + 2*3.998001499 + 0.3], + [-1*12.993504871 + 2*8.995503372 + 0.3]] + = [[1.299500375],[5.297501873]] + """ + base_global_step = 100 + create_checkpoint( + ( + ([[.6, .5]], [1., -1.]), + ([[-1.], [2.]], [.3]), + ), + base_global_step, + self._model_dir, + batch_norm_vars=([[0, 0], # beta. + [1, 1], # gamma. + [0, 0], # moving mean. + [1, 1], # moving variance. + ],)) + self._test_logits( + model_fn.ModeKeys.TRAIN, + hidden_units=[2], + logits_dimension=1, + inputs=[[10.], [20.]], + expected_logits=[[-0.699895571], [1.299895571]], + batch_norm=True) + for mode in [model_fn.ModeKeys.EVAL, model_fn.ModeKeys.PREDICT]: + self._test_logits( + mode, + hidden_units=[2], + logits_dimension=1, + inputs=[[10.], [20.]], + expected_logits=[[1.299500375], [5.297501873]], + batch_norm=True) + def test_multi_dim_logits(self): """Tests multi-dimensional logits. 
@@ -706,7 +791,8 @@ class BaseDNNLogitFnTest(object): ], activation_fn=nn.relu, dropout=None, - input_layer_partitioner=input_layer_partitioner) + input_layer_partitioner=input_layer_partitioner, + batch_norm=False) logits = logit_fn( features={ 'age': constant_op.constant(inputs[0]), diff --git a/tensorflow/python/kernel_tests/constant_op_eager_test.py b/tensorflow/python/kernel_tests/constant_op_eager_test.py index 8e9d75667d..a0d5557b92 100644 --- a/tensorflow/python/kernel_tests/constant_op_eager_test.py +++ b/tensorflow/python/kernel_tests/constant_op_eager_test.py @@ -32,6 +32,9 @@ from tensorflow.python.util import compat # TODO(josh11b): add tests with lists/tuples, Shape. +# TODO(ashankar): Collapse with tests in constant_op_test.py and use something +# like the test_util.run_in_graph_and_eager_modes decorator to confirm +# equivalence between graph and eager execution. class ConstantTest(test.TestCase): def _testCpu(self, x): @@ -280,6 +283,34 @@ class ConstantTest(test.TestCase): with self.assertRaisesRegexp(ValueError, None): constant_op.constant([[1, 2], [3], [4, 5]]) + # TODO(ashankar): This test fails with graph construction since + # tensor_util.make_tensor_proto (invoked from constant_op.constant) + # does not handle iterables (it relies on numpy conversion). + # For consistency, should graph construction handle Python objects + # that implement the sequence protocol (but not numpy conversion), + # or should eager execution fail on such sequences? + def testCustomSequence(self): + + # This is inspired by how many objects in pandas are implemented: + # - They implement the Python sequence protocol + # - But may raise a KeyError on __getitem__(self, 0) + # See https://github.com/tensorflow/tensorflow/issues/20347 + class MySeq(object): + + def __getitem__(self, key): + if key != 1 and key != 3: + raise KeyError(key) + return key + + def __len__(self): + return 2 + + def __iter__(self): + l = list([1, 3]) + return l.__iter__() + + self.assertAllEqual([1, 3], self.evaluate(constant_op.constant(MySeq()))) + class AsTensorTest(test.TestCase): diff --git a/tensorflow/python/layers/normalization.py b/tensorflow/python/layers/normalization.py index ece6667981..f7bc10a6a6 100644 --- a/tensorflow/python/layers/normalization.py +++ b/tensorflow/python/layers/normalization.py @@ -44,7 +44,7 @@ class BatchNormalization(keras_layers.BatchNormalization, base.Layer): normalized, typically the features axis/axes. For instance, after a `Conv2D` layer with `data_format="channels_first"`, set `axis=1`. If a list of axes is provided, each axis in `axis` will be normalized - simultaneously. Default is `-1` which takes uses last axis. Note: when + simultaneously. Default is `-1` which uses the last axis. Note: when using multi-axis batch norm, the `beta`, `gamma`, `moving_mean`, and `moving_variance` variables are the same rank as the input Tensor, with dimension size 1 in all reduced (non-axis) dimensions). diff --git a/tensorflow/python/lib/core/py_seq_tensor.cc b/tensorflow/python/lib/core/py_seq_tensor.cc index 386be35ba2..3b4f12ae31 100644 --- a/tensorflow/python/lib/core/py_seq_tensor.cc +++ b/tensorflow/python/lib/core/py_seq_tensor.cc @@ -88,6 +88,41 @@ bool IsPyDimension(PyObject* obj) { return ret; } +// Sets *elem to a NEW reference to an element in seq on success. +// REQUIRES: PySequence_Check(seq) && PySequence_Length(seq) > 0. 
+Status SampleElementFromSequence(PyObject* seq, PyObject** elem) { + *elem = PySequence_GetItem(seq, 0); + if (*elem != nullptr) return Status::OK(); + // seq may implement the sequence protocol (i.e., implement __getitem__) + // but may legitimately not have a 0-th element (__getitem__(self, 0) + // raises a KeyError). For example: + // seq = pandas.Series([0, 1, 2], index=[2, 4, 6]) + // + // We don't actually care for the element at key 0, any element will do + // for inferring the element types. All elements are expected to + // have the same type, and this will be validated when converting + // to an EagerTensor. + PyErr_Clear(); + Safe_PyObjectPtr iter(PyObject_GetIter(seq)); + if (PyErr_Occurred()) { + return errors::InvalidArgument("Cannot infer dtype of a ", + Py_TYPE(seq)->tp_name, + " object: ", PyExceptionFetch()); + } + *elem = PyIter_Next(iter.get()); + if (PyErr_Occurred()) { + return errors::InvalidArgument( + "Cannot infer dtype of a ", Py_TYPE(seq)->tp_name, + " object, as iter(<object>).next() failed: ", PyExceptionFetch()); + } + if (*elem == nullptr) { + return errors::InvalidArgument("Cannot infer dtype of a ", + Py_TYPE(seq)->tp_name, + " object since it is an empty sequence"); + } + return Status::OK(); +} + Status InferShapeAndType(PyObject* obj, TensorShape* shape, DataType* dtype) { std::vector<Safe_PyObjectPtr> refs_to_clean; while (true) { @@ -98,7 +133,9 @@ Status InferShapeAndType(PyObject* obj, TensorShape* shape, DataType* dtype) { auto length = PySequence_Length(obj); if (length > 0) { shape->AddDim(length); - obj = PySequence_GetItem(obj, 0); + PyObject* elem = nullptr; + TF_RETURN_IF_ERROR(SampleElementFromSequence(obj, &elem)); + obj = elem; refs_to_clean.push_back(make_safe(obj)); continue; } else if (length == 0) { diff --git a/tensorflow/python/ops/parallel_for/BUILD b/tensorflow/python/ops/parallel_for/BUILD new file mode 100644 index 0000000000..065c2caedc --- /dev/null +++ b/tensorflow/python/ops/parallel_for/BUILD @@ -0,0 +1,129 @@ +package( + default_visibility = [ + "//tensorflow:internal", + ], +) + +load("//tensorflow:tensorflow.bzl", "cuda_py_test") + +licenses(["notice"]) # Apache 2.0 + +py_library( + name = "parallel_for", + srcs = [ + "__init__.py", + "control_flow_ops.py", + "gradients.py", + "pfor.py", + ], + srcs_version = "PY2AND3", + deps = [ + ":control_flow_ops", + ":gradients", + "//tensorflow/python:array_ops", + "//tensorflow/python:check_ops", + "//tensorflow/python:constant_op", + "//tensorflow/python:control_flow_ops", + "//tensorflow/python:data_flow_ops", + "//tensorflow/python:dtypes", + "//tensorflow/python:framework_ops", + "//tensorflow/python:functional_ops", + "//tensorflow/python:gradients", + "//tensorflow/python:math_ops", + "//tensorflow/python:nn_ops", + "//tensorflow/python:platform", + "//tensorflow/python:sparse_ops", + "//tensorflow/python:sparse_tensor", + "//tensorflow/python:tensor_array_ops", + "//tensorflow/python:tensor_shape", + "//tensorflow/python:tensor_util", + "//tensorflow/python:util", + "@absl_py//absl/flags", + ], +) + +py_library( + name = "pfor_lib", + srcs = ["pfor.py"], + srcs_version = "PY2AND3", + deps = [ + "//tensorflow/python:array_ops", + "//tensorflow/python:check_ops", + "//tensorflow/python:constant_op", + "//tensorflow/python:control_flow_ops", + "//tensorflow/python:data_flow_ops", + "//tensorflow/python:dtypes", + "//tensorflow/python:framework_ops", + "//tensorflow/python:functional_ops", + "//tensorflow/python:math_ops", + "//tensorflow/python:nn_ops", + 
"//tensorflow/python:platform", + "//tensorflow/python:sparse_ops", + "//tensorflow/python:sparse_tensor", + "//tensorflow/python:tensor_array_ops", + "//tensorflow/python:tensor_shape", + "//tensorflow/python:tensor_util", + "@absl_py//absl/flags", + ], +) + +py_library( + name = "control_flow_ops", + srcs = ["control_flow_ops.py"], + srcs_version = "PY2AND3", + visibility = ["//visibility:public"], + deps = [ + ":pfor_lib", + "//tensorflow/python:array_ops", + "//tensorflow/python:control_flow_ops", + "//tensorflow/python:dtypes", + "//tensorflow/python:framework_ops", + "//tensorflow/python:tensor_array_ops", + "//tensorflow/python:util", + ], +) + +cuda_py_test( + name = "control_flow_ops_test", + srcs = ["control_flow_ops_test.py"], + additional_deps = [ + ":control_flow_ops", + "//tensorflow/core:protos_all_py", + "//tensorflow/python:client_testlib", + "//tensorflow/python:gradients", + "//tensorflow/python:logging_ops", + "//tensorflow/python:parsing_ops", + "//tensorflow/python:session", + "//tensorflow/python:tensor_array_grad", + "//tensorflow/python:random_ops", + "//tensorflow/python:util", + ], +) + +py_library( + name = "gradients", + srcs = ["gradients.py"], + srcs_version = "PY2AND3", + deps = [ + ":control_flow_ops", + "//tensorflow/python:array_ops", + "//tensorflow/python:gradients", + "//tensorflow/python:util", + ], +) + +cuda_py_test( + name = "gradients_test", + size = "large", + srcs = ["gradients_test.py"], + additional_deps = [ + ":control_flow_ops", + ":gradients", + "//third_party/py/numpy", + "//tensorflow/python:layers", + "//tensorflow/python:client_testlib", + "//tensorflow/python:random_ops", + "//tensorflow/python/ops/losses", + ], + tags = ["no_gpu"], # TODO(b/80127739): test is flaky +) diff --git a/tensorflow/python/ops/parallel_for/__init__.py b/tensorflow/python/ops/parallel_for/__init__.py new file mode 100644 index 0000000000..b49d865968 --- /dev/null +++ b/tensorflow/python/ops/parallel_for/__init__.py @@ -0,0 +1,35 @@ +# Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Ops for pfor, for_loop, jacobian.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tensorflow.python.ops.parallel_for import * # pylint: disable=wildcard-import +from tensorflow.python.ops.parallel_for.control_flow_ops import for_loop +from tensorflow.python.ops.parallel_for.control_flow_ops import pfor +from tensorflow.python.ops.parallel_for.gradients import batch_jacobian +from tensorflow.python.ops.parallel_for.gradients import jacobian +from tensorflow.python.util.all_util import remove_undocumented + +_allowed_symbols = [ + 'pfor', + 'for_loop', + 'jacobian', + 'batch_jacobian', +] + +remove_undocumented(__name__, _allowed_symbols) diff --git a/tensorflow/python/ops/parallel_for/control_flow_ops.py b/tensorflow/python/ops/parallel_for/control_flow_ops.py new file mode 100644 index 0000000000..ccf2eb8214 --- /dev/null +++ b/tensorflow/python/ops/parallel_for/control_flow_ops.py @@ -0,0 +1,123 @@ +# Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""for_loop and pfor ops.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tensorflow.python.framework import dtypes +from tensorflow.python.framework import ops +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import control_flow_ops +from tensorflow.python.ops import tensor_array_ops +from tensorflow.python.ops.parallel_for.pfor import PFor +from tensorflow.python.util import nest + + +def for_loop(loop_fn, loop_fn_dtypes, iters): + """Runs `loop_fn` `iters` times and stacks the outputs. + + + Runs `loop_fn` `iters` times, with input values from 0 to `iters - 1`, and + stacks corresponding outputs of the different runs. + + Args: + loop_fn: A function that takes an int32 scalar tf.Tensor object representing + the iteration number, and returns a possibly nested structure of tensor + objects. The shape of these outputs should not depend on the input. + loop_fn_dtypes: dtypes for the outputs of loop_fn. + iters: Number of iterations for which to run loop_fn. + + Returns: + Returns a nested structure of stacked output tensor objects with the same + nested structure as the output of `loop_fn`. + """ + + flat_loop_fn_dtypes = nest.flatten(loop_fn_dtypes) + + def while_body(i, *ta_list): + """Body of while loop.""" + fn_output = nest.flatten(loop_fn(i)) + if len(fn_output) != len(flat_loop_fn_dtypes): + raise ValueError( + "Number of expected outputs, %d, does not match the number of " + "actual outputs, %d, from loop_fn" % (len(flat_loop_fn_dtypes), + len(fn_output))) + outputs = [] + for out, ta in zip(fn_output, ta_list): + # TODO(agarwal): support returning Operation objects from loop_fn. 
+      assert isinstance(out, ops.Tensor)
+      outputs.append(ta.write(i, array_ops.expand_dims(out, 0)))
+    return tuple([i + 1] + outputs)
+
+  ta_list = control_flow_ops.while_loop(
+      lambda i, *ta: i < iters, while_body, [0] + [
+          tensor_array_ops.TensorArray(dtype, iters)
+          for dtype in flat_loop_fn_dtypes
+      ])[1:]
+
+  # TODO(rachelim): enable this for sparse tensors
+  return nest.pack_sequence_as(loop_fn_dtypes, [ta.concat() for ta in ta_list])
+
+
+def pfor(loop_fn, iters):
+  """Equivalent to running `loop_fn` `iters` times and stacking the outputs.
+
+  `pfor` has functionality similar to `for_loop`, i.e. running `loop_fn` `iters`
+  times, with input from 0 to `iters - 1`, and stacking corresponding output of
+  each iteration. However the implementation does not use a tf.while_loop.
+  Instead it adds new operations to the graph that collectively compute the same
+  value as what running `loop_fn` in a loop would compute.
+
+
+  This is an experimental feature and currently has a lot of limitations:
+    - There should be no data dependency between the different iterations. For
+      example, a future iteration should not depend on a value or side-effect of
+      a previous iteration.
+    - Stateful kernels may mostly not be supported since these often imply a
+      data dependency or ordering of the iterations. We do support a limited set
+      of such stateful kernels though (like RandomFoo, Variable operations like
+      reads, etc).
+    - Conversion works only on a limited set of kernels for which a converter
+      has been registered.
+    - loop_fn cannot currently contain control flow operations like
+      tf.while_loop or tf.cond.
+    - `loop_fn` should return a nested structure of Tensors or Operations.
+      However if an Operation is returned, it should have zero outputs.
+    - The shape and dtype of `loop_fn` outputs should not depend on the input
+      to loop_fn.
+
+  Args:
+    loop_fn: A function that takes an int32 scalar tf.Tensor object representing
+      the iteration number, and returns a possibly nested structure of Tensor or
+      Operation objects.
+    iters: Number of iterations for which to run loop_fn.
+
+  Returns:
+    Returns a nested structure of stacked tensor objects with the same nested
+    structure as the output of `loop_fn`.
+  """
+  existing_ops = set(ops.get_default_graph().get_operations())
+  with ops.name_scope("loop_body"):
+    loop_var = array_ops.placeholder(dtypes.int32, shape=[])
+    loop_fn_outputs = loop_fn(loop_var)
+  new_ops = set(ops.get_default_graph().get_operations()) - existing_ops
+  iters = ops.convert_to_tensor(iters)
+  with ops.name_scope("pfor"):
+    converter = PFor(loop_var, iters, new_ops)
+    outputs = []
+    for loop_fn_output in nest.flatten(loop_fn_outputs):
+      outputs.append(converter.convert(loop_fn_output))
+    return nest.pack_sequence_as(loop_fn_outputs, outputs)
diff --git a/tensorflow/python/ops/parallel_for/control_flow_ops_test.py b/tensorflow/python/ops/parallel_for/control_flow_ops_test.py
new file mode 100644
index 0000000000..1134a6c4a6
--- /dev/null
+++ b/tensorflow/python/ops/parallel_for/control_flow_ops_test.py
@@ -0,0 +1,1351 @@
+# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for pfor and for_loop.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +from absl import flags +import numpy as np + +from tensorflow.python.client import session +from tensorflow.python.framework import constant_op +from tensorflow.python.framework import dtypes +from tensorflow.python.framework import ops +from tensorflow.python.framework import sparse_tensor +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import control_flow_ops +from tensorflow.python.ops import data_flow_ops +from tensorflow.python.ops import gradients as gradient_ops +from tensorflow.python.ops import logging_ops +from tensorflow.python.ops import math_ops +from tensorflow.python.ops import nn +from tensorflow.python.ops import random_ops +from tensorflow.python.ops import rnn +from tensorflow.python.ops import rnn_cell +from tensorflow.python.ops import tensor_array_grad # pylint: disable=unused-import +from tensorflow.python.ops import tensor_array_ops +from tensorflow.python.ops import variables +from tensorflow.python.ops.parallel_for import control_flow_ops as pfor_control_flow_ops +from tensorflow.python.platform import test +from tensorflow.python.util import nest + + +class PForTest(test.TestCase): + + def _run_targets(self, targets1, targets2=None, run_init=True): + targets1 = nest.flatten(targets1) + targets2 = ([] if targets2 is None else nest.flatten(targets2)) + assert len(targets1) == len(targets2) or not targets2 + if run_init: + init = variables.global_variables_initializer() + self.evaluate(init) + return self.evaluate(targets1 + targets2) + + def run_and_assert_equal(self, targets1, targets2): + outputs = self._run_targets(targets1, targets2) + outputs = nest.flatten(outputs) # flatten SparseTensorValues + n = len(outputs) // 2 + for i in range(n): + if outputs[i + n].dtype != np.object: + self.assertAllClose(outputs[i + n], outputs[i], rtol=1e-4, atol=1e-5) + else: + self.assertAllEqual(outputs[i + n], outputs[i]) + + def _test_loop_fn(self, loop_fn, iters, loop_fn_dtypes=dtypes.float32): + t1 = pfor_control_flow_ops.pfor(loop_fn, iters=iters) + t2 = pfor_control_flow_ops.for_loop(loop_fn, loop_fn_dtypes, iters=iters) + self.run_and_assert_equal(t1, t2) + + def test_op_conversion_fallback_to_while_loop(self): + # Note that we used top_k op for this test. If a converter gets defined for + # it, we will need to find another op for which a converter has not been + # defined. 
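+    # Without the fallback flag, pfor raises for ops that have no registered
+    # converter; enabling op_conversion_fallback_to_while_loop makes it fall
+    # back to a while_loop based conversion instead, which the second call
+    # below exercises.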
+ x = random_ops.random_uniform([3, 2, 4]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + return nn.top_k(x_i) + + with self.assertRaisesRegexp(ValueError, "No converter defined"): + self._test_loop_fn( + loop_fn, 3, loop_fn_dtypes=[dtypes.float32, dtypes.int32]) + flags.FLAGS.op_conversion_fallback_to_while_loop = True + self._test_loop_fn( + loop_fn, 3, loop_fn_dtypes=[dtypes.float32, dtypes.int32]) + flags.FLAGS.op_conversion_fallback_to_while_loop = False + + +class ArrayTest(PForTest): + + def test_gather(self): + x = random_ops.random_uniform([3, 3, 3]) + + def loop_fn(i): + outputs = [] + x_i = array_ops.gather(x, i) + for y in [x, x_i]: + axes = [0, 2, -1] if y == x else [0] + for axis in axes: + outputs.append(array_ops.gather(y, 2, axis=axis)) + outputs.append(array_ops.gather(y, i, axis=axis)) + outputs.append(array_ops.gather(y, [i], axis=axis)) + outputs.append(array_ops.gather(y, [i, 2], axis=axis)) + outputs.append(array_ops.gather(y, [[2, i], [i, 1]], axis=axis)) + return outputs + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 20) + + def test_shape(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + return array_ops.shape(x_i), array_ops.shape(x_i, out_type=dtypes.int64) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32, dtypes.int64]) + + def test_size(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + return array_ops.size(x_i), array_ops.size(x_i, out_type=dtypes.int64) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32, dtypes.int64]) + + def test_rank(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + return array_ops.rank(x_i) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32]) + + def test_shape_n(self): + x = random_ops.random_uniform([3, 2, 3]) + y = random_ops.random_uniform([3]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + y_i = array_ops.gather(y, i) + return array_ops.shape_n([x_i, x, y, y_i]), array_ops.shape_n( + [x_i, x, y, y_i], out_type=dtypes.int64) + + self._test_loop_fn( + loop_fn, 3, loop_fn_dtypes=[dtypes.int32] * 4 + [dtypes.int64] * 4) + + def test_reshape(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.reshape(x1, [-1]), array_ops.reshape(x1, [1, 3, 1, -1]) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + def test_expand_dims(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.expand_dims( + x1, axis=-1), array_ops.expand_dims( + x1, axis=1) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + def test_slice(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.slice(x1, begin=(0, 1), size=(2, 1)) + + self._test_loop_fn(loop_fn, 3) + + def test_tile(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.tile(x1, [2, 1]) + + self._test_loop_fn(loop_fn, 3) + + def test_tile_loop_dependent(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.tile(x1, [i, 1]) + + with self.assertRaisesRegexp(ValueError, "expected to be loop invariant"): + pfor_control_flow_ops.pfor(loop_fn, 2) + + def test_pack(self): + x = random_ops.random_uniform([3, 2, 3]) + y = 
random_ops.random_uniform([2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.stack([x1, y], axis=-1) + + self._test_loop_fn(loop_fn, 1) + + def test_unpack(self): + x = random_ops.random_uniform([3, 2, 3, 4]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + return array_ops.unstack( + x_i, 4, axis=-1), array_ops.unstack( + x_i, 3, axis=1) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 7) + + def test_pad(self): + x = random_ops.random_uniform([3, 2, 3]) + padding = constant_op.constant([[1, 2], [3, 4]]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.pad(x1, padding, mode="CONSTANT") + + self._test_loop_fn(loop_fn, 3) + + def test_split(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.split(x1, 2, axis=0), array_ops.split(x1, 3, axis=-1) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 5) + + def test_transpose(self): + x = random_ops.random_uniform([3, 2, 3, 4]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.transpose(x1, [2, 1, 0]) + + self._test_loop_fn(loop_fn, 3) + + def test_zeros_like(self): + x = random_ops.random_uniform([3, 2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + z = array_ops.zeros_like(x1), + return z, z + x1 + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + def test_concat_v2(self): + x = random_ops.random_uniform([3, 2, 3]) + y = random_ops.random_uniform([2, 3]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return array_ops.concat( + [x1, x1, y], axis=0), array_ops.concat( + [x1, x1, y], axis=-1) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + def test_unary_cwise_ops(self): + for op in [array_ops.identity, array_ops.stop_gradient]: + x = random_ops.random_uniform([3, 5]) + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + x1 = array_ops.gather(x, i) + y = op(x1) + x1 + loss = nn.l2_loss(y) + return op(x), y, gradient_ops.gradients(loss, x1) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 3) + + def test_strided_slice(self): + x = random_ops.random_uniform([3, 3, 4, 4, 2, 2, 2]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + y = x_i[:2, ::2, 1::3, ..., array_ops.newaxis, 1] + loss = nn.l2_loss(y) + return y, gradient_ops.gradients(loss, x_i) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + +class MathTest(PForTest): + + def test_unary_cwise_ops(self): + for op in [ + math_ops.tanh, nn.relu, math_ops.sigmoid, math_ops.negative, + math_ops.square + ]: + x = random_ops.random_uniform([3, 5]) + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + x1 = array_ops.gather(x, i) + y = op(x1) + loss = math_ops.reduce_sum(y * y) + return op(x), y, gradient_ops.gradients(loss, x1) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 3) + + def test_unary_cwise_no_grad(self): + for op in [math_ops.ceil, math_ops.floor, math_ops.logical_not]: + x = random_ops.random_uniform([3, 5]) + if op == math_ops.logical_not: + x = x > 0 + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + return op(array_ops.gather(x, i)) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=x.dtype) + + def test_binary_cwise_ops(self): + logical_ops = [ + math_ops.logical_and, math_ops.logical_or, math_ops.logical_xor + ] + bool_ops = [ + 
math_ops.less, math_ops.less_equal, math_ops.greater, + math_ops.greater_equal, math_ops.equal, math_ops.not_equal + ] + float_ops = [ + math_ops.add, math_ops.subtract, math_ops.multiply, math_ops.divide, + math_ops.maximum, math_ops.minimum + ] + for op in logical_ops + bool_ops + float_ops: + x = random_ops.random_uniform([7, 3, 5]) + y = random_ops.random_uniform([3, 5]) + if op in logical_ops: + x = x > 0 + y = y > 0 + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + x1 = array_ops.gather(x, i) + y1 = array_ops.gather(y, i) + return op(x, y), op(x1, y), op(x, y1), op(x1, y1), op(x1, x1) + + # pylint: enable=cell-var-from-loop + + dtype = dtypes.float32 if op in float_ops else dtypes.bool + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtype] * 5) + + def test_addn(self): + x = random_ops.random_uniform([2, 3, 5]) + y = random_ops.random_uniform([3, 5]) + z = random_ops.random_uniform([3, 5]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return math_ops.add_n([x1, y, z]) + + self._test_loop_fn(loop_fn, 2) + + def test_matmul(self): + for tr_a in (True, False): + for tr_b in (True, False): + for stack_a in (True, False): + for stack_b in (True, False): + shape_a = (5, 3) if tr_a else (3, 5) + if stack_a: + shape_a = (2,) + shape_a + shape_b = (7, 5) if tr_b else (5, 7) + if stack_b: + shape_b = (2,) + shape_b + + x = random_ops.random_uniform(shape_a) + y = random_ops.random_uniform(shape_b) + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + a = array_ops.gather(x, i) if stack_a else x + b = array_ops.gather(y, i) if stack_b else y + return math_ops.matmul(a, b, transpose_a=tr_a, transpose_b=tr_b) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 2) + + def test_batch_matmul(self): + for tr_a in (True, False): + for tr_b in (True, False): + for stack_a in (True, False): + for stack_b in (True, False): + shape_a = (4, 5, 3) if tr_a else (4, 3, 5) + if stack_a: + shape_a = (2,) + shape_a + shape_b = (4, 7, 5) if tr_b else (4, 5, 7) + if stack_b: + shape_b = (2,) + shape_b + + x = random_ops.random_uniform(shape_a) + y = random_ops.random_uniform(shape_b) + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + a = array_ops.gather(x, i) if stack_a else x + b = array_ops.gather(y, i) if stack_b else y + return math_ops.matmul(a, b, transpose_a=tr_a, transpose_b=tr_b) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 2) + + def test_reduction(self): + x = random_ops.random_uniform([2, 3, 4, 5]) + for op in [ + math_ops.reduce_sum, math_ops.reduce_prod, math_ops.reduce_max, + math_ops.reduce_min + ]: + for axis in ([1], None, [0, 2]): + for keepdims in (True, False): + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + a = array_ops.gather(x, i) + return op(a, axis=axis, keepdims=keepdims) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 2) + + def test_cum_sum(self): + x = random_ops.random_uniform([2, 3, 4, 5]) + for axis in (1, -2): + for exclusive in (True, False): + for reverse in (True, False): + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + a = array_ops.gather(x, i) + return math_ops.cumsum( + a, axis=axis, exclusive=exclusive, reverse=reverse) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 2) + + def test_cum_prod(self): + x = random_ops.random_uniform([2, 3, 4, 5]) + for axis in (1, -2): + for exclusive in (True, False): + for reverse in (True, False): + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + a = array_ops.gather(x, i) + 
return math_ops.cumprod( + a, axis=axis, exclusive=exclusive, reverse=reverse) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 2) + + def test_bias_add(self): + x_shape = [2, 3, 4, 5, 6] + x = random_ops.random_uniform(x_shape) + for data_format in ("NCHW", "NHWC"): + bias_dim = 2 if data_format == "NCHW" else -1 + bias_shape = x_shape[bias_dim] + bias = random_ops.random_uniform([bias_shape]) + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + a = array_ops.gather(x, i) + y = nn.bias_add(a, bias, data_format=data_format) + loss = math_ops.reduce_sum(y * y) + return y, gradient_ops.gradients(loss, bias) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn( + loop_fn, 2, loop_fn_dtypes=[dtypes.float32, dtypes.float32]) + + def test_unsorted_segment_sum(self): + t = random_ops.random_uniform([3, 3, 2]) + segment_ids = constant_op.constant([[0, 0, 2], [0, 1, 2], [2, 2, 2]]) + num_segments = 3 + + def loop_fn(i): + data = array_ops.gather(t, i) + data_0 = array_ops.gather(t, 0) + seg_ids = array_ops.gather(segment_ids, i) + return (math_ops.unsorted_segment_sum(data, seg_ids, num_segments), + math_ops.unsorted_segment_sum(data_0, seg_ids, num_segments)) + + self._test_loop_fn(loop_fn, 3, [dtypes.float32] * 2) + + def test_cast(self): + x = constant_op.constant([[1], [2]]) + y = constant_op.constant([[1.0], [2.0]]) + + def loop_fn(i): + return (math_ops.cast(array_ops.gather(x, i), dtypes.float32), + math_ops.cast(array_ops.gather(y, i), dtypes.int32)) + + self._test_loop_fn( + loop_fn, 2, loop_fn_dtypes=[dtypes.float32, dtypes.int32]) + + def test_tanh_axpy(self): + a = constant_op.constant(3.) + x = random_ops.random_uniform([4, 5]) + y = random_ops.random_uniform([6, 5]) + n = x.shape[0] + + def loop_fn(i): + return math_ops.tanh(a * array_ops.gather(x, i) + array_ops.gather(y, i)) + + self._test_loop_fn(loop_fn, n) + + def test_select(self): + cond = constant_op.constant([True, False]) + a = random_ops.random_uniform([2, 3, 5]) + b = random_ops.random_uniform([2, 3, 5]) + for cond_shape in [2], [2, 3], [2, 3, 5]: + cond = random_ops.random_uniform(cond_shape) > 0.5 + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + a_i = array_ops.gather(a, i) + b_i = array_ops.gather(b, i) + cond_i = array_ops.gather(cond, i) + return array_ops.where(cond_i, a_i, b_i) + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 2) + + +class NNTest(PForTest): + + def test_conv2d(self): + x = random_ops.random_uniform([3, 2, 12, 12, 3]) + filt = random_ops.random_uniform([3, 3, 3, 7]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return nn.conv2d( + x1, filt, strides=[1, 2, 2, 1], padding="VALID", data_format="NHWC") + + self._test_loop_fn(loop_fn, 3) + + def test_conv2d_backprop_input(self): + x_shape = [2, 12, 12, 3] + filt = random_ops.random_uniform([3, 3, 3, 7]) + grad = random_ops.random_uniform([3, 2, 5, 5, 7]) + + def loop_fn(i): + grad1 = array_ops.gather(grad, i) + return nn.conv2d_backprop_input( + x_shape, + filt, + grad1, + strides=[1, 2, 2, 1], + padding="VALID", + data_format="NHWC") + + self._test_loop_fn(loop_fn, 3) + + def test_conv2d_backprop_filter(self): + x = random_ops.random_uniform([3, 2, 12, 12, 3]) + x_0 = array_ops.gather(x, 0) + filter_sizes = [3, 3, 3, 7] + grad = random_ops.random_uniform([3, 2, 5, 5, 7]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + grad_i = array_ops.gather(grad, i) + return [ + nn.conv2d_backprop_filter( + inp, + filter_sizes, + grad_i, + strides=[1, 2, 2, 1], + padding="VALID", + 
data_format="NHWC") for inp in [x_i, x_0] + ] + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + def test_avg_pool(self): + x = random_ops.random_uniform([3, 2, 12, 12, 3]) + ksize = [1, 3, 3, 1] + + def loop_fn(i): + x1 = array_ops.gather(x, i) + output = nn.avg_pool( + x1, ksize, strides=[1, 2, 2, 1], padding="VALID", data_format="NHWC") + loss = nn.l2_loss(output) + return output, gradient_ops.gradients(loss, x1) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + def test_max_pool(self): + x = random_ops.random_uniform([3, 2, 12, 12, 3]) + ksize = [1, 3, 3, 1] + + def loop_fn(i): + x1 = array_ops.gather(x, i) + output = nn.max_pool( + x1, ksize, strides=[1, 2, 2, 1], padding="VALID", data_format="NHWC") + loss = nn.l2_loss(output) + return output, gradient_ops.gradients(loss, x1) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + def test_fused_batch_norm(self): + data_formats = ["NHWC"] + if test.is_gpu_available(): + data_formats.append("NCHW") + for is_training in (True, False): + for data_format in data_formats: + if data_format == "NCHW": + x = random_ops.random_uniform([3, 1, 2, 5, 5]) + else: + x = random_ops.random_uniform([3, 1, 5, 5, 2]) + scale = random_ops.random_uniform([2]) + offset = random_ops.random_uniform([2]) + mean = None if is_training else random_ops.random_uniform([2]) + variance = None if is_training else random_ops.random_uniform([2]) + + # pylint: disable=cell-var-from-loop + def loop_fn(i): + x1 = array_ops.gather(x, i) + outputs = nn.fused_batch_norm( + x1, + scale, + offset, + mean=mean, + variance=variance, + epsilon=0.01, + data_format=data_format, + is_training=is_training) + outputs = list(outputs) + # We only test the first value of outputs when is_training is False. + # It looks like CPU and GPU have different outputs for batch_mean and + # batch_variance for this case. + if not is_training: + outputs[1] = constant_op.constant(0.) + outputs[2] = constant_op.constant(0.) + loss = nn.l2_loss(outputs[0]) + gradients = gradient_ops.gradients(loss, [x1, scale, offset]) + return outputs + gradients + + # pylint: enable=cell-var-from-loop + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 6) + + def test_softmax_cross_entropy_with_logits(self): + logits = random_ops.random_uniform([3, 2, 4]) + labels = random_ops.random_uniform([3, 2, 4]) + labels /= math_ops.reduce_sum(labels, axis=[2], keepdims=True) + + def loop_fn(i): + logits_i = array_ops.gather(logits, i) + labels_i = array_ops.gather(labels, i) + loss = nn.softmax_cross_entropy_with_logits( + labels=labels_i, logits=logits_i) + return loss, gradient_ops.gradients(math_ops.reduce_sum(loss), logits_i) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2) + + +class RandomTest(PForTest): + + # The random values generated in the two implementations are not guaranteed to + # match. So we only check the returned shapes. 
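+  # To that end, run_and_assert_equal is overridden below to compare only the
+  # output shapes.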
+ def run_and_assert_equal(self, targets1, targets2): + outputs = self._run_targets(targets1, targets2) + n = len(outputs) // 2 + for i in range(n): + self.assertAllEqual(outputs[i].shape, outputs[i + n].shape) + + def test_random_uniform(self): + + def loop_fn(_): + return random_ops.random_uniform([3]) + + self._test_loop_fn(loop_fn, 5) + + def test_random_uniform_int(self): + + def loop_fn(_): + return random_ops.random_uniform([3], maxval=1, dtype=dtypes.int32) + + self._test_loop_fn(loop_fn, 5, loop_fn_dtypes=dtypes.int32) + + def test_random_standard_normal(self): + + def loop_fn(_): + return random_ops.random_normal([3]) + + self._test_loop_fn(loop_fn, 5) + + def test_truncated_normal(self): + + def loop_fn(_): + return random_ops.truncated_normal([3]) + + self._test_loop_fn(loop_fn, 5) + + def test_random_gamma(self): + + def loop_fn(_): + return random_ops.random_gamma([3], alpha=[0.5]) + + self._test_loop_fn(loop_fn, 5) + + def test_random_poisson_v2(self): + + def loop_fn(_): + return random_ops.random_poisson(lam=[1.3], shape=[3]) + + self._test_loop_fn(loop_fn, 5) + + +class LoggingTest(PForTest): + + def test_print(self): + x = random_ops.random_uniform([3, 5]) + + def loop_fn(i): + x1 = array_ops.gather(x, i) + return logging_ops.Print( + x1, [x1, "x1", array_ops.shape(x1)], summarize=10) + + self._test_loop_fn(loop_fn, 3) + + def test_assert(self): + + def loop_fn(i): + return control_flow_ops.Assert(i < 10, [i, [10], [i + 1]]) + + # TODO(agarwal): make this work with for_loop. + with session.Session() as sess: + sess.run(pfor_control_flow_ops.pfor(loop_fn, 3)) + + +class TensorArrayTest(PForTest): + + def test_create_outside_and_read(self): + + ta = tensor_array_ops.TensorArray( + dtypes.int32, 2, clear_after_read=False).write(0, 0).write(1, 1) + + def loop_fn(i): + return ta.read(i), ta.read(0) + + self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 2) + + def test_create_outside_and_gather(self): + + ta = tensor_array_ops.TensorArray( + dtypes.int32, 2, clear_after_read=False).write(0, 0).write(1, 1) + + def loop_fn(i): + return ta.gather([i]), ta.gather([0, 1]) + + self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 2) + + def test_create_outside_and_write_and_scatter(self): + + t = tensor_array_ops.TensorArray(dtypes.int32, 10, clear_after_read=False) + handle = t.handle + + def loop_fn(i): + ta = t.write(i + 2, 2 * i).write(i, 5) + ta = ta.scatter([4 + i], [4]).scatter([6 + i, 8 + i], [6 + i, 8 + i]) + return ta.flow + + t1 = pfor_control_flow_ops.pfor(loop_fn, iters=2) + out1 = tensor_array_ops.TensorArray( + dtypes.int32, handle=handle, flow=t1[-1]).stack() + output1 = self._run_targets(out1) + + t2 = pfor_control_flow_ops.for_loop(loop_fn, dtypes.float32, iters=2) + out2 = tensor_array_ops.TensorArray( + dtypes.int32, handle=handle, flow=t2[-1]).stack() + output2 = self._run_targets(out2) + self.assertAllClose(output2, output1) + + def test_create_inside_and_write(self): + + def loop_fn(i): + # TODO(agarwal): switching the order of writes to ta1 does not work. + ta1 = tensor_array_ops.TensorArray(dtypes.int32, 2).write(0, i).write( + 1, 1) + ta2 = tensor_array_ops.TensorArray(dtypes.int32, 1).write(0, 1) + return ta1.stack(), ta2.stack() + + self._test_loop_fn(loop_fn, 3, [dtypes.int32] * 2) + + def test_create_inside_and_scatter(self): + + def loop_fn(i): + # TODO(agarwal): switching the order of scatter to ta1 does not work. 
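+      # ta1 mixes a loop-dependent element (index 0 gets [i, 2]) with a
+      # loop-invariant one, while ta2 is fully loop invariant.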
+ ta1 = tensor_array_ops.TensorArray(dtypes.int32, 2).scatter( + [0], [[i, 2]]).scatter([1], [[1, 2]]) + ta2 = tensor_array_ops.TensorArray(dtypes.int32, + 2).scatter([0], [3]).scatter([1], [4]) + return ta1.stack(), ta2.stack() + + self._test_loop_fn(loop_fn, 3, [dtypes.int32] * 2) + + def test_create_inside_and_read(self): + + def loop_fn(i): + ta1 = tensor_array_ops.TensorArray( + dtypes.int32, 2, clear_after_read=False).write(0, i).write(1, 1) + ta2 = tensor_array_ops.TensorArray( + dtypes.int32, 2, clear_after_read=False).write(0, 1).write(1, 2) + # TODO(agarwal): ta1.read(i) currently is not supported. + return ta1.read(0), ta2.read(0), ta2.read(i) + + self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 3) + + def test_create_inside_and_gather(self): + + def loop_fn(i): + ta1 = tensor_array_ops.TensorArray( + dtypes.int32, 2, clear_after_read=False).write(0, i).write(1, 1) + ta2 = tensor_array_ops.TensorArray( + dtypes.int32, 2, clear_after_read=False).write(0, 1).write(1, 2) + # TODO(agarwal): ta1.read(i) currently is not supported. + return ta1.gather([0, 1]), ta2.gather([0, 1]), ta2.gather([i]) + + self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 3) + + def test_grad(self): + x = random_ops.random_uniform([3, 2]) + ta = tensor_array_ops.TensorArray( + dtypes.float32, 3, clear_after_read=False).unstack(x) + y = math_ops.square(ta.stack()) + + def loop_fn(i): + y_i = array_ops.gather(y, i) + grad = gradient_ops.gradients(y_i, x)[0] + return array_ops.gather(grad, i) + + t1 = pfor_control_flow_ops.pfor(loop_fn, iters=3) + # y = x * x. Hence dy/dx = 2 * x. + actual_grad = 2.0 * x + with session.Session() as sess: + actual_grad, computed_grad = sess.run([t1, actual_grad]) + self.assertAllClose(actual_grad, computed_grad) + + +class StackTest(PForTest): + + def test_stack_inside_loop_invariant(self): + + def loop_fn(_): + s = data_flow_ops.stack_v2(max_size=4, elem_type=dtypes.int32) + op1 = data_flow_ops.stack_push_v2(s, 1) + with ops.control_dependencies([op1]): + op2 = data_flow_ops.stack_push_v2(s, 2) + with ops.control_dependencies([op2]): + e2 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32) + with ops.control_dependencies([e2]): + e1 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32) + return e1, e2 + + self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 2) + + def test_stack_inside_push_loop_dependent(self): + + def loop_fn(i): + s = data_flow_ops.stack_v2(max_size=4, elem_type=dtypes.int32) + op1 = data_flow_ops.stack_push_v2(s, i) + with ops.control_dependencies([op1]): + op2 = data_flow_ops.stack_push_v2(s, 2) + with ops.control_dependencies([op2]): + e2 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32) + with ops.control_dependencies([e2]): + e1 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32) + return e1, e2 + + self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 2) + + def test_stack_outside_pop(self): + s = data_flow_ops.stack_v2(max_size=4, elem_type=dtypes.int32) + op = data_flow_ops.stack_push_v2(s, 5) + with ops.control_dependencies([op]): + op = data_flow_ops.stack_push_v2(s, 6) + with ops.control_dependencies([op]): + op = data_flow_ops.stack_push_v2(s, 7) + + def loop_fn(_): + e1 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32) + with ops.control_dependencies([e1]): + e2 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32) + return e1, e2 + + with ops.control_dependencies([op]): + e1, e2 = pfor_control_flow_ops.pfor(loop_fn, iters=2) + with ops.control_dependencies([e1, e2]): + e3 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32) + v1, 
v2, v3 = self._run_targets([e1, e2, e3], run_init=False) + self.assertAllEqual([7, 7], v1) + self.assertAllEqual([6, 6], v2) + self.assertAllEqual(5, v3) + + def test_stack_outside_push(self): + s = data_flow_ops.stack_v2(max_size=4, elem_type=dtypes.int32) + + def loop_fn(_): + return data_flow_ops.stack_push_v2(s, 7) + + with self.assertRaisesRegexp(ValueError, "StackPushV2 not allowed.*"): + pfor_control_flow_ops.pfor(loop_fn, iters=2) + + +# TODO(agarwal): test nested while_loops. This currently requires converting a +# tf.cond. +class ControlFlowTest(PForTest): + + def test_while_outside_loop(self): + + x = control_flow_ops.while_loop(lambda j: j < 4, lambda j: j + 1, [0]) + + def loop_fn(i): + return x + i + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32]) + + def test_invariant_while(self): + + def loop_fn(_): + return control_flow_ops.while_loop(lambda j: j < 4, lambda j: j + 1, [0]) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32]) + + def test_invariant_while_with_control_dependency(self): + + def loop_fn(i): + with ops.control_dependencies([i]): + return control_flow_ops.while_loop(lambda j: j < 4, lambda j: j + 1, + [0]) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32]) + + def test_while_with_stateful_ops(self): + + def loop_fn(_): + return control_flow_ops.while_loop( + lambda j, x: j < 4, + lambda j, x: (j + 1, x + random_ops.random_uniform([])), [0, 0.])[0] + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32]) + + def test_while_unstacked_condition(self): + + def loop_fn(i): + return control_flow_ops.while_loop(lambda j, x: j < 4, + lambda j, x: (j + 1, x + i), [0, 0]) + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32, dtypes.int32]) + + def test_while(self): + x = random_ops.random_uniform([3, 5]) + lengths = constant_op.constant([4, 0, 2]) + + def loop_fn(i): + x_i = array_ops.gather(x, i) + lengths_i = array_ops.gather(lengths, i) + + _, total = control_flow_ops.while_loop( + lambda j, _: j < lengths_i, + lambda j, t: (j + 1, t + array_ops.gather(x_i, j)), [0, 0.]) + return total + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32]) + + def test_while_jacobian(self): + x = random_ops.random_uniform([1, 3]) + y = random_ops.random_uniform([3, 3]) + + # out = x @ y @ y @ y @ y, where @ is matmul operator. + _, out = control_flow_ops.while_loop( + lambda i, _: i < 4, lambda i, out: (i + 1, math_ops.matmul(out, y)), + [0, x]) + + def loop_fn(i): + out_i = array_ops.gather(out, i, axis=1) + return array_ops.reshape(gradient_ops.gradients(out_i, x)[0], [-1]) + + out = pfor_control_flow_ops.pfor(loop_fn, iters=3) + + # The above code does not work with tf.while_loop instead of pfor. So we + # manually compute the expected output here. + # Note that gradient of output w.r.t is (y @ y @ y @ y)^T. 
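+    # Since out = x @ y^4 with x of shape [1, 3], d(out[0, j]) / d(x[0, k]) is
+    # (y^4)[k, j]; stacking the per-column gradients over j therefore gives
+    # (y^4)^T, which the loop below constructs explicitly.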
+ expected_output = y + for _ in range(3): + expected_output = math_ops.matmul(expected_output, y) + expected_output = array_ops.transpose(expected_output, [1, 0]) + + with session.Session() as sess: + out, expected = sess.run([out, expected_output]) + self.assertAllClose(expected, out) + + def test_tensor_array_as_loop_variable(self): + + def loop_fn(i): + + def body(j, ta): + ta = ta.write(j, i + j * j) + return j + 1, ta + + _, ta = control_flow_ops.while_loop( + lambda j, _: j < 4, body, + (0, tensor_array_ops.TensorArray(dtypes.int32, size=4))) + return ta.stack() + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32]) + + def test_read_tensor_array_partitioned_indices(self): + # Note that tensor array values are pfor loop dependent, and the while loop + # termination condition is also dependent on pfor iteration. + def loop_fn(i): + ta = tensor_array_ops.TensorArray(dtypes.int32, size=6) + ta = ta.unstack(i + list(range(5))) + + def body(j, s): + return j + 1, s + ta.read(j) + + _, s = control_flow_ops.while_loop(lambda j, _: j < i, + body, + (0, 0)) + return s + + self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32]) + + def test_external_while_loop_grad(self): + # Here we test that external while_loops that are extended from inside pfor + # (due to gradient calls) are not actually converted. If the below was + # converted all pfor iterations would write to the same tensor array + # indices. + x = constant_op.constant(1.) + + def body(j, ta): + ta = ta.write(j, x) + return j + 1, ta + + _, ta = control_flow_ops.while_loop( + lambda j, _: j < 4, body, + (0, tensor_array_ops.TensorArray(dtypes.float32, size=4))) + out = ta.stack() + + def loop_fn(i): + out_i = array_ops.gather(out, i) + return gradient_ops.gradients(out_i, x)[0] + + with session.Session() as sess: + # out is [x, x, x]. Hence the gradients should be [1, 1, 1]. + self.assertAllEqual([1, 1, 1], + sess.run(pfor_control_flow_ops.pfor(loop_fn, 3))) + + def test_tensor_array_grad(self): + inp = constant_op.constant(np.random.rand(3, 4, 2), dtype=dtypes.float32) + ta = tensor_array_ops.TensorArray(dtypes.float32, size=3) + ta = ta.unstack(inp) + + def loop_fn(i): + + def body(j, x): + value = ta.gather([j]) + value = array_ops.gather(array_ops.reshape(value, [4, 2]), i) + return j + 1, x + value + + _, out = control_flow_ops.while_loop(lambda j, _: j < 3, body, + (0, array_ops.zeros([2]))) + out = math_ops.reduce_prod(out) + return out, gradient_ops.gradients(out, inp)[0] + + pfor_out, pfor_out_grad = pfor_control_flow_ops.pfor(loop_fn, 4) + # Note that tf.while_loop does not work in the setup above. So we manually + # construct the equivalent computation of the above loops here. + real_out = math_ops.reduce_sum(inp, reduction_indices=[0]) + real_out = math_ops.reduce_prod(real_out, reduction_indices=[1]) + # Note that gradients of real_out will accumulate the gradients across the + # output value. Hence we do the same aggregation on pfor_out_grad. + real_out_grad = gradient_ops.gradients(real_out, inp)[0] + sum_pfor_out_grad = math_ops.reduce_sum( + pfor_out_grad, reduction_indices=[0]) + + with session.Session() as sess: + v1, v2, v1_grad, v2_grad = sess.run( + [pfor_out, real_out, sum_pfor_out_grad, real_out_grad]) + self.assertAllClose(v1, v2) + self.assertAllClose(v1_grad, v2_grad) + + +def dynamic_lstm_input_fn(batch_size, state_size, max_steps): + # We make inputs and sequence_length constant so that multiple session.run + # calls produce the same result. 
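+  # sequence_length is drawn with numpy rather than a TF random op and wrapped
+  # in a constant, so it stays fixed across session.run calls.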
+ inputs = constant_op.constant( + np.random.rand(batch_size, max_steps, state_size), dtype=dtypes.float32) + sequence_length = np.random.randint(0, size=[batch_size], high=max_steps + 1) + sequence_length = constant_op.constant(sequence_length, dtype=dtypes.int32) + return inputs, sequence_length + + +def create_dynamic_lstm(cell_fn, batch_size, state_size, max_steps): + cell = cell_fn(state_size) + inputs, sequence_length = dynamic_lstm_input_fn(batch_size, + state_size, + max_steps) + inputs_ta = tensor_array_ops.TensorArray( + dtypes.float32, size=max_steps, element_shape=[batch_size, state_size]) + inputs_time_major = array_ops.transpose(inputs, [1, 0, 2]) + inputs_ta = inputs_ta.unstack(inputs_time_major) + zeros = array_ops.zeros([state_size]) + + def loop_fn(i): + sequence_length_i = array_ops.gather(sequence_length, i) + + def body_fn(t, state, ta): + inputs_t = array_ops.expand_dims( + array_ops.gather(inputs_ta.read(t), i), 0) + output, new_state = cell(inputs_t, state) + output = array_ops.reshape(output, [-1]) + # TODO(agarwal): one optimization that dynamic_rnn uses is to avoid the + # array_ops.where when t < min(sequence_length). Doing that requires + # supporting tf.cond pfor conversion. + done = t >= sequence_length_i + output = array_ops.where(done, zeros, output) + ta = ta.write(t, output) + new_state = [array_ops.where(done, s, ns) for s, ns in + zip(nest.flatten(state), nest.flatten(new_state))] + new_state = nest.pack_sequence_as(state, new_state) + return t + 1, new_state, ta + + def condition_fn(t, _, unused): + del unused + return t < max_steps + + initial_state = cell.zero_state(1, dtypes.float32) + _, state, ta = control_flow_ops.while_loop(condition_fn, body_fn, [ + 0, initial_state, + tensor_array_ops.TensorArray(dtypes.float32, max_steps) + ]) + + new_state = [array_ops.reshape(x, [-1]) for x in nest.flatten(state)] + new_state = nest.pack_sequence_as(initial_state, new_state) + return ta.stack(), new_state + + pfor_output = pfor_control_flow_ops.pfor(loop_fn, batch_size) + tf_output = rnn.dynamic_rnn( + cell, + inputs, + sequence_length=sequence_length, + initial_state=cell.zero_state(batch_size, dtypes.float32)) + return pfor_output, tf_output + + +class RNNTest(PForTest): + + def test_dynamic_rnn(self): + pfor_outputs, tf_outputs = create_dynamic_lstm(rnn_cell.BasicRNNCell, + 3, 5, 7) + self.run_and_assert_equal(pfor_outputs, tf_outputs) + + def test_dynamic_lstm(self): + pfor_outputs, tf_outputs = create_dynamic_lstm(rnn_cell.BasicLSTMCell, + 3, 5, 7) + self.run_and_assert_equal(pfor_outputs, tf_outputs) + + +# TODO(agarwal): benchmark numbers on GPU for graphs based on while_loop +# conversion don't look good. Some of it seems like lot of copies between host +# and device. Optimize that. +class Benchmarks(test.Benchmark): + + def _run(self, targets, iters, name=None): + + def _done(t): + # Note that we don't use tf.control_dependencies since that will not make + # sure that the computation on GPU has actually finished. So we fetch the + # first element of the output, and assume that this will not be called on + # empty tensors. 
+ return array_ops.gather(array_ops.reshape(t, [-1]), 0) + + targets = [_done(x) for x in nest.flatten(targets)] + sess = session.Session() + with sess: + init = variables.global_variables_initializer() + sess.run(init) + sess.run(targets) + begin = time.time() + for _ in range(iters): + sess.run(targets) + end = time.time() + avg_time_ms = 1000 * (end - begin) / iters + self.report_benchmark(iters=iters, wall_time=avg_time_ms, name=name) + return avg_time_ms + + def benchmark_basic_while(self): + with ops.Graph().as_default(): + + def loop_fn(i): + _, s = control_flow_ops.while_loop( + lambda t, x: t < i, + lambda t, x: (t + 1, x + i), + [0, 0]) + return s + + iters = 50 + pfor_output = pfor_control_flow_ops.pfor(loop_fn, iters) + for_loop_output = pfor_control_flow_ops.for_loop(loop_fn, dtypes.int32, + iters) + self._run(pfor_output, 100, name="pfor_basic") + self._run(for_loop_output, 100, name="for_loop_basic") + + def benchmark_dynamic_rnn(self): + with ops.Graph().as_default(): + pfor_outputs, tf_outputs = create_dynamic_lstm(rnn_cell.BasicRNNCell, + 128, 512, 16) + self._run(pfor_outputs, 100, name="pfor_rnn") + self._run(tf_outputs, 100, name="tf_rnn") + + def benchmark_dynamic_lstm(self): + with ops.Graph().as_default(): + pfor_outputs, tf_outputs = create_dynamic_lstm(rnn_cell.BasicLSTMCell, + 128, 512, 16) + self._run(pfor_outputs, 100, name="pfor_lstm") + self._run(tf_outputs, 100, name="tf_lstm") + + +class SparseTest(PForTest): + + def test_var_loop_len(self): + num_iters = array_ops.placeholder(dtypes.int32) + + def loop_fn(_): + return sparse_tensor.SparseTensor([[0], [1], [2]], [4, 5, 6], + [3]) # [0, 2, 0] + + pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters) + with self.test_session() as sess: + sess.run(pfor, feed_dict={num_iters: 3}) + + def test_sparse_result_none_stacked(self): + num_iters = 10 + + def loop_fn(_): + return sparse_tensor.SparseTensor([[0], [1], [2]], [4, 5, 6], + [3]) # [0, 2, 0] + + pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters) + + indices = [[i, j] for i in range(num_iters) for j in range(3)] + values = [4, 5, 6] * num_iters + dense_shapes = [num_iters, 3] + # Expected result: [[4, 5, 6], [4, 5, 6], [4, 5, 6], ...] + manual = sparse_tensor.SparseTensor(indices, values, dense_shapes) + self.run_and_assert_equal(pfor, manual) + + def test_sparse_result_all_stacked(self): + num_iters = 10 + + def loop_fn(i): + i = array_ops.expand_dims(math_ops.cast(i, dtypes.int64), 0) + indices = array_ops.expand_dims(i, 0) + return sparse_tensor.SparseTensor(indices, i, i + 1) # [0, ..., 0, i] + + # Expected result: [[0], [0, 1], [0, 0, 2], [0, 0, 0, 3], ...] 
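+    # Iteration i emits a length-(i + 1) sparse vector with value i at
+    # position i, so the stacked result is padded out to dense_shape
+    # [num_iters, num_iters], as the `manual` tensor below reflects.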
+ pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters) + manual = sparse_tensor.SparseTensor([[i, i] for i in range(num_iters)], + list(range(num_iters)), + (num_iters, num_iters)) + self.run_and_assert_equal(pfor, manual) + + def test_sparse_result_indices_stacked(self): + num_iters = 10 + + def loop_fn(i): + i = array_ops.expand_dims(math_ops.cast(i, dtypes.int64), 0) + indices = array_ops.expand_dims(i, 0) + return sparse_tensor.SparseTensor(indices, [1], [num_iters]) + + # Expected result: identity matrix size num_iters * num_iters + pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters) + manual = sparse_tensor.SparseTensor([[i, i] for i in range(num_iters)], + [1] * num_iters, (num_iters, num_iters)) + self.run_and_assert_equal(pfor, manual) + + def test_sparse_result_values_stacked(self): + num_iters = 10 + + def loop_fn(i): + i = array_ops.expand_dims(math_ops.cast(i, dtypes.int64), 0) + return sparse_tensor.SparseTensor([[0]], i, [num_iters]) # [i, 0, ..., 0] + + # Expected result: [[1, 0, ...], [2, 0, ...], [3, 0, ...], ...] + pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters) + manual = sparse_tensor.SparseTensor([[i, 0] for i in range(num_iters)], + list(range(num_iters)), + (num_iters, num_iters)) + self.run_and_assert_equal(pfor, manual) + + def test_sparse_result_shapes_stacked(self): + num_iters = 10 + + def loop_fn(i): + i = array_ops.expand_dims(math_ops.cast(i, dtypes.int64), 0) + return sparse_tensor.SparseTensor([[0]], [1], i + 1) # [1, 0, ..., 0] + + # Expected result: [[1, 0, 0, ...], [1, 0, 0, ...], ...] + pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters) + manual = sparse_tensor.SparseTensor([[i, 0] for i in range(num_iters)], + [1] * num_iters, (num_iters, num_iters)) + self.run_and_assert_equal(pfor, manual) + + def test_sparse_result_shapes_stacked_2D(self): + num_iters = 10 + + def loop_fn(i): + i = array_ops.expand_dims(math_ops.cast(i + 1, dtypes.int64), 0) + shape = array_ops.concat([i, i], 0) + return sparse_tensor.SparseTensor([[0, 0]], [1], shape) # [1, 0, ..., 0] + + # Expected result: [[[1, 0, ...], [0, ..., 0], [0, ..., 0], ...], ...] + pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters) + manual = sparse_tensor.SparseTensor([[i, 0, 0] for i in range(num_iters)], + [1] * num_iters, + (num_iters, num_iters, num_iters)) + self.run_and_assert_equal(pfor, manual) + + +if __name__ == "__main__": + test.main() diff --git a/tensorflow/python/ops/parallel_for/gradients.py b/tensorflow/python/ops/parallel_for/gradients.py new file mode 100644 index 0000000000..ee3d5c9b86 --- /dev/null +++ b/tensorflow/python/ops/parallel_for/gradients.py @@ -0,0 +1,126 @@ +# Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Jacobian ops.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tensorflow.python.framework import ops +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import check_ops +from tensorflow.python.ops import gradients as gradient_ops +from tensorflow.python.ops.parallel_for import control_flow_ops +from tensorflow.python.util import nest + + +def jacobian(output, inputs, use_pfor=True): + """Computes jacobian of `output` w.r.t. `inputs`. + + Args: + output: A tensor. + inputs: A tensor or a nested structure of tensor objects. + use_pfor: If true, uses pfor for computing the jacobian. Else uses + tf.while_loop. + + Returns: + A tensor or a nested strucutre of tensors with the same structure as + `inputs`. Each entry is the jacobian of `output` w.rt. to the corresponding + value in `inputs`. If output has shape [y_1, ..., y_n] and inputs_i has + shape [x_1, ..., x_m], the corresponding jacobian has shape + [y_1, ..., y_n, x_1, ..., x_m]. + """ + flat_inputs = nest.flatten(inputs) + output_shape = array_ops.shape(output) + output = array_ops.reshape(output, [-1]) + + def loop_fn(i): + y = array_ops.gather(output, i) + return gradient_ops.gradients(y, flat_inputs) + + try: + output_size = int(output.shape[0]) + except TypeError: + output_size = array_ops.shape(output)[0] + + if use_pfor: + pfor_outputs = control_flow_ops.pfor(loop_fn, output_size) + else: + pfor_outputs = control_flow_ops.for_loop( + loop_fn, [output.dtype] * len(flat_inputs), output_size) + + for i, out in enumerate(pfor_outputs): + new_shape = array_ops.concat( + [output_shape, array_ops.shape(out)[1:]], axis=0) + out = array_ops.reshape(out, new_shape) + pfor_outputs[i] = out + + return nest.pack_sequence_as(inputs, pfor_outputs) + + +def batch_jacobian(output, inp, use_pfor=True): + """Computes and stacks jacobians of `output[i,...]` w.r.t. `input[i,...]`. + + e.g. + x = tf.constant([[1, 2], [3, 4]], dtype=tf.float32) + y = x * x + jacobian = batch_jacobian(y, x) + # => [[[2, 0], [0, 4]], [[6, 0], [0, 8]]] + + Args: + output: A tensor with shape [b, y1, ..., y_n]. `output[i,...]` should + only depend on `inp[i,...]`. + inp: A tensor with shape [b, x1, ..., x_m] + use_pfor: If true, uses pfor for computing the Jacobian. Else uses a + tf.while_loop. + + Returns: + A tensor `t` with shape [b, y_1, ..., y_n, x1, ..., x_m] where `t[i, ...]` + is the jacobian of `output[i, ...]` w.r.t. `inp[i, ...]`, i.e. stacked + per-example jacobians. + + Raises: + ValueError: if first dimension of `output` and `inp` do not match. + """ + output_shape = output.shape + if not output_shape[0].is_compatible_with(inp.shape[0]): + raise ValueError("Need first dimension of output shape (%s) and inp shape " + "(%s) to match." % (output.shape, inp.shape)) + if output_shape.is_fully_defined(): + batch_size = int(output_shape[0]) + output_row_size = output_shape.num_elements() // batch_size + else: + output_shape = array_ops.shape(output) + batch_size = output_shape[0] + output_row_size = array_ops.size(output) // batch_size + inp_shape = array_ops.shape(inp) + # Flatten output to 2-D. 
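+  # When shapes are only known at run time, the assert_equal below is what
+  # surfaces a batch-size mismatch between `output` and `inp` (the static
+  # check above handles fully defined shapes).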
+ with ops.control_dependencies( + [check_ops.assert_equal(batch_size, inp_shape[0])]): + output = array_ops.reshape(output, [batch_size, output_row_size]) + + def loop_fn(i): + y = array_ops.gather(output, i, axis=1) + return gradient_ops.gradients(y, inp)[0] + + if use_pfor: + pfor_output = control_flow_ops.pfor(loop_fn, output_row_size) + else: + pfor_output = control_flow_ops.for_loop(loop_fn, output.dtype, + output_row_size) + pfor_output = array_ops.reshape(pfor_output, + [output_row_size, batch_size, -1]) + output = array_ops.transpose(pfor_output, [1, 0, 2]) + new_shape = array_ops.concat([output_shape, inp_shape[1:]], axis=0) + return array_ops.reshape(output, new_shape) diff --git a/tensorflow/python/ops/parallel_for/gradients_test.py b/tensorflow/python/ops/parallel_for/gradients_test.py new file mode 100644 index 0000000000..310a2154f7 --- /dev/null +++ b/tensorflow/python/ops/parallel_for/gradients_test.py @@ -0,0 +1,568 @@ +# Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for jacobian and batch_jacobian ops.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import functools +import time + +import numpy as np + +from tensorflow.python.client import session +from tensorflow.python.framework import constant_op +from tensorflow.python.framework import dtypes +from tensorflow.python.framework import errors +from tensorflow.python.framework import ops +from tensorflow.python.keras.engine import training as keras_training +from tensorflow.python.layers import layers as tf_layers +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import gradients as gradient_ops +from tensorflow.python.ops import math_ops +from tensorflow.python.ops import nn +from tensorflow.python.ops import random_ops +from tensorflow.python.ops import rnn +from tensorflow.python.ops import rnn_cell +from tensorflow.python.ops import variables +from tensorflow.python.ops.losses import losses +from tensorflow.python.ops.parallel_for import control_flow_ops +from tensorflow.python.ops.parallel_for import gradients +from tensorflow.python.platform import test +from tensorflow.python.util import nest + + +class FullyConnectedModel(object): + + def __init__(self, activation_size, num_layers): + self._layers = [ + tf_layers.Dense(activation_size, activation=nn.relu) + for _ in range(num_layers) + ] + + def __call__(self, inp): + activation = inp + for layer in self._layers: + activation = layer(activation) + return activation + + +def fully_connected_model_fn(batch_size, activation_size, num_layers): + model = FullyConnectedModel(activation_size, num_layers) + inp = random_ops.random_normal([batch_size, activation_size]) + return inp, model(inp) + + +def lstm_model_fn(batch_size, state_size, steps): + inputs = [ + random_ops.random_normal([batch_size, state_size]) for _ in 
range(steps) + ] + cell = rnn_cell.BasicLSTMCell(state_size) + init_state = cell.zero_state(batch_size, dtypes.float32) + state = init_state + for inp in inputs: + _, state = cell(inp, state) + return init_state.c, state.c + + +def dynamic_lstm_model_fn(batch_size, state_size, max_steps): + # We make inputs and sequence_length constant so that multiple session.run + # calls produce the same result. + inputs = constant_op.constant( + np.random.rand(batch_size, max_steps, state_size), dtype=dtypes.float32) + sequence_length = constant_op.constant( + np.random.randint(0, size=[batch_size], high=max_steps + 1), + dtype=dtypes.int32) + + cell = rnn_cell.BasicLSTMCell(state_size) + initial_state = cell.zero_state(batch_size, dtypes.float32) + return inputs, rnn.dynamic_rnn( + cell, + inputs, + sequence_length=sequence_length, + initial_state=initial_state) + + +def create_fc_batch_jacobian(batch_size, activation_size, num_layers): + inp, output = fully_connected_model_fn(batch_size, activation_size, + num_layers) + pfor_jacobian = gradients.batch_jacobian(output, inp, use_pfor=True) + while_jacobian = gradients.batch_jacobian(output, inp, use_pfor=False) + return pfor_jacobian, while_jacobian + + +def create_lstm_batch_jacobian(batch_size, state_size, steps): + inp, output = lstm_model_fn(batch_size, state_size, steps) + pfor_jacobian = gradients.batch_jacobian(output, inp, use_pfor=True) + while_jacobian = gradients.batch_jacobian(output, inp, use_pfor=False) + return pfor_jacobian, while_jacobian + + +def create_dynamic_lstm_batch_jacobian(batch_size, state_size, max_steps): + inp, (_, final_state) = dynamic_lstm_model_fn(batch_size, state_size, + max_steps) + pfor_jacobian = gradients.batch_jacobian(final_state.c, inp, use_pfor=True) + # Note that use_pfor=False does not work above given the current limitations + # on implementation of while_loop. So we statically unroll the looping in the + # jacobian computation. + while_gradients = [ + gradient_ops.gradients(array_ops.gather(final_state.c, i, axis=1), inp)[0] + for i in range(state_size) + ] + return pfor_jacobian, while_gradients + + +def create_lstm_batch_hessian(batch_size, state_size, steps): + inp, output = lstm_model_fn(batch_size, state_size, steps) + pfor_jacobian = gradients.batch_jacobian(output, inp, use_pfor=True) + pfor_jacobian = array_ops.reshape(pfor_jacobian, [batch_size, -1]) + pfor_hessian = gradients.batch_jacobian(pfor_jacobian, inp, use_pfor=True) + # TODO(agarwal): using two nested while_loop doesn't seem to work here. + # Hence we use pfor_jacobian for computing while_hessian. + while_jacobian = pfor_jacobian + while_hessian = gradients.batch_jacobian(while_jacobian, inp, use_pfor=False) + return pfor_hessian, while_hessian + + +def create_lstm_hessian(batch_size, state_size, steps): + _, output = lstm_model_fn(batch_size, state_size, steps) + weights = variables.trainable_variables() + pfor_jacobians = gradients.jacobian(output, weights, use_pfor=True) + pfor_hessians = [ + gradients.jacobian(x, weights, use_pfor=True) for x in pfor_jacobians + ] + # TODO(agarwal): using two nested while_loop doesn't seem to work here. + # Hence we use pfor_jacobians for computing while_hessians. 
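+  # Only the outer jacobian call differs between pfor_hessians above and
+  # while_hessians below; the first-order jacobians are shared.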
+ while_jacobians = pfor_jacobians + while_hessians = [ + gradients.jacobian(x, weights, use_pfor=False) for x in while_jacobians + ] + return pfor_hessians, while_hessians + + +def create_fc_per_eg_grad(batch_size, activation_size, num_layers): + inp = random_ops.random_normal([batch_size, activation_size]) + layers = [ + tf_layers.Dense(activation_size, activation=nn.relu) + for _ in range(num_layers) + ] + projection = tf_layers.Dense(1) + + def model_fn(activation): + for layer in layers: + activation = layer(activation) + activation = projection(activation) + activation = nn.l2_loss(activation) + return gradient_ops.gradients(activation, variables.trainable_variables()) + + def loop_fn(i): + return model_fn(array_ops.expand_dims(array_ops.gather(inp, i), 0)) + + pfor_outputs = control_flow_ops.pfor(loop_fn, batch_size) + loop_fn_dtypes = [x.dtype for x in variables.trainable_variables()] + while_outputs = control_flow_ops.for_loop(loop_fn, loop_fn_dtypes, batch_size) + return pfor_outputs, while_outputs + + +def create_lstm_per_eg_grad(batch_size, state_size, steps): + inputs = [ + random_ops.random_normal([batch_size, state_size]) for _ in range(steps) + ] + cell = rnn_cell.BasicLSTMCell(state_size) + init_state = cell.zero_state(batch_size, dtypes.float32) + + def model_fn(inps, init_state): + state = init_state + for inp in inps: + _, state = cell(inp, state) + output = nn.l2_loss(state.c) + return gradient_ops.gradients(output, variables.trainable_variables()) + + def loop_fn(i): + loop_inputs = [ + array_ops.expand_dims(array_ops.gather(x, i), 0) for x in inputs + ] + loop_init_state = rnn_cell.LSTMStateTuple( + *[array_ops.expand_dims(array_ops.gather(x, i), 0) for x in init_state]) + return model_fn(loop_inputs, loop_init_state) + + pfor_outputs = control_flow_ops.pfor(loop_fn, batch_size) + loop_fn_dtypes = [x.dtype for x in variables.trainable_variables()] + while_outputs = control_flow_ops.for_loop(loop_fn, loop_fn_dtypes, batch_size) + return pfor_outputs, while_outputs + + +# Importing the code from tensorflow_models seems to cause errors. Hence we +# duplicate the model definition here. +# TODO(agarwal): Use the version in tensorflow_models/official instead. +class Mnist(keras_training.Model): + + def __init__(self, data_format): + """Creates a model for classifying a hand-written digit. + + Args: + data_format: Either 'channels_first' or 'channels_last'. + """ + super(Mnist, self).__init__() + if data_format == "channels_first": + self._input_shape = [-1, 1, 28, 28] + else: + assert data_format == "channels_last" + self._input_shape = [-1, 28, 28, 1] + + self.conv1 = tf_layers.Conv2D( + 32, 5, padding="same", data_format=data_format, activation=nn.relu) + self.conv2 = tf_layers.Conv2D( + 64, 5, padding="same", data_format=data_format, activation=nn.relu) + self.fc1 = tf_layers.Dense(1024, activation=nn.relu) + self.fc2 = tf_layers.Dense(10) + self.dropout = tf_layers.Dropout(0.4) + self.max_pool2d = tf_layers.MaxPooling2D( + (2, 2), (2, 2), padding="same", data_format=data_format) + + def __call__(self, inputs, training): + """Add operations to classify a batch of input images. + + Args: + inputs: A Tensor representing a batch of input images. + training: A boolean. Set to True to add operations required only when + training the classifier. + + Returns: + A logits Tensor with shape [<batch_size>, 10]. 
+ """ + y = array_ops.reshape(inputs, self._input_shape) + y = self.conv1(y) + y = self.max_pool2d(y) + y = self.conv2(y) + y = self.max_pool2d(y) + y = tf_layers.flatten(y) + y = self.fc1(y) + y = self.dropout(y, training=training) + return self.fc2(y) + + +def create_mnist_per_eg_grad(batch_size, data_format, training): + images = random_ops.random_uniform([batch_size, 28, 28]) + sparse_labels = np.random.randint( + low=0, high=10, size=[batch_size]).astype(np.int32) + labels = np.zeros((batch_size, 10)).astype(np.float32) + labels[np.arange(batch_size), sparse_labels] = 1. + model = Mnist(data_format) + + def loop_fn(i): + image = array_ops.gather(images, i) + label = array_ops.gather(labels, i) + logits = array_ops.reshape(model(image, training=training), [-1]) + loss = losses.softmax_cross_entropy( + logits=logits, onehot_labels=label, reduction=losses.Reduction.NONE) + return gradient_ops.gradients(loss, variables.trainable_variables()) + + pfor_outputs = control_flow_ops.pfor(loop_fn, batch_size) + while_outputs = control_flow_ops.for_loop( + loop_fn, [dtypes.float32] * len(variables.trainable_variables()), + batch_size) + return pfor_outputs, while_outputs + + +def create_mnist_per_eg_jacobian(batch_size, data_format, training): + images = random_ops.random_uniform([batch_size, 28, 28]) + model = Mnist(data_format) + + def loop_fn(i, use_pfor): + image = array_ops.gather(images, i) + logits = array_ops.reshape(model(image, training=training), [-1]) + return gradients.jacobian( + logits, variables.trainable_variables(), use_pfor=use_pfor) + + pfor_outputs = control_flow_ops.pfor( + functools.partial(loop_fn, use_pfor=True), + batch_size) + while_outputs = control_flow_ops.for_loop( + functools.partial(loop_fn, use_pfor=False), + [dtypes.float32] * len(variables.trainable_variables()), batch_size) + return pfor_outputs, while_outputs + + +def create_fc_per_eg_jacobians(batch_size, activation_size, num_layers): + model = FullyConnectedModel(activation_size=activation_size, + num_layers=num_layers) + inp = random_ops.random_normal([batch_size, activation_size]) + output = model(inp) + jacobians = gradients.jacobian(output, variables.trainable_variables()) + + def loop_fn(i, use_pfor): + inp_i = array_ops.expand_dims(array_ops.gather(inp, i), 0) + output = array_ops.reshape(model(inp_i), [-1]) + return gradients.jacobian( + output, variables.trainable_variables(), use_pfor=use_pfor) + + per_eg_jacobians_pfor = control_flow_ops.pfor( + functools.partial(loop_fn, use_pfor=True), + batch_size) + per_eg_jacobians_while = control_flow_ops.for_loop( + functools.partial(loop_fn, use_pfor=False), + [dtypes.float32] * len(variables.trainable_variables()), batch_size) + return jacobians, per_eg_jacobians_pfor, per_eg_jacobians_while + + +class GradientsTest(test.TestCase): + + def run_and_assert_equal(self, targets1, targets2, atol=1e-4, rtol=1e-4): + targets1 = nest.flatten(targets1) + targets2 = nest.flatten(targets2) + assert len(targets1) == len(targets2) + init = variables.global_variables_initializer() + self.evaluate(init) + outputs = self.evaluate(targets1 + targets2) + n = len(outputs) // 2 + for i in range(n): + self.assertAllClose(outputs[i], outputs[i + n], rtol=rtol, atol=atol) + + def test_jacobian_fixed_shape(self): + x = random_ops.random_uniform([2, 2]) + y = math_ops.matmul(x, x, transpose_a=True) + jacobian_pfor = gradients.jacobian(y, x, use_pfor=True) + jacobian_while = gradients.jacobian(y, x, use_pfor=False) + answer = ops.convert_to_tensor([[ + 
gradient_ops.gradients(y[0][0], x)[0], + gradient_ops.gradients(y[0][1], x)[0] + ], [ + gradient_ops.gradients(y[1][0], x)[0], + gradient_ops.gradients(y[1][1], x)[0] + ]]) + self.run_and_assert_equal(answer, jacobian_pfor) + self.run_and_assert_equal(answer, jacobian_while) + + def test_jacobian_unknown_shape(self): + with self.test_session() as sess: + x = array_ops.placeholder(dtypes.float32, shape=[None, None]) + y = math_ops.matmul(x, x, transpose_a=True) + jacobian_pfor = gradients.jacobian(y, x, use_pfor=True) + jacobian_while = gradients.jacobian(y, x, use_pfor=False) + answer = ops.convert_to_tensor([[ + gradient_ops.gradients(y[0][0], x)[0], + gradient_ops.gradients(y[0][1], x)[0] + ], [ + gradient_ops.gradients(y[1][0], x)[0], + gradient_ops.gradients(y[1][1], x)[0] + ]]) + ans, pfor_value, while_value = sess.run( + [answer, jacobian_pfor, jacobian_while], + feed_dict={x: [[1, 2], [3, 4]]}) + self.assertAllClose(ans, pfor_value) + self.assertAllClose(ans, while_value) + + def test_batch_jacobian_bad_shapes(self): + x = random_ops.random_uniform([2, 2]) + y = random_ops.random_uniform([3, 2]) + with self.assertRaisesRegexp(ValueError, "Need first dimension of output"): + gradients.batch_jacobian(y, x, use_pfor=True) + + def test_batch_jacobian_bad_unknown_shapes(self): + with self.test_session() as sess: + x = array_ops.placeholder(dtypes.float32) + y = array_ops.concat([x, x], axis=0) + jacobian = gradients.batch_jacobian(y, x) + with self.assertRaisesRegexp(errors.InvalidArgumentError, + "assertion failed"): + sess.run(jacobian, feed_dict={x: [[1, 2], [3, 4]]}) + + def test_batch_jacobian_fixed_shape(self): + x = random_ops.random_uniform([2, 3, 5]) + y = x * x + batch_jacobian_pfor = gradients.batch_jacobian(y, x, use_pfor=True) + batch_jacobian_while = gradients.batch_jacobian(y, x, use_pfor=False) + two_x = 2 * x + answer = array_ops.stack( + [array_ops.diag(two_x[0]), + array_ops.diag(two_x[1])]) + self.run_and_assert_equal(answer, batch_jacobian_pfor) + self.run_and_assert_equal(answer, batch_jacobian_while) + + def test_batch_jacobian_unknown_shape(self): + with self.test_session() as sess: + x = array_ops.placeholder(dtypes.float32) + y = x * x + batch_jacobian_pfor = gradients.batch_jacobian(y, x, use_pfor=True) + batch_jacobian_while = gradients.batch_jacobian(y, x, use_pfor=False) + two_x = 2 * x + answer = array_ops.stack( + [array_ops.diag(two_x[0]), + array_ops.diag(two_x[1])]) + ans, pfor_value, while_value = sess.run( + [answer, batch_jacobian_pfor, batch_jacobian_while], + feed_dict={x: [[1, 2], [3, 4]]}) + self.assertAllClose(ans, pfor_value) + self.assertAllClose(ans, while_value) + + def test_fc_batch_jacobian(self): + pfor_jacobian, while_jacobian = create_fc_batch_jacobian(8, 4, 2) + self.run_and_assert_equal(pfor_jacobian, while_jacobian) + + def test_lstm_batch_jacobian(self): + pfor_jacobian, while_jacobian = create_lstm_batch_jacobian(8, 4, 2) + self.run_and_assert_equal(pfor_jacobian, while_jacobian) + + def test_dynamic_lstm_batch_jacobian(self): + pfor_jacobian, while_gradients = create_dynamic_lstm_batch_jacobian(8, 4, 3) + with session.Session() as sess: + init = variables.global_variables_initializer() + sess.run(init) + pfor = sess.run(pfor_jacobian) + for i in range(4): + while_i = sess.run(while_gradients[i]) + self.assertAllClose(while_i, pfor[:, i, ...]) + + def test_lstm_hessian(self): + pfor_hessian, while_hessian = create_lstm_hessian(2, 2, 2) + self.run_and_assert_equal(pfor_hessian, while_hessian) + + def test_lstm_batch_hessian(self): + 
pfor_hessian, while_hessian = create_lstm_batch_hessian(2, 2, 2)
+ self.run_and_assert_equal(pfor_hessian, while_hessian)
+
+ def test_fc_per_eg_grad(self):
+ pfor_outputs, while_outputs = create_fc_per_eg_grad(8, 4, 2)
+ self.run_and_assert_equal(pfor_outputs, while_outputs)
+
+ def test_lstm_per_eg_grad(self):
+ pfor_outputs, while_outputs = create_lstm_per_eg_grad(8, 4, 2)
+ self.run_and_assert_equal(pfor_outputs, while_outputs)
+
+ def test_mnist_per_eg_grad(self):
+ data_format = ("channels_first"
+ if test.is_gpu_available() else "channels_last")
+ # Note that we are setting training=False here so that dropout produces
+ # the same result with pfor and with while_loop.
+ pfor_outputs, while_outputs = create_mnist_per_eg_grad(
+ 4, data_format, training=False)
+ self.run_and_assert_equal(pfor_outputs, while_outputs, rtol=1e-3)
+
+ def test_mnist_per_eg_jacobian(self):
+ data_format = ("channels_first"
+ if test.is_gpu_available() else "channels_last")
+ # Note that we are setting training=False here so that dropout produces
+ # the same result with pfor and with while_loop.
+ pfor_outputs, while_outputs = create_mnist_per_eg_jacobian(
+ 2, data_format, training=False)
+ self.run_and_assert_equal(pfor_outputs, while_outputs, rtol=1e-3)
+
+ def test_fc_jacobian(self):
+ jacobians, per_eg_jacobians_pfor, per_eg_jacobians_while = (
+ create_fc_per_eg_jacobians(batch_size=8,
+ activation_size=4,
+ num_layers=2))
+ self.run_and_assert_equal(jacobians, per_eg_jacobians_pfor,
+ rtol=2e-3, atol=1e-3)
+ self.run_and_assert_equal(jacobians, per_eg_jacobians_while,
+ rtol=2e-3, atol=1e-3)
+
+
+class GradientsBenchmarks(test.Benchmark):
+
+ def _run(self, targets, iters, name=None):
+
+ def _done(t):
+ # Note that we don't use tf.control_dependencies since that will not make
+ # sure that the computation on GPU has actually finished. So we fetch the
+ # first element of the output, and assume that this will not be called on
+ # empty tensors.
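+ # For illustration, the same "fetch one element" trick in isolation (a rough
+ # sketch only; the tensor names and sizes here are made up, not part of this
+ # benchmark):
+ #
+ #   x = random_ops.random_normal([1024, 1024])
+ #   y = math_ops.matmul(x, x)
+ #   # Fetching a scalar derived from `y` forces the matmul to finish before
+ #   # Session.run returns, which a control dependency alone would not.
+ #   probe = array_ops.gather(array_ops.reshape(y, [-1]), 0)
+ #   with session.Session() as sess:
+ #     sess.run(probe)  # warm up
+ #     start = time.time()
+ #     for _ in range(100):
+ #       sess.run(probe)
+ #     print((time.time() - start) / 100)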
+ return array_ops.gather(array_ops.reshape(t, [-1]), 0) + + targets = [_done(x) for x in nest.flatten(targets)] + sess = session.Session() + with sess: + init = variables.global_variables_initializer() + sess.run(init) + sess.run(targets) + begin = time.time() + for _ in range(iters): + sess.run(targets) + end = time.time() + avg_time_ms = 1000 * (end - begin) / iters + self.report_benchmark(iters=iters, wall_time=avg_time_ms, name=name) + return avg_time_ms + + def benchmark_fc_batch_jacobian(self): + with ops.Graph().as_default(): + pfor_jacobian, while_jacobian = create_fc_batch_jacobian(100, 32, 20) + self._run(pfor_jacobian, 100, name="fc_batch_jacobian_pfor") + self._run(while_jacobian, 20, name="fc_batch_jacobian_while") + + def benchmark_lstm_batch_jacobian(self): + with ops.Graph().as_default(): + pfor_jacobian, while_jacobian = create_lstm_batch_jacobian(100, 32, 8) + self._run(pfor_jacobian, 100, name="lstm_batch_jacobian_pfor") + self._run(while_jacobian, 20, name="lstm_batch_jacobian_while") + + def benchmark_lstm_hessian(self): + with ops.Graph().as_default(): + pfor_hessian, while_hessian = create_lstm_hessian(2, 2, 10) + self._run(pfor_hessian, 20, name="lstm_hessian_pfor") + self._run(while_hessian, 3, name="lstm_hessian_while_pfor") + + def benchmark_lstm_batch_hessian(self): + with ops.Graph().as_default(): + pfor_hessian, while_hessian = create_lstm_batch_hessian(4, 4, 10) + self._run(pfor_hessian, 100, name="lstm_batch_hessian_pfor") + self._run(while_hessian, 20, name="lstm_batch_hessian_while_pfor") + + def benchmark_fc_per_eg_grad(self): + with ops.Graph().as_default(): + pfor_outputs, while_outputs = create_fc_per_eg_grad(100, 32, 3) + self._run(pfor_outputs, 100, name="fc_per_eg_grad_pfor") + self._run(while_outputs, 20, name="fc_per_eg_grad_while") + + def benchmark_lstm_per_eg_grad(self): + with ops.Graph().as_default(): + pfor_outputs, while_outputs = create_lstm_per_eg_grad(100, 32, 8) + self._run(pfor_outputs, 100, name="lstm_per_eg_grad_pfor") + self._run(while_outputs, 20, name="lstm_per_eg_grad_while") + + def benchmark_mnist_per_eg_grad(self): + with ops.Graph().as_default(): + data_format = ("channels_first" + if test.is_gpu_available() else "channels_last") + pfor_outputs, while_outputs = create_mnist_per_eg_grad( + 128, data_format, training=True) + self._run(pfor_outputs, 20, name="mnist_per_eg_grad_pfor") + self._run(while_outputs, 20, name="mnist_per_eg_grad_while") + + def benchmark_mnist_per_eg_jacobian(self): + with ops.Graph().as_default(): + data_format = ("channels_first" + if test.is_gpu_available() else "channels_last") + pfor_outputs, while_outputs = create_mnist_per_eg_jacobian( + 16, data_format, training=True) + self._run(pfor_outputs, 20, name="mnist_per_eg_jacobian_pfor") + self._run(while_outputs, 20, name="mnist_per_eg_jacobian_while") + + def benchmark_fc_per_eg_jacobian(self): + with ops.Graph().as_default(): + jacobians, per_eg_jacobians_pfor, per_eg_jacobians_while = ( + create_fc_per_eg_jacobians(batch_size=128, + activation_size=32, + num_layers=3)) + self._run(jacobians, 30, name="fc_jacobians_pfor") + self._run(per_eg_jacobians_pfor, 100, + name="fc_per_eg_jacobians_pfor") + self._run(per_eg_jacobians_while, 10, + name="fc_per_eg_jacobians_while") + + +if __name__ == "__main__": + test.main() diff --git a/tensorflow/python/ops/parallel_for/pfor.py b/tensorflow/python/ops/parallel_for/pfor.py new file mode 100644 index 0000000000..1c0ef24040 --- /dev/null +++ b/tensorflow/python/ops/parallel_for/pfor.py @@ -0,0 +1,2484 @@ +# 
Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Compiled parallel-for loop.""" +# pylint: disable=missing-docstring + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections + +from absl import flags + +from tensorflow.python.framework import constant_op +from tensorflow.python.framework import dtypes +from tensorflow.python.framework import ops +from tensorflow.python.framework import sparse_tensor +from tensorflow.python.framework import tensor_shape +from tensorflow.python.framework import tensor_util +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import check_ops +from tensorflow.python.ops import control_flow_ops +from tensorflow.python.ops import data_flow_ops +from tensorflow.python.ops import functional_ops +from tensorflow.python.ops import gen_sparse_ops +from tensorflow.python.ops import math_ops +from tensorflow.python.ops import nn_ops +from tensorflow.python.ops import sparse_ops +from tensorflow.python.ops import tensor_array_ops +from tensorflow.python.platform import tf_logging as logging + +flags.DEFINE_bool( + "op_conversion_fallback_to_while_loop", False, + "If true, falls back to using a while loop for ops for " + "which a converter is not defined.") + + +def _stack(t, length): + """stacks `t` `length` times.""" + ones = array_ops.ones_like(array_ops.shape(t)) + multiples = array_ops.concat([length, ones], 0) + t = array_ops.tile(array_ops.expand_dims(t, 0), multiples) + return wrap(t, True) + + +# The following stateful ops can be safely called once, and with the same +# signature as the unconverted version, if their inputs are loop invariant. +# TODO(agarwal): implement a strategy for converting Variable reads/writes. The +# plan is to map each read/write in the loop_fn to a corresponding merged +# read/write in the converted graph. Writes need to be mergeable (e.g. +# AssignAdd) to be used in `pfor`. Given a certain read/write order in the +# loop_fn, doing a one-to-one conversion will simulate executing such +# instructions in lock-step across all iterations. +passthrough_stateful_ops = set([ + "VariableV2", + "VarHandleOp", + "ReadVariableOp", + "StackV2", + "TensorArrayWriteV3", + "TensorArrayReadV3", + "TensorArraySizeV3", +]) + + +def _is_stateful_pfor_op(op): + if isinstance(op, WhileOp): + return op.is_stateful + if op.type == "Const": + # Const didn't have an op_def. + return False + if op.type in passthrough_stateful_ops: + return False + assert hasattr(op, "op_def") and op.op_def is not None, op + return op.op_def.is_stateful + + +# pylint: disable=protected-access +class WhileOp(object): + """Object for storing state for converting the outputs of a while_loop.""" + + def __init__(self, exit_node, pfor_ops): + """Initializer. + + Args: + exit_node: A tensor output from the while_loop. 
+ pfor_ops: list of ops inside the current pfor loop. + """ + self._pfor_ops = set(pfor_ops) + self._pfor_op_ids = set([x._id for x in pfor_ops]) + assert isinstance(exit_node, ops.Tensor) + self._while_context = exit_node.op._get_control_flow_context() + assert isinstance(self._while_context, control_flow_ops.WhileContext) + self._context_name = self._while_context.name + self._condition = self._while_context.pivot.op.inputs[0] + # Parts of an external while_loop could be created inside a pfor loop. + # However for the purpose here, we declare such loops to be external. Also + # note that we check if the condition was created inside or outside to + # determine if the while_loop was first created inside or outside. + # TODO(agarwal): check that the Enter and Exit of this loop are unstacked. + self._is_inside_loop = self.op_is_inside_loop(self._condition.op) + if self._is_inside_loop: + for e in self._while_context.loop_exits: + assert self.op_is_inside_loop(e.op) + + # Note the code below tries to reverse engineer an existing while_loop graph + # by assuming the following pattern of nodes. + # + # NextIteration <---- Body <--- Enter + # | ^ + # V ___| Y + # Enter -> Merge -> Switch___ + # ^ | N + # | V + # LoopCond Exit + + # Node that elements in the list below correspond one-to-one with each + # other. i.e. these lists are the same size, and the i_th entry corresponds + # to different Operations/Tensors of a single cycle as illustrated above. + # List of Switch ops (ops.Operation) that feed into an Exit Node. + self._exit_switches = [] + # List of inputs (ops.Tensor) to NextIteration. + self._body_outputs = [] + # List of list of control inputs of the NextIteration nodes. + self._next_iter_control_inputs = [] + # List of Merge ops (ops.Operation). + self._enter_merges = [] + # List of output (ops.Tensor) of Exit nodes. + self._outputs = [] + + # List of Enter Tensors. + # There are two types of Enter nodes: + # - The Enter nodes that are used in the `loop_vars` argument to + # `while_loop` (see + # https://www.tensorflow.org/api_docs/python/tf/while_loop). We collect + # these Enter nodes immediately below by tracing backwards from the Exit + # nodes via Exit <- Switch <- Merge <- Enter. You can see this chain in the + # diagram above. This allows us to have a 1:1 correspondence between the + # self._outputs and the first elements in self._enters. + # - The Enter nodes that are used only by the body. They don't appear in the + # `loop_vars` and are not returned from the `while_loop`. In Python code, + # they are usually captured by the body lambda. We collect them below by + # iterating over all the ops in the graph. They are appended to the end of + # self._enters and don't correspond to any outputs in self._outputs. + self._enters = [] + + for e in self._while_context.loop_exits: + self._outputs.append(e.op.outputs[0]) + switch = e.op.inputs[0].op + assert switch.type == "Switch", switch + self._exit_switches.append(switch) + merge = switch.inputs[0].op + assert merge.type == "Merge", merge + self._enter_merges.append(merge) + enter = merge.inputs[0].op + assert enter.type == "Enter", enter + self._enters.append(enter.outputs[0]) + next_iter = merge.inputs[1].op + assert next_iter.type == "NextIteration", next_iter + self._body_outputs.append(next_iter.inputs[0]) + self._next_iter_control_inputs.append(next_iter.control_inputs) + + # Collect all the Enter nodes that are not part of `loop_vars`, the second + # category described above. 
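+ # For reference, this backward chain can be reproduced by hand on a toy
+ # while_loop (an illustrative sketch only; the toy loop below is not part of
+ # this conversion logic):
+ #
+ #   out = control_flow_ops.while_loop(
+ #       lambda i: i < 5, lambda i: i + 1, [constant_op.constant(0)])
+ #   exit_op = out.op                     # type "Exit"
+ #   switch = exit_op.inputs[0].op        # type "Switch"
+ #   merge = switch.inputs[0].op          # type "Merge"
+ #   enter = merge.inputs[0].op           # type "Enter", from `loop_vars`
+ #   next_iter = merge.inputs[1].op       # type "NextIteration"
+ #   body_output = next_iter.inputs[0]    # tensor computed by the loop body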
+ # Also track whether the loop body has any stateful ops. + self._is_stateful = False + for op in ops.get_default_graph().get_operations(): + # TODO(agarwal): make sure this works with nested case. + control_flow_context = op._get_control_flow_context() + if control_flow_context is None: + continue + if control_flow_context.name == self._context_name: + self._is_stateful |= _is_stateful_pfor_op(op) + if op.type == "Enter": + output = op.outputs[0] + if output not in self._enters: + self._enters.append(output) + + def __str__(self): + """String representation.""" + return "while_loop(%s)" % self.name + + @property + def inputs(self): + """Input to all the Enter nodes.""" + return [x.op.inputs[0] for x in self._enters] + + @property + def control_inputs(self): + """Control input to all the Enter nodes.""" + control_inputs = [] + for x in self._enters: + control_inputs.extend(x.op.control_inputs) + return control_inputs + + @property + def outputs(self): + """Outputs of all the Exit nodes.""" + return self._outputs + + @property + def name(self): + """Context name for the while loop.""" + return self._context_name + + @property + def is_inside_loop(self): + """Returns true if the while_loop was created inside the pfor.""" + return self._is_inside_loop + + def op_is_inside_loop(self, op): + """True if op was created inside the pfor loop body.""" + assert isinstance(op, ops.Operation) + # Note that we use self._pfor_op_ids for the check and not self._pfor_ops + # since it appears there tensorflow API could return different python + # objects representing the same Operation node. + return op._id in self._pfor_op_ids + + @property + def is_stateful(self): + return self._is_stateful + + @property + def pfor_converter(self): + """Return a converter for the while loop.""" + return self + + def _init_pfor(self, parent_pfor, indices, cond_stacked, inputs, + inputs_stacked): + """Create a PFor object for converting parts of the while_loop. + + Args: + parent_pfor: PFor object being used for converting the while_loop. + indices: int32 Tensor of ids for the iterations that are still active + (i.e. did not exit the while_loop). + cond_stacked: True if the while_loop condition is stacked. + inputs: list of input Tensors corresponding 1-to-1 with self._enters. Note + that these Tensors are a subset of the loop variables for the generated + while_loop. + inputs_stacked: List of booleans corresponding 1-to-1 with `inputs`, + indicating if the value is stacked or not. + + Returns: + A PFor instance. The instance is initialized by adding conversion mappings + of nodes that will be external to the conversion that the returned + instance will be used for. e.g. Enter nodes as well as Merge and Switch + outputs are mapped to converted values. + """ + num_outputs = len(self._outputs) + assert len(inputs) == len(self._enters) + assert len(inputs_stacked) == len(self._enters) + loop_var = parent_pfor.loop_var + loop_len = array_ops.size(indices) + pfor = PFor( + loop_var, + loop_len, + pfor_ops=self._pfor_ops, + all_indices=indices, + all_indices_partitioned=cond_stacked) + # Map all Enter nodes to the inputs. + for enter, inp, stacked in zip(self._enters, inputs, inputs_stacked): + pfor._add_conversion(enter, wrap(inp, stacked)) + # Map outputs of Switch and Merge. 
+ for i in range(num_outputs):
+ wrapped_inp = wrap(inputs[i], inputs_stacked[i])
+ merge = self._enter_merges[i]
+ pfor._add_conversion(merge.outputs[0], wrapped_inp)
+ # Note that second output of Merge is typically not used, except possibly
+ # as a control dependency. To avoid trying to output the correct value, we
+ # employ a hack here. We output a dummy invalid value with an incorrect
+ # dtype. This will allow control dependency to work but if using it as an
+ # input, it should typically lead to errors during graph construction due
+ # to dtype mismatch.
+ # TODO(agarwal): Check in the original graph to see if there are any
+ # consumers of this Tensor that use it as an input.
+ pfor._add_conversion(merge.outputs[1],
+ wrap(constant_op.constant(-1.0), False))
+ switch = self._exit_switches[i]
+ # Don't need to worry about switch.output[0] which will feed to Exit node.
+ pfor._add_conversion(switch.outputs[1], wrapped_inp)
+ return pfor
+
+ def _convert_enter(self, parent_pfor, enter):
+ """Converts an Enter node."""
+ inp, stacked, _ = parent_pfor._convert_helper(enter.op.inputs[0])
+ control_inputs = [
+ parent_pfor._convert_helper(x).t for x in enter.op.control_inputs
+ ]
+ if control_inputs:
+ with ops.control_dependencies(control_inputs):
+ inp = array_ops.identity(inp)
+ return inp, stacked
+
+ def _maybe_stacked(self, cache, inp):
+ """Heuristic to figure out if converting inp leads to a stacked value.
+
+ Args:
+ cache: map from Tensor to boolean indicating stacked/unstacked.
+ inp: input Tensor.
+
+ Returns:
+ True if `inp` could get stacked. If the function returns False, the
+ converted value should be guaranteed to be unstacked. If returning True,
+ it may or may not be stacked.
+ """
+ if inp in cache:
+ return cache[inp]
+ if not self.op_is_inside_loop(inp.op):
+ return False
+ op = inp.op
+ output = False
+ if op.type in [
+ "Shape",
+ "Rank",
+ "ShapeN",
+ "ZerosLike",
+ "TensorArrayV3",
+ "TensorArraySizeV3",
+ ]:
+ output = False
+ elif _is_stateful_pfor_op(op):
+ # This may be fairly aggressive.
+ output = True
+ elif op.type == "Exit":
+ # This may be fairly aggressive.
+ output = True
+ else:
+ for t in op.inputs:
+ if self._maybe_stacked(cache, t):
+ output = True
+ break
+ cache[inp] = output
+ return output
+
+ def _create_init_values(self, pfor_input):
+ """Create arguments passed to converted while_loop."""
+ with ops.name_scope("while_init"):
+ loop_len_vector = pfor_input.pfor.loop_len_vector
+ loop_len = loop_len_vector[0]
+ num_outputs = len(self._outputs)
+
+ inputs = []
+ maybe_stacked_cache = {}
+ # Convert all the Enters. Need to do this before checking for stacking
+ # below.
+ for i, enter in enumerate(self._enters):
+ inp, stacked = self._convert_enter(pfor_input.pfor, enter)
+ inputs.append(inp)
+ maybe_stacked_cache[enter] = stacked
+ # Since this enter node is part of the `loop_vars`, it corresponds to an
+ # output and its preceding switch. We mark this switch's output the same
+ # stackness, to act as the base case for the logic below. Below, we will
+ # be going through the body figuring out which inputs might need to be
+ # stacked and which inputs can safely remain unstacked.
+ if i < num_outputs:
+ maybe_stacked_cache[self._exit_switches[i].outputs[1]] = stacked
+
+ # Shape invariants for init_values corresponding to self._enters.
+ input_shape_invariants = []
+ # TensorArrays for outputs of converted while loop
+ output_tas = []
+ # Shape invariants for output TensorArrays.
+ ta_shape_invariants = [] + # List of booleans indicating stackness of inputs, i.e. tensors + # corresponding to self._enters. + inputs_stacked = [] + for i, inp in enumerate(inputs): + enter = self._enters[i] + inp_stacked = self._maybe_stacked(maybe_stacked_cache, enter) + # Note that even when an input is unstacked, the body could make it + # stacked. we use a heuristic below to figure out if body may be making + # it stacked. + if i < num_outputs: + body_output = self._body_outputs[i] + if enter.op in self._pfor_ops: + body_output_stacked = self._maybe_stacked(maybe_stacked_cache, + body_output) + else: + # If constructed outside of pfor loop, then the output would not be + # stacked. + body_output_stacked = False + if body_output_stacked and not inp_stacked: + inp = _stack(inp, loop_len_vector).t + inputs[i] = inp + inp_stacked = True + # TODO(agarwal): other attributes for the TensorArray ? + output_tas.append(tensor_array_ops.TensorArray(inp.dtype, loop_len)) + ta_shape_invariants.append(tensor_shape.TensorShape(None)) + + inputs_stacked.append(inp_stacked) + input_shape_invariants.append(tensor_shape.TensorShape(None)) + + # See documentation for __call__ for the structure of init_values. + init_values = [True, pfor_input.pfor.all_indices] + inputs + output_tas + # TODO(agarwal): try stricter shape invariants + shape_invariants = ( + [tensor_shape.TensorShape(None), + tensor_shape.TensorShape(None) + ] + input_shape_invariants + ta_shape_invariants) + + return init_values, inputs_stacked, shape_invariants + + def _process_cond_unstacked(self, conditions, indices, inputs, output_tas): + """Handles case when condition is unstacked. + + Note that all iterations end together. So we don't need to partition the + inputs. When all iterations are done, we write the inputs to the + TensorArrays. Note that we only write to index 0 of output_tas. Since all + iterations end together, they can all be output together. + """ + not_all_done = array_ops.reshape(conditions, []) + new_output_tas = [] + # pylint: disable=cell-var-from-loop + for i, out_ta in enumerate(output_tas): + inp = inputs[i] + new_output_tas.append( + control_flow_ops.cond(not_all_done, + lambda: out_ta, + lambda: out_ta.write(0, inp))) + # pylint: enable=cell-var-from-loop + return not_all_done, indices, inputs, new_output_tas + + def _process_cond_stacked(self, conditions, indices, inputs, inputs_stacked, + output_tas): + num_outputs = len(self._outputs) + # Compute if all iterations are done. + not_all_done = math_ops.reduce_any(conditions) + conditions_int = math_ops.cast(conditions, dtypes.int32) + # Partition the indices. + done_indices, new_indices = data_flow_ops.dynamic_partition( + indices, conditions_int, 2) + + new_inputs = [] + new_output_tas = [] + for i, (inp, stacked) in enumerate(zip(inputs, inputs_stacked)): + # Partition the inputs. + if stacked: + done_inp, new_inp = data_flow_ops.dynamic_partition( + inp, conditions_int, 2) + else: + # TODO(agarwal): avoid this stacking. See TODO earlier in + # _process_cond_unstacked. + done_inp = _stack(inp, [array_ops.size(done_indices)]).t + new_inp = inp + new_inputs.append(new_inp) + # For iterations that are done, write them to TensorArrays. + if i < num_outputs: + out_ta = output_tas[i] + # Note that done_indices can be empty. done_inp should also be empty in + # that case. 
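+ # As a concrete example of the partitioning above (values made up for
+ # illustration): with indices = [0, 1, 2, 3] and
+ # conditions = [True, False, True, False], conditions_int is [1, 0, 1, 0],
+ # so dynamic_partition yields
+ #   done_indices = [1, 3]   # iterations whose condition is now False
+ #   new_indices = [0, 2]    # iterations that keep running
+ # and the rows of a stacked input are routed the same way, with the "done"
+ # rows written into the output TensorArray by the scatter below.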
+ new_output_tas.append(out_ta.scatter(done_indices, done_inp)) + return not_all_done, new_indices, new_inputs, new_output_tas + + def _process_body(self, pfor_input, inputs_stacked, + new_indices, cond_stacked, new_inputs, + not_all_done): + """Convert the body function.""" + + def true_fn(control_inputs, body_pfor, body_output, stacked): + """Converts the body function for all but last iteration. + + This essentially converts body_output. Additionally, it needs to handle + any control dependencies on the NextIteration node. So it creates another + Identity node with the converted dependencies. + """ + converted_control_inp = [] + for x in control_inputs: + for t in x.outputs: + converted_control_inp.append(body_pfor._convert_helper(t).t) + if stacked: + # Note convert always does the stacking. + output = body_pfor.convert(body_output) + else: + output, convert_stacked, _ = body_pfor._convert_helper(body_output) + assert convert_stacked == stacked, body_output + with ops.control_dependencies(converted_control_inp): + return array_ops.identity(output) + + body_pfor = self._init_pfor(pfor_input.pfor, new_indices, + cond_stacked, new_inputs, + inputs_stacked) + new_outputs = [] + + for i, (body_output, stacked) in enumerate( + zip(self._body_outputs, inputs_stacked)): + control_inp = self._next_iter_control_inputs[i] + out_dtype = body_output.dtype + # Note that we want to run the body only if not all pfor iterations are + # done. If all are done, we return empty tensors since these values will + # not be used. Notice that the value returned by the loop is based on + # TensorArrays and not directly on these returned values. + # pylint: disable=cell-var-from-loop + new_output = control_flow_ops.cond( + not_all_done, + lambda: true_fn(control_inp, body_pfor, body_output, stacked), + lambda: constant_op.constant([], dtype=out_dtype)) + # pylint: enable=cell-var-from-loop + new_outputs.append(new_output) + return new_outputs + + def __call__(self, pfor_input): + """Converter for the while_loop. + + The conversion of a while_loop is another while_loop. + + The arguments to this converted while_loop are as follows: + not_all_done: Boolean scalar Tensor indicating if all the pfor iterations + are done. + indices: int32 1-D Tensor storing the id of the iterations that are not + done. + args: Remaining arguments. These can be divided into 3 categories: + - First set of arguments are the tensors that correspond to the initial + elements of self._enters. The elements that appear in original while + loop's `loop_vars`. + - The second set of arguments are the tensors that correspond to the + remaining elements of self._enters. These are the tensors that directly + enter the original while loop body. + - Finally, the last set of arguments are TensorArrays. These TensorArrays + correspond to the outputs of the original while_loop, i.e. to the + elements in self._outputs. Each TensorArray has `PFor.loop_len` + elements, i.e. the number of pfor iterations. At the end, the i'th + element of each TensorArray will contain the output computed by the + i'th iteration of pfor. Note that elements can be written into these + tensors arrays in any order, depending on when the corresponding pfor + iteration is done. + If the original while_loop had `k` tensors in its `loop_vars` and its body + directly captured `m` tensors, the `args` will contain `2 * k + m` values. + + In each iteration, the while_loop body recomputes the condition for all + active pfor iterations to see which of them are now done. 
It then partitions + all the inputs and passes them along to the converted body. Values for all + the iterations that are done are written to TensorArrays indexed by the pfor + iteration number. When all iterations are done, the TensorArrays are stacked + to get the final value. + + Args: + pfor_input: A PForInput object corresponding to the output of any Exit + node from this while loop. + + Returns: + List of converted outputs. + """ + # Create init_values that will be passed to the while_loop. + init_values, inputs_stacked, shape_invariants = self._create_init_values( + pfor_input) + # Note that we use a list as a hack since we need the nested function body + # to set the value of cond_is_stacked. python2.x doesn't support nonlocal + # variables. + cond_is_stacked = [None] + + def cond(not_all_done, *_): + return not_all_done + + def body(not_all_done, indices, *args): + # See documentatin for __call__ for the structure of *args. + num_enters = len(self._enters) + inputs = args[:num_enters] + output_tas = args[num_enters:] + # TODO(agarwal): see which outputs have consumers and only populate the + # TensorArrays corresonding to those. Or do those paths get trimmed out + # from inside the while_loop body? + assert len(inputs) >= len(output_tas) + assert len(inputs) == len(inputs_stacked) + + # Convert condition + with ops.name_scope("while_cond"): + # Note that we set cond_stacked to True here. At this point we don't + # know if it could be loop invariant, hence the conservative value is + # to assume stacked. + cond_pfor = self._init_pfor(pfor_input.pfor, indices, + cond_stacked=True, + inputs=inputs, + inputs_stacked=inputs_stacked) + conditions, cond_stacked, _ = cond_pfor._convert_helper(self._condition) + cond_is_stacked[0] = cond_stacked + + # Recompute the new condition, write outputs of done iterations, and + # partition the inputs if needed. + if not cond_stacked: + (not_all_done, new_indices, + new_inputs, new_output_tas) = self._process_cond_unstacked( + conditions, indices, inputs, output_tas) + else: + (not_all_done, new_indices, + new_inputs, new_output_tas) = self._process_cond_stacked( + conditions, indices, inputs, inputs_stacked, output_tas) + + # Convert body + with ops.name_scope("while_body"): + # Compute the outputs from the body. + new_outputs = self._process_body(pfor_input, inputs_stacked, + new_indices, cond_stacked, new_inputs, + not_all_done) + + # Note that the first num_outputs new values of inputs are computed using + # the body. Rest of them were direct Enters into the condition/body and + # the partitioning done earlier is sufficient to give the new value. + num_outputs = len(self._outputs) + new_args = ([not_all_done, new_indices] + new_outputs + list( + new_inputs[num_outputs:]) + new_output_tas) + return tuple(new_args) + + while_outputs = control_flow_ops.while_loop( + cond, body, init_values, shape_invariants=shape_invariants) + output_tas = while_outputs[-len(self._outputs):] + outputs = [] + assert cond_is_stacked[0] is not None + for inp_stacked, ta in zip(inputs_stacked, output_tas): + if cond_is_stacked[0]: + outputs.append(wrap(ta.stack(), True)) + else: + # Note that if while_loop condition is unstacked, all iterations exit at + # the same time and we wrote those outputs in index 0 of the tensor + # array. + outputs.append(wrap(ta.read(0), inp_stacked)) + return outputs + + +class _PforInput(object): + """Input object passed to registered pfor converters.""" + + def __init__(self, pfor, op, inputs): + """Creates a _PforInput object. 
+ + Args: + pfor: PFor converter object. + op: the Operation object that is being converted. + inputs: list of WrappedTensor objects representing converted values of the + inputs of `op`. + """ + self.pfor = pfor + self._op = op + self._inputs = inputs + + def stack_inputs(self, stack_indices=None): + """Stacks unstacked inputs at `stack_indices`. + + Args: + stack_indices: indices of inputs at which stacking is done. If None, + stacking is done at all indices. + """ + if stack_indices is None: + stack_indices = range(len(self._inputs)) + length = self.pfor.loop_len_vector + for i in stack_indices: + inp = self._inputs[i] + if not inp.is_stacked: + self._inputs[i] = _stack(inp.t, length) + + def expanddim_inputs_for_broadcast(self): + """Reshapes stacked inputs to prepare them for broadcast. + + Since stacked inputs have an extra leading dimension, automatic broadcasting + rules could incorrectly try to expand dimensions before that leading + dimension. To avoid that, we reshape these stacked inputs to the maximum + rank they will need to be broadcasted to. + """ + if not self._inputs: + return + + # Find max rank + def _get_rank(x): + rank = array_ops.rank(x.t) + if not x.is_stacked: + rank += 1 + return rank + + ranks = [_get_rank(x) for x in self._inputs] + max_rank = ranks[0] + for rank in ranks[1:]: + max_rank = math_ops.maximum(rank, max_rank) + + for i, inp in enumerate(self._inputs): + if inp.is_stacked: + shape = array_ops.shape(inp.t) + rank_diff = array_ops.reshape(max_rank - ranks[i], [1]) + ones = array_ops.tile([1], rank_diff) + new_shape = array_ops.concat([shape[:1], ones, shape[1:]], axis=0) + self._inputs[i] = wrap(array_ops.reshape(inp.t, new_shape), True) + + @property + def inputs(self): + return self._inputs + + @property + def num_inputs(self): + return len(self._inputs) + + def input(self, index): + assert len(self._inputs) > index, (index, self._inputs) + return self._inputs[index] + + def stacked_input(self, index): + t, is_stacked, _ = self.input(index) + if not is_stacked: + op_type = self.op_type + op_def = getattr(self._op, "op_def", None) + if op_def is None: + input_name = "at index %d" % index + else: + input_name = "\"%s\"" % op_def.input_arg[index].name + raise ValueError("Input %s of op \"%s\" expected to be not loop invariant" + ".\nError while converting op %s" + "with converted inputs\n%s" % (input_name, op_type, + self._op, self.inputs)) + return t + + def unstacked_input(self, index): + t, is_stacked, _ = self.input(index) + if is_stacked: + op_type = self.op_type + op_def = getattr(self._op, "op_def", None) + if op_def is None: + input_name = "at index %d" % index + else: + input_name = "\"%s\"" % op_def.input_arg[index].name + raise ValueError("Input %s of op \"%s\" expected to be loop invariant" + ".\nError while converting op %s" + "with converted inputs\n%s" % (input_name, op_type, + self._op, self.inputs)) + return t + + @property + def op(self): + return self._op + + @property + def op_type(self): + return self._op.type + + def get_attr(self, attr): + return self._op.get_attr(attr) + + @property + def outputs(self): + return self._op.outputs + + def output(self, index): + assert index < len(self._op.outputs) + return self._op.outputs[index] + + +_pfor_converter_registry = {} + + +class RegisterPFor(object): + """Utility to register converters for pfor. + + Usage: + @RegisterPFor(foo_op_type) + def _foo_converter(pfor_input): + ... + + The above will register conversion function `_foo_converter` for handling + conversion of `foo_op_type`. 
During conversion, the registered function will be
+ called with a single argument of type `PForInput` which will contain state
+ needed for the conversion. This registered function should output a list of
+ WrappedTensor objects with the same length as the number of outputs of op being
+ converted. If the op had zero outputs, then it should return an ops.Operation
+ object.
+ """
+
+ def __init__(self, op_type):
+ """Creates an object to register a converter for op with type `op_type`."""
+ self.op_type = op_type
+
+ def __call__(self, converter):
+ name = self.op_type
+ assert name not in _pfor_converter_registry, "Re-registering %s " % name
+ _pfor_converter_registry[name] = converter
+ return converter
+
+
+class RegisterPForWithArgs(RegisterPFor):
+ """Utility to register converters for pfor.
+
+ Usage:
+ @RegisterPForWithArgs(foo_op_type, foo=value, ....)
+ def _foo_converter(pfor_input, foo=None, ....):
+ ...
+
+ See RegisterPFor for details on the conversion function.
+ `RegisterPForWithArgs` allows binding extra arguments to the
+ conversion function at registration time.
+ """
+
+ def __init__(self, op_type, *args, **kw_args):
+ super(RegisterPForWithArgs, self).__init__(op_type)
+ self._args = args
+ self._kw_args = kw_args
+
+ def __call__(self, converter):
+
+ def _f(pfor_input):
+ return converter(pfor_input, self.op_type, *self._args, **self._kw_args)
+
+ super(RegisterPForWithArgs, self).__call__(_f)
+ return converter
+
+
+def _create_op(op_type, inputs, op_dtypes, attrs=None):
+ """Utility to create an op."""
+ return ops.get_default_graph().create_op(
+ op_type, inputs, op_dtypes, attrs=attrs, compute_device=True)
+
+
+WrappedTensor = collections.namedtuple("WrappedTensor",
+ ["t", "is_stacked", "is_sparse_stacked"])
+"""Wrapper around the result of a Tensor conversion.
+
+The additional fields are useful for keeping track of the conversion state as
+data flows through the ops in the loop body. For every op whose output is a
+Tensor, its converter should return either a WrappedTensor or a list of
+WrappedTensors.
+
+Args:
+ t: The converted tensor
+ is_stacked: True if the tensor is stacked, i.e. represents the results of all
+ the iterations of the loop, where each row i of the tensor corresponds to
+ that op's output on iteration i of the loop. False if the tensor is not
+ stacked, i.e. represents the result of the op on a single iteration of
+ the loop, where the result does not vary between iterations.
+ is_sparse_stacked: True if the tensor corresponds to a component tensor
+ (indices, values, or dense_shape) of a sparse tensor, and has been logically
+ stacked via a sparse conversion.
+"""
+
+
+def wrap(tensor, is_stacked=True, is_sparse_stacked=False):
+ """Helper to create a WrappedTensor object."""
+ assert isinstance(is_stacked, bool)
+ assert isinstance(is_sparse_stacked, bool)
+ assert isinstance(tensor, ops.Tensor)
+ assert not is_sparse_stacked or is_stacked, ("If the wrapped tensor is "
+ "stacked via a sparse "
+ "conversion, it must also be "
+ "stacked.")
+ return WrappedTensor(tensor, is_stacked, is_sparse_stacked)
+
+
+def _fallback_converter(pfor_input):
+ logging.warn("Using a while_loop for converting %s", pfor_input.op_type)
+ output_dtypes = [x.dtype for x in pfor_input.outputs]
+ iters = pfor_input.pfor.loop_len_vector[0]
+
+ def while_body(i, *ta_list):
+ """Body of while loop."""
+ inputs = [
+ x[i, ...]
if stacked else x for x, stacked, _ in pfor_input.inputs + ] + op_outputs = _create_op( + pfor_input.op_type, + inputs, + output_dtypes, + attrs=pfor_input.op.node_def.attr).outputs + + outputs = [] + for out, ta in zip(op_outputs, ta_list): + assert isinstance(out, ops.Tensor) + outputs.append(ta.write(i, array_ops.expand_dims(out, 0))) + return tuple([i + 1] + outputs) + + ta_list = control_flow_ops.while_loop( + lambda i, *ta: i < iters, while_body, [0] + [ + tensor_array_ops.TensorArray(dtype, iters) for dtype in output_dtypes + ])[1:] + return tuple([wrap(ta.concat(), True) for ta in ta_list]) + + +class PFor(object): + """Implementation of rewrite of parallel-for loops. + + This class takes a DAG or a set of DAGs representing the body of a + parallel-for loop, and adds new operations to the graph that implements + functionality equivalent to running that loop body for a specified number of + iterations. This new set of nodes may or may not use a tensorflow loop + construct. + + The process of conversion does not delete or change any existing operations. + It only adds operations that efficiently implement the equivalent + functionality. We refer to the added ops as "converted ops". + + The conversion process uses a simple greedy heuristic. It walks the loop body + and tries to express the functionality of running each node in a loop with a + new set of nodes. When converting an op several cases are possible: + - The op is not inside the loop body. Hence it can be used as is. + - The op does not depend on the iteration number and is stateless. In this + case, it can be used as is. + - The op is not stateful, and depends on iteration number only through control + dependencies. In this case, we can create a single op with same inputs and + attributes, but with "converted" control dependencies. + - The op is not stateful, and all its inputs are loop invariant. In this + case, similar to above, we can create a single op with same inputs and + attributes, but with "converted" control dependencies. + - The op is stateful or at least one of the inputs is not loop invariant. In + this case, we run the registered converter for that op to create a set of + converted ops. All nodes in the set will have converted control dependencies + corresponding to control dependencies of the original op. If the op returned + multiple outputs, "converted outputs" could be produced by different ops in + this set. + """ + + def __init__(self, + loop_var, + loop_len, + pfor_ops, + all_indices=None, + all_indices_partitioned=False): + """Creates an object to rewrite a parallel-for loop. + + Args: + loop_var: ops.Tensor output of a Placeholder operation. The value should + be an int32 scalar representing the loop iteration number. + loop_len: A scalar or scalar Tensor representing the number of iterations + the loop is run for. + pfor_ops: List of all ops inside the loop body. + all_indices: If not None, an int32 vector with size `loop_len` + representing the iteration ids that are still active. These values + should be unique and sorted. However they may not be contiguous. This is + typically the case when inside a control flow construct which has + partitioned the indices of the iterations that are being converted. + all_indices_partitioned: If True, this object is being constructed from a + control flow construct where not all the pfor iterations are guaranteed + to be active. 
+ """ + assert isinstance(loop_var, ops.Tensor) + assert loop_var.op.type == "Placeholder" + self._loop_var = loop_var + loop_len_value = tensor_util.constant_value(loop_len) + if loop_len_value is not None: + loop_len = loop_len_value + self._loop_len_vector = array_ops.reshape(loop_len, [1]) + self._all_indices_partitioned = all_indices_partitioned + if all_indices_partitioned: + assert all_indices is not None + self.all_indices = ( + math_ops.range(loop_len) if all_indices is None else all_indices) + + self._conversion_map = {} + self._conversion_map[loop_var] = wrap(self.all_indices, True) + self._pfor_ops = set(pfor_ops) + self._pfor_op_ids = set([x._id for x in pfor_ops]) + + def op_is_inside_loop(self, op): + """True if op was created inside the pfor loop body.""" + assert isinstance(op, ops.Operation) + # Note that we use self._pfor_op_ids for the check and not self._pfor_ops + # since it appears there tensorflow API could return different python + # objects representing the same Operation node. + return op._id in self._pfor_op_ids + + def _convert_sparse(self, y): + """Returns the converted value corresponding to SparseTensor y. + + For SparseTensors, instead of stacking the component tensors separately, + resulting in component tensors with shapes (N, m, rank), (N, m), and (N, + rank) respectively for indices, values, and dense_shape (where N is the loop + length and m is the number of sparse tensor values per loop iter), we want + to logically stack the SparseTensors, to create a SparseTensor whose + components are size (N * m, rank + 1), (N * m, ), and (rank + 1,) + respectively. + + Here, we try to get the conversion of each component tensor. + If the tensors are stacked via a sparse conversion, return the resulting + SparseTensor composed of the converted components. Otherwise, the component + tensors are either unstacked or stacked naively. In the latter case, we + unstack the component tensors to reform loop_len SparseTensor elements, + then correctly batch them. + + The unstacked tensors must have the same rank. Each dimension of each + SparseTensor will expand to be the largest among all SparseTensor elements + for that dimension. For example, if there are N SparseTensors of rank 3 + being stacked, with N dense shapes, where the i_th shape is (x_i, y_i, z_i), + the new dense shape will be (N, max_i(x_i), max_i(y_i), max_i(z_i)). + + Args: + y: A tf.SparseTensor. + + Returns: + A tf.SparseTensor that is the converted value corresponding to y. + """ + outputs = [ + self._convert_helper(t) for t in (y.indices, y.values, y.dense_shape) + ] + assert all(isinstance(o, WrappedTensor) for o in outputs) + + if all(w.is_sparse_stacked for w in outputs): + return sparse_tensor.SparseTensor(*[w.t for w in outputs]) + + assert not any(w.is_sparse_stacked for w in outputs), ( + "Error converting SparseTensor. All components should be logically " + "stacked, or none.") + + # If component tensors were not sparsely stacked, they are either unstacked + # or stacked without knowledge that they are components of sparse tensors. + # In this case, we have to restack them. 
+ return self._restack_sparse_tensor_logically( + *[self._unwrap_or_tile(w) for w in outputs]) + + def _restack_sparse_tensor_logically(self, indices, values, shape): + sparse_tensor_rank = indices.get_shape()[-1].value + if sparse_tensor_rank is not None: + sparse_tensor_rank += 1 + + def map_fn(args): + res = gen_sparse_ops.serialize_sparse( + args[0], args[1], args[2], out_type=dtypes.variant) + return res + + # Applies a map function to the component tensors to serialize each + # sparse tensor element and batch them all, then deserializes the batch. + # TODO(rachelim): Try to do this without map_fn -- add the right offsets + # to shape and indices tensors instead. + result = functional_ops.map_fn( + map_fn, [indices, values, shape], dtype=dtypes.variant) + return sparse_ops.deserialize_sparse( + result, dtype=values.dtype, rank=sparse_tensor_rank) + + def _unwrap_or_tile(self, wrapped_tensor): + """Given a wrapped tensor, unwrap if stacked. Otherwise, tiles it.""" + output, is_stacked = wrapped_tensor.t, wrapped_tensor.is_stacked + if is_stacked: + return output + else: + return _stack(output, self._loop_len_vector).t + + def convert(self, y): + """Returns the converted value corresponding to y. + + Args: + y: A ops.Tensor or a ops.Operation object. If latter, y should not have + any outputs. + + Returns: + If y does not need to be converted, it returns y as is. Else it returns + the "converted value" corresponding to y. + """ + if isinstance(y, sparse_tensor.SparseTensor): + return self._convert_sparse(y) + output = self._convert_helper(y) + if isinstance(output, WrappedTensor): + assert isinstance(y, ops.Tensor) + return self._unwrap_or_tile(output) + else: + assert isinstance(y, ops.Operation) + assert not y.outputs + assert isinstance(output, ops.Operation) + return output + + def _was_converted(self, t): + """True if t is not a conversion of itself.""" + converted_t = self._conversion_map[t] + return converted_t.t is not t + + def _add_conversion(self, old_output, new_output): + self._conversion_map[old_output] = new_output + + def _convert_helper(self, op_or_tensor): + stack = [op_or_tensor] + while stack: + y = stack[0] + if y in self._conversion_map: + assert isinstance(self._conversion_map[y], + (WrappedTensor, ops.Operation)) + stack.pop(0) + continue + if isinstance(y, ops.Operation): + assert not y.outputs, ( + "We only support converting Operation objects with no outputs. " + "Got %s", y) + y_op = y + else: + assert isinstance(y, ops.Tensor), y + y_op = y.op + + is_while_loop = y_op.type == "Exit" + if is_while_loop: + while_op = WhileOp(y, pfor_ops=self._pfor_ops) + is_inside_loop = while_op.is_inside_loop + # If all nodes in the while_loop graph were created inside the pfor, we + # treat the whole loop subgraph as a single op (y_op) and try to convert + # it. For while_loops that are created completely or partially outside, + # we treat them as external and should be able to simply return the Exit + # node output as is without needing any conversion. Note that for + # while_loops that are partially constructed inside, we assume they will + # be loop invariant. If that is not the case, it will create runtime + # errors since the converted graph would depend on the self._loop_var + # placeholder. + if is_inside_loop: + y_op = while_op + else: + is_inside_loop = self.op_is_inside_loop(y_op) + + # If this op was not created inside the loop body, we will return as is. + # 1. Convert inputs and control inputs. 
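+ # The stack-based traversal here converts a node only after all of its
+ # inputs have conversions. For example (illustrative only), converting c in
+ #
+ #   a = loop_var * 2
+ #   b = a + 1
+ #   c = b * a
+ #
+ # first pushes a and b back onto the stack, converts a (stacked, since it
+ # depends on the loop variable), then b, and only then c, so
+ # converted_inputs below is always fully populated when an op is handled.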
+
+ def _add_to_stack(x):
+ if x not in self._conversion_map:
+ stack.insert(0, x)
+ return True
+ else:
+ return False
+
+ if is_inside_loop:
+ added_to_stack = False
+ for inp in y_op.inputs:
+ added_to_stack |= _add_to_stack(inp)
+ for cinp in y_op.control_inputs:
+ if cinp.outputs:
+ for t in cinp.outputs:
+ added_to_stack |= _add_to_stack(t)
+ else:
+ added_to_stack |= _add_to_stack(cinp)
+ if added_to_stack:
+ continue
+
+ converted_inputs = [self._conversion_map[inp] for inp in y_op.inputs]
+ some_input_converted = any(
+ [self._was_converted(x) for x in y_op.inputs])
+ some_input_stacked = any([x.is_stacked for x in converted_inputs])
+
+ converted_control_ops = set()
+ some_control_input_converted = False
+ for cinp in y_op.control_inputs:
+ if cinp.outputs:
+ for t in cinp.outputs:
+ converted_t = self._conversion_map[t]
+ if self._was_converted(t):
+ some_control_input_converted = True
+ converted_control_ops.add(converted_t.t.op)
+ else:
+ converted_cinp = self._conversion_map[cinp]
+ assert isinstance(converted_cinp, ops.Operation)
+ if converted_cinp != cinp:
+ some_control_input_converted = True
+ converted_control_ops.add(converted_cinp)
+ converted_control_ops = list(converted_control_ops)
+ is_stateful = _is_stateful_pfor_op(y_op)
+ else:
+ converted_inputs = []
+ converted_control_ops = []
+ logging.vlog(3, "converting op:%s\ninputs:%s\ncontrol_inputs:%s", y_op,
+ converted_inputs, converted_control_ops)
+
+ # 2. Convert y_op
+ # If converting a while_loop, we let the while_loop converter deal with
+ # putting the control dependencies appropriately.
+ control_dependencies = [] if is_while_loop else converted_control_ops
+ with ops.control_dependencies(control_dependencies), ops.name_scope(
+ y_op.name + "/pfor/"):
+ # None of the inputs and control inputs were converted.
+ if (not is_inside_loop or
+ (not is_stateful and not some_input_converted and
+ not some_control_input_converted)):
+ if y == y_op:
+ assert not isinstance(y_op, WhileOp)
+ new_outputs = y_op
+ else:
+ new_outputs = [wrap(x, False) for x in y_op.outputs]
+ elif not (is_stateful or is_while_loop or some_input_stacked):
+ # All inputs are unstacked or unconverted but some control inputs are
+ # converted.
+ # TODO(rachelim): Handle the case where some inputs are sparsely
+ # stacked (i.e. any([x.is_sparse_stacked for x in converted_inputs]))
+ new_op = _create_op(y_op.type, [x.t for x in converted_inputs],
+ [x.dtype for x in y_op.outputs],
+ y_op.node_def.attr)
+ if y == y_op:
+ new_outputs = new_op
+ else:
+ new_outputs = [wrap(x, False) for x in new_op.outputs]
+ else:
+ # Either some inputs are not loop invariant or op is stateful.
+ if hasattr(y_op, "pfor_converter"):
+ converter = y_op.pfor_converter
+ else:
+ converter = _pfor_converter_registry.get(y_op.type, None)
+ if converter is None:
+ if flags.FLAGS.op_conversion_fallback_to_while_loop:
+ converter = _fallback_converter
+ else:
+ raise ValueError(
+ "No converter defined for %s\n%s\ninputs: %s. "
+ "\nEither add a converter or set "
+ "--op_conversion_fallback_to_while_loop=True, "
+ "which may run slower" % (y_op.type, y_op, converted_inputs))
+ # TODO(rachelim): Handle the case where some inputs are sparsely
+ # stacked. We should only call the converter if it supports handling
+ # those inputs.
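+ # For reference, the converters invoked here follow the RegisterPFor
+ # protocol defined above. A minimal hypothetical converter for an
+ # elementwise op (shown only to illustrate the calling convention; the real
+ # registrations live further down in this file) would look like:
+ #
+ #   @RegisterPFor("Relu")
+ #   def _convert_relu(pfor_input):
+ #     # The stacked input has shape [loop_len, ...]; since the op is
+ #     # elementwise, applying it to the stacked value handles all pfor
+ #     # iterations at once.
+ #     return wrap(nn_ops.relu(pfor_input.stacked_input(0)), True)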
+ new_outputs = converter(_PforInput(self, y_op, converted_inputs)) + if isinstance(new_outputs, WrappedTensor): + new_outputs = [new_outputs] + assert isinstance(new_outputs, + (list, tuple, ops.Operation)), new_outputs + logging.vlog(2, "converted %s %s", y_op, new_outputs) + + # Insert into self._conversion_map + if y == y_op: + assert isinstance(new_outputs, ops.Operation) + self._add_conversion(y_op, new_outputs) + else: + for old_output, new_output in zip(y_op.outputs, new_outputs): + assert isinstance(new_output, WrappedTensor), (new_output, y, y_op) + self._add_conversion(old_output, new_output) + stack.pop(0) + + return self._conversion_map[op_or_tensor] + + @property + def loop_len_vector(self): + """Returns a single element vector whose value is number of iterations.""" + return self._loop_len_vector + + @property + def loop_var(self): + """Returns placeholder loop variable.""" + return self._loop_var + + @property + def pfor_ops(self): + return self._pfor_ops + + @property + def all_indices_partitioned(self): + """all_indices_partitioned property. + + Returns: + True if we are inside a control flow construct and not all pfor iterations + may be active. + """ + return self._all_indices_partitioned + +# nn_ops + + +def _flatten_first_two_dims(x): + """Merges first two dimensions.""" + old_shape = array_ops.shape(x) + new_shape = array_ops.concat([[-1], old_shape[2:]], axis=0) + return array_ops.reshape(x, new_shape) + + +def _unflatten_first_dim(x, first_dim): + """Splits first dimension into [first_dim, -1].""" + old_shape = array_ops.shape(x) + new_shape = array_ops.concat([first_dim, [-1], old_shape[1:]], axis=0) + return array_ops.reshape(x, new_shape) + + +def _inputs_with_flattening(pfor_input, input_indices): + """Stacks and flattens first dim of inputs at indices `input_indices`.""" + if input_indices is None: + input_indices = [] + pfor_input.stack_inputs(stack_indices=input_indices) + inputs = [] + for i in range(pfor_input.num_inputs): + if i in input_indices: + inp = pfor_input.stacked_input(i) + inp = _flatten_first_two_dims(inp) + else: + inp = pfor_input.unstacked_input(i) + inputs.append(inp) + return inputs + + +@RegisterPForWithArgs("Conv2D", dims=[0]) +@RegisterPForWithArgs("AvgPool", dims=[0]) +@RegisterPForWithArgs("MaxPool", dims=[0]) +@RegisterPForWithArgs("MaxPoolGrad", dims=[0, 1, 2]) +@RegisterPForWithArgs("SoftmaxCrossEntropyWithLogits", dims=[0, 1]) +def _convert_flatten_batch(pfor_input, op_type, dims): + del op_type + inputs = _inputs_with_flattening(pfor_input, dims) + outputs = _create_op( + pfor_input.op_type, + inputs, [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + n = pfor_input.pfor.loop_len_vector + outputs = [_unflatten_first_dim(x, n) for x in outputs] + return [wrap(x, True) for x in outputs] + + +_channel_flatten_input_cache = {} + + +def _channel_flatten_input(x, data_format): + """Merge the stack dimension with the channel dimension. + + If S is pfor's stacking dimension, then, + - for SNCHW, we transpose to NSCHW. If N dimension has size 1, the transpose + should be cheap. + - for SNHWC, we transpose to NHWCS. + We then merge the S and C dimension. + + Args: + x: ops.Tensor to transform. + data_format: "NCHW" or "NHWC". + + Returns: + A 3-element tuple with the transformed value, along with the shape for + reshape and order for transpose required to transform back. 
+ """ + + graph = ops.get_default_graph() + cache_key = (graph, x, data_format) + if cache_key not in _channel_flatten_input_cache: + x_shape = array_ops.shape(x) + if data_format == b"NCHW": + order = [1, 0, 2, 3, 4] + shape = array_ops.concat([x_shape[1:2], [-1], x_shape[3:]], axis=0) + reverse_order = order + else: + order = [1, 2, 3, 0, 4] + shape = array_ops.concat([x_shape[1:4], [-1]], axis=0) + reverse_order = [3, 0, 1, 2, 4] + # Move S dimension next to C dimension. + x = array_ops.transpose(x, order) + reverse_shape = array_ops.shape(x) + # Reshape to merge the S and C dimension. + x = array_ops.reshape(x, shape) + outputs = x, reverse_order, reverse_shape + _channel_flatten_input_cache[cache_key] = outputs + else: + outputs = _channel_flatten_input_cache[cache_key] + return outputs + + +# Note that with training=True, running FusedBatchNorm on individual examples +# is very different from running FusedBatchNorm on a batch of those examples. +# This is because, for the latter case, the operation can be considered as first +# computing the mean and variance over all the examples and then using these +# to scale all those examples. This creates a data dependency between these +# different "iterations" since the inputs to the scaling step depends on the +# statistics coming from all these inputs. +# As with other kernels, the conversion here effectively runs the kernel +# independently for each iteration, and returns outputs by stacking outputs from +# each of those iterations. +@RegisterPFor("FusedBatchNorm") +def _convert_fused_batch_norm(pfor_input): + is_training = pfor_input.get_attr("is_training") + # When BatchNorm is used with training=False, mean and variance are provided + # externally and used as is by the op. Thus, we can merge the S and N + # dimensions as we do for regular operations. + # When BatchNorm is used with training=True, mean and variance are computed + # for each channel across the batch dimension (first one). If we merge S and N + # dimensions, mean and variances will be computed over a larger set. So, we + # merge the S and C dimensions instead. + if not is_training: + # We return zeros for batch_mean and batch_variance output. Note that CPU + # and GPU seem to have different behavior for those two outputs. CPU outputs + # zero because these values are not used during inference. GPU outputs + # something, probably real means and variances. + inputs = _inputs_with_flattening(pfor_input, [0]) + outputs = _create_op( + pfor_input.op_type, + inputs, [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + y = outputs[0] + n = pfor_input.pfor.loop_len_vector + y = _unflatten_first_dim(y, n) + mean = pfor_input.unstacked_input(3) + zeros = array_ops.zeros_like(mean) + return [wrap(y, True), wrap(zeros, False), wrap(zeros, False)] + + pfor_input.stack_inputs() + data_format = pfor_input.get_attr("data_format") + # We merge the first dimension with the "C" dimension, run FusedBatchNorm, and + # then transpose back. + x = pfor_input.stacked_input(0) + x, reverse_order, reverse_shape = _channel_flatten_input(x, data_format) + # Note that we stack all the other inputs as well so that they are the same + # size as the new size of the channel dimension. 
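+  # For example, with loop length S and C original channels, the merged
+  # channel dimension of `x` has size S * C, and each remaining stacked input
+  # (such as scale and offset) of shape [S, C] is flattened below to a vector
+  # of length S * C so that it lines up with the merged channels.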
+ inputs = [x] + [ + array_ops.reshape(pfor_input.stacked_input(i), [-1]) + for i in range(1, pfor_input.num_inputs) + ] + outputs = _create_op( + pfor_input.op_type, + inputs, [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + y = outputs[0] + y = array_ops.reshape(y, reverse_shape) + y = array_ops.transpose(y, reverse_order) + n = pfor_input.pfor.loop_len_vector + outputs = [_unflatten_first_dim(x, n) for x in outputs[1:]] + outputs = [y] + outputs + return [wrap(x, True) for x in outputs] + + +@RegisterPFor("FusedBatchNormGrad") +def _convert_fused_batch_norm_grad(pfor_input): + pfor_input.stack_inputs() + data_format = pfor_input.get_attr("data_format") + y_backprop = pfor_input.stacked_input(0) + y_backprop, _, _ = _channel_flatten_input(y_backprop, data_format) + x = pfor_input.stacked_input(1) + x, x_reverse_order, x_reverse_shape = _channel_flatten_input(x, data_format) + inputs = [y_backprop, x] + [ + array_ops.reshape(pfor_input.stacked_input(i), [-1]) + for i in range(2, pfor_input.num_inputs) + ] + outputs = _create_op( + pfor_input.op_type, + inputs, [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + x_backprop = outputs[0] + x_backprop = array_ops.reshape(x_backprop, x_reverse_shape) + x_backprop = array_ops.transpose(x_backprop, x_reverse_order) + n = pfor_input.pfor.loop_len_vector + outputs = [_unflatten_first_dim(x, n) for x in outputs[1:]] + outputs = [x_backprop] + outputs + return [wrap(output, True) for output in outputs] + + +@RegisterPForWithArgs("Conv2DBackpropInput", flatten_dims=[2], shape_dim=0) +@RegisterPForWithArgs("AvgPoolGrad", flatten_dims=[1], shape_dim=0) +def _convert_flatten_batch_shape_input(pfor_input, op_type, flatten_dims, + shape_dim): + del op_type + inputs = _inputs_with_flattening(pfor_input, flatten_dims) + n = pfor_input.pfor.loop_len_vector + # Adjust the `input_sizes` input. + ones = array_ops.ones( + [array_ops.shape(inputs[shape_dim])[0] - 1], dtype=n.dtype) + inputs[shape_dim] *= array_ops.concat([n, ones], axis=0) + outputs = _create_op( + pfor_input.op_type, + inputs, [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + outputs = [_unflatten_first_dim(x, n) for x in outputs] + return [wrap(x, True) for x in outputs] + + +@RegisterPFor("Conv2DBackpropFilter") +def _convert_conv2d_backprop_filter(pfor_input): + pfor_input.stack_inputs(stack_indices=[2]) + inputs, inputs_stacked, _ = pfor_input.input(0) + filter_sizes = pfor_input.unstacked_input(1) + grads = pfor_input.stacked_input(2) + strides = pfor_input.get_attr("strides") + padding = pfor_input.get_attr("padding") + use_cudnn_on_gpu = pfor_input.get_attr("use_cudnn_on_gpu") + data_format = pfor_input.get_attr("data_format") + dilations = pfor_input.get_attr("dilations") + if inputs_stacked: + # TODO(agarwal): Implement this efficiently. + logging.warn("Conv2DBackpropFilter uses a while_loop. Fix that!") + + def while_body(i, ta): + inp_i = inputs[i, ...] + grad_i = grads[i, ...] 
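+      # Compute the filter gradient for this iteration's input/gradient pair
+      # and write it with an extra leading dimension, so that the concat below
+      # stacks the per-iteration results.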
+ output = nn_ops.conv2d_backprop_filter( + inp_i, + filter_sizes, + grad_i, + strides=strides, + padding=padding, + use_cudnn_on_gpu=use_cudnn_on_gpu, + data_format=data_format, + dilations=dilations) + return i + 1, ta.write(i, array_ops.expand_dims(output, 0)) + + n = array_ops.reshape(pfor_input.pfor.loop_len_vector, []) + _, ta = control_flow_ops.while_loop( + lambda i, ta: i < n, while_body, + (0, tensor_array_ops.TensorArray(inputs.dtype, n))) + output = ta.concat() + return wrap(output, True) + else: + # We merge the stack dimension with the channel dimension of the gradients + # and pretend we had a larger filter (see change to filter_sizes below). + # Once the filter backprop is computed, we reshape and transpose back + # appropriately. + grads, _, _ = _channel_flatten_input(grads, data_format) + n = pfor_input.pfor.loop_len_vector + old_filter_sizes = filter_sizes + filter_sizes *= array_ops.concat([[1, 1, 1], n], axis=0) + output = nn_ops.conv2d_backprop_filter( + inputs, + filter_sizes, + grads, + strides=strides, + padding=padding, + use_cudnn_on_gpu=use_cudnn_on_gpu, + data_format=data_format, + dilations=dilations) + new_filter_shape = array_ops.concat([old_filter_sizes[:3], n, [-1]], axis=0) + output = array_ops.reshape(output, new_filter_shape) + output = array_ops.transpose(output, [3, 0, 1, 2, 4]) + return wrap(output, True) + + +# array_ops + + +@RegisterPForWithArgs("Identity", array_ops.identity) +@RegisterPForWithArgs("StopGradient", array_ops.stop_gradient) +def _convert_identity(pfor_input, op_type, op_func): + del op_type + return wrap(op_func(*[x.t for x in pfor_input.inputs]), True) + + +@RegisterPFor("Reshape") +def _convert_reshape(pfor_input): + t = pfor_input.stacked_input(0) + shape = pfor_input.unstacked_input(1) + new_dim = array_ops.shape(t)[:1] + new_shape = array_ops.concat([new_dim, shape], axis=0) + return wrap(array_ops.reshape(t, new_shape), True) + + +@RegisterPFor("ExpandDims") +def _convert_expanddims(pfor_input): + t = pfor_input.stacked_input(0) + dim = pfor_input.unstacked_input(1) + dim += math_ops.cast(dim >= 0, dtypes.int32) + return wrap(array_ops.expand_dims(t, axis=dim), True) + + +@RegisterPFor("Slice") +def _convert_slice(pfor_input): + t = pfor_input.stacked_input(0) + begin = pfor_input.unstacked_input(1) + size = pfor_input.unstacked_input(2) + begin = array_ops.concat([[0], begin], axis=0) + size = array_ops.concat([[-1], size], axis=0) + return wrap(array_ops.slice(t, begin, size), True) + + +@RegisterPFor("Tile") +def _convert_tile(pfor_input): + t = pfor_input.stacked_input(0) + multiples = pfor_input.unstacked_input(1) + multiples = array_ops.concat([[1], multiples], 0) + return wrap(array_ops.tile(t, multiples), True) + + +@RegisterPFor("Pack") +def _convert_pack(pfor_input): + pfor_input.stack_inputs() + axis = pfor_input.get_attr("axis") + if axis >= 0: + axis += 1 + return wrap( + array_ops.stack([x.t for x in pfor_input.inputs], axis=axis), True) + + +@RegisterPFor("Unpack") +def _convert_unpack(pfor_input): + value = pfor_input.stacked_input(0) + axis = pfor_input.get_attr("axis") + if axis >= 0: + axis += 1 + num = pfor_input.get_attr("num") + return [wrap(x, True) for x in array_ops.unstack(value, axis=axis, num=num)] + + +@RegisterPFor("Pad") +def _convert_pad(pfor_input): + t = pfor_input.stacked_input(0) + paddings = pfor_input.unstacked_input(1) + paddings = array_ops.concat([[[0, 0]], paddings], 0) + return wrap(array_ops.pad(t, paddings, mode="CONSTANT"), True) + + +@RegisterPFor("Split") +def 
_convert_split(pfor_input): + split_dim = pfor_input.unstacked_input(0) + t = pfor_input.stacked_input(1) + num_split = pfor_input.get_attr("num_split") + split_dim += math_ops.cast(split_dim >= 0, dtypes.int32) + return [wrap(x, True) for x in array_ops.split(t, num_split, axis=split_dim)] + + +@RegisterPFor("Transpose") +def _convert_transpose(pfor_input): + t = pfor_input.stacked_input(0) + perm = pfor_input.unstacked_input(1) + new_perm = array_ops.concat([[0], perm + 1], axis=0) + return wrap(array_ops.transpose(t, new_perm), True) + + +@RegisterPFor("ZerosLike") +def _convert_zeroslike(pfor_input): + t = pfor_input.stacked_input(0) + shape = array_ops.shape(t)[1:] + return wrap(array_ops.zeros(shape, dtype=t.dtype), False) + + +@RegisterPFor("Gather") +@RegisterPFor("GatherV2") +def _convert_gather(pfor_input): + param, param_stacked, _ = pfor_input.input(0) + indices, indices_stacked, _ = pfor_input.input(1) + op_type = pfor_input.op_type + if op_type == "Gather": + validate_indices = pfor_input.get_attr("validate_indices") + axis = 0 + else: + validate_indices = None + axis = pfor_input.unstacked_input(2) + axis_value = tensor_util.constant_value(axis) + if axis_value is not None: + axis = axis_value + if indices_stacked and not param_stacked: + if indices == pfor_input.pfor.all_indices and axis == 0: + param_shape0 = param.shape[0].value + indices_shape0 = indices.shape[0].value + if param_shape0 is not None and indices_shape0 == param_shape0: + # Note that with loops and conditionals, indices may not be contiguous. + # However they will be sorted and unique. So if the shape matches, then + # it must be picking up all the rows of param. + return wrap(param, True) + # TODO(agarwal): use array_ops.slice here. + output = array_ops.gather( + param, indices, validate_indices=validate_indices, axis=axis) + if axis != 0: + axis = control_flow_ops.cond( + axis < 0, lambda: axis + array_ops.rank(param), lambda: axis) + order = array_ops.concat( + [[axis], + math_ops.range(axis), + math_ops.range(axis + 1, array_ops.rank(output))], + axis=0) + output = control_flow_ops.cond( + math_ops.equal(axis, 0), lambda: output, + lambda: array_ops.transpose(output, order)) + return wrap(output, True) + if param_stacked: + loop_len_vector = pfor_input.pfor.loop_len_vector + pfor_input.stack_inputs(stack_indices=[1]) + indices = pfor_input.stacked_input(1) + param_flat = _flatten_first_two_dims(param) + + # Recompute indices to handle stacked param. + indices_offset = math_ops.range( + loop_len_vector[0]) * array_ops.shape(param)[1] + # Reshape indices_offset to allow broadcast addition + ones = array_ops.ones([array_ops.rank(indices) - 1], dtype=dtypes.int32) + new_shape = array_ops.concat([loop_len_vector, ones], axis=0) + indices_offset = array_ops.reshape(indices_offset, new_shape) + indices += indices_offset + + # TODO(agarwal): handle axis != 0. May need to transpose param or + # array_ops.gather_nd. + if isinstance(axis, ops.Tensor): + axis_value = tensor_util.constant_value(axis) + else: + try: + axis_value = int(axis) + except TypeError: + axis_value = None + msg = ("Gather, where indices and param are both loop dependent, currently " + "requires axis=0") + if axis_value is not None and axis_value != 0: + raise ValueError("Error while converting %s. %s. 
Got axis=%d" % + (pfor_input.op, msg, axis)) + with ops.control_dependencies( + [check_ops.assert_equal(axis, 0, message=msg)]): + output = array_ops.gather(param_flat, indices) + return wrap(output, True) + + +@RegisterPFor("ConcatV2") +def _convert_concatv2(pfor_input): + n = pfor_input.num_inputs + pfor_input.stack_inputs(stack_indices=range(n - 1)) + axis = pfor_input.unstacked_input(n - 1) + axis += math_ops.cast(axis >= 0, axis.dtype) + return wrap( + array_ops.concat([x.t for x in pfor_input.inputs[:n - 1]], axis=axis), + True) + + +@RegisterPFor("StridedSlice") +def _convert_strided_slice(pfor_input): + inp = pfor_input.stacked_input(0) + begin = pfor_input.unstacked_input(1) + end = pfor_input.unstacked_input(2) + strides = pfor_input.unstacked_input(3) + begin_mask = pfor_input.get_attr("begin_mask") + end_mask = pfor_input.get_attr("end_mask") + ellipsis_mask = pfor_input.get_attr("ellipsis_mask") + new_axis_mask = pfor_input.get_attr("new_axis_mask") + shrink_axis_mask = pfor_input.get_attr("shrink_axis_mask") + + begin = array_ops.concat([[0], begin], axis=0) + end = array_ops.concat([[0], end], axis=0) + strides = array_ops.concat([[1], strides], axis=0) + begin_mask = begin_mask << 1 | 1 + end_mask = end_mask << 1 | 1 + ellipsis_mask <<= 1 + new_axis_mask <<= 1 + shrink_axis_mask <<= 1 + return wrap( + array_ops.strided_slice( + inp, + begin, + end, + strides, + begin_mask=begin_mask, + end_mask=end_mask, + ellipsis_mask=ellipsis_mask, + new_axis_mask=new_axis_mask, + shrink_axis_mask=shrink_axis_mask), True) + + +@RegisterPFor("StridedSliceGrad") +def _convert_strided_slice_grad(pfor_input): + shape = pfor_input.unstacked_input(0) + begin = pfor_input.unstacked_input(1) + end = pfor_input.unstacked_input(2) + strides = pfor_input.unstacked_input(3) + dy = pfor_input.stacked_input(4) + begin_mask = pfor_input.get_attr("begin_mask") + end_mask = pfor_input.get_attr("end_mask") + ellipsis_mask = pfor_input.get_attr("ellipsis_mask") + new_axis_mask = pfor_input.get_attr("new_axis_mask") + shrink_axis_mask = pfor_input.get_attr("shrink_axis_mask") + + shape = array_ops.concat([pfor_input.pfor.loop_len_vector, shape], axis=0) + begin = array_ops.concat([[0], begin], axis=0) + end = array_ops.concat([[0], end], axis=0) + strides = array_ops.concat([[1], strides], axis=0) + begin_mask = begin_mask << 1 | 1 + end_mask = end_mask << 1 | 1 + ellipsis_mask <<= 1 + new_axis_mask <<= 1 + shrink_axis_mask <<= 1 + return wrap( + array_ops.strided_slice_grad( + shape, + begin, + end, + strides, + dy, + begin_mask=begin_mask, + end_mask=end_mask, + ellipsis_mask=ellipsis_mask, + new_axis_mask=new_axis_mask, + shrink_axis_mask=shrink_axis_mask), True) + + +# math_ops + + +@RegisterPFor("MatMul") +def _convert_matmul(pfor_input): + # TODO(agarwal): Check if tiling is faster than two transposes. 
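+  # If both operands carry the loop dimension, the 3-D matmul below acts as a
+  # batched matmul over all iterations. If only one operand is stacked, we
+  # reshape that operand into a 2-D matrix so that a single large matmul
+  # covers all iterations at once.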
+ a, a_stacked, _ = pfor_input.input(0) + b, b_stacked, _ = pfor_input.input(1) + tr_a = pfor_input.get_attr("transpose_a") + tr_b = pfor_input.get_attr("transpose_b") + if a_stacked and b_stacked: + output = wrap(math_ops.matmul(a, b, adjoint_a=tr_a, adjoint_b=tr_b), True) + return output + elif a_stacked: + if tr_a: + a = array_ops.transpose(a, [0, 2, 1]) + if a.shape.is_fully_defined(): + x, y, z = a.shape + else: + x, y, z = [ + array_ops.reshape(i, []) + for i in array_ops.split(array_ops.shape(a), 3) + ] + a = array_ops.reshape(a, [x * y, z]) + prod = math_ops.matmul(a, b, transpose_b=tr_b) + return wrap(array_ops.reshape(prod, [x, y, -1]), True) + else: + assert b_stacked + if tr_b: + perm = [2, 0, 1] + b = array_ops.transpose(b, perm) + else: + # As an optimization, if one of the first two dimensions is 1, then we can + # reshape instead of transpose. + # TODO(agarwal): This check can be done inside Transpose kernel. + b_shape = array_ops.shape(b) + min_dim = math_ops.minimum(b_shape[0], b_shape[1]) + perm = control_flow_ops.cond( + math_ops.equal(min_dim, 1), lambda: [0, 1, 2], lambda: [1, 0, 2]) + new_shape = array_ops.stack([b_shape[1], b_shape[0], b_shape[2]]) + b = array_ops.transpose(b, perm) + b = array_ops.reshape(b, new_shape) + + if b.shape.is_fully_defined(): + x, y, z = b.shape + else: + x, y, z = [ + array_ops.reshape(i, []) + for i in array_ops.split(array_ops.shape(b), 3) + ] + b = array_ops.reshape(b, [x, y * z]) + prod = math_ops.matmul(a, b, transpose_a=tr_a) + prod = array_ops.reshape(prod, [-1, y, z]) + prod = array_ops.transpose(prod, [1, 0, 2]) + return wrap(prod, True) + + +@RegisterPFor("BatchMatMul") +def _convert_batch_mat_mul(pfor_input): + # TODO(agarwal): There may be a more efficient way to do this instead of + # stacking the inputs. + pfor_input.stack_inputs() + x = pfor_input.stacked_input(0) + y = pfor_input.stacked_input(1) + adj_x = pfor_input.get_attr("adj_x") + adj_y = pfor_input.get_attr("adj_y") + + x = _flatten_first_two_dims(x) + y = _flatten_first_two_dims(y) + output = math_ops.matmul(x, y, adjoint_a=adj_x, adjoint_b=adj_y) + output = _unflatten_first_dim(output, pfor_input.pfor.loop_len_vector) + return wrap(output, True) + + +@RegisterPForWithArgs("Sum", math_ops.reduce_sum) +@RegisterPForWithArgs("Prod", math_ops.reduce_prod) +@RegisterPForWithArgs("Max", math_ops.reduce_max) +@RegisterPForWithArgs("Min", math_ops.reduce_min) +def _convert_reduction(pfor_input, _, op_func): + t = pfor_input.stacked_input(0) + indices = pfor_input.unstacked_input(1) + # Shift positive indices by one to account for the extra dimension. + indices += math_ops.cast(indices >= 0, dtypes.int32) + keep_dims = pfor_input.get_attr("keep_dims") + return wrap(op_func(t, indices, keepdims=keep_dims), True) + + +@RegisterPForWithArgs("Cumsum", math_ops.cumsum) +@RegisterPForWithArgs("Cumprod", math_ops.cumprod) +def _convert_cumfoo(pfor_input, _, op_func): + t = pfor_input.stacked_input(0) + axis = pfor_input.unstacked_input(1) + # Shift positive indices by one to account for the extra dimension. 
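+  # A non-negative axis refers to a dimension of the per-iteration value, so
+  # it is shifted past the new leading loop dimension; negative axes count
+  # from the end and need no adjustment.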
+ axis += math_ops.cast(axis >= 0, dtypes.int32) + exclusive = pfor_input.get_attr("exclusive") + reverse = pfor_input.get_attr("reverse") + return wrap(op_func(t, axis, exclusive=exclusive, reverse=reverse), True) + + +@RegisterPFor("BiasAdd") +def _convert_biasadd(pfor_input): + t = pfor_input.stacked_input(0) + bias = pfor_input.unstacked_input(1) + data_format = pfor_input.get_attr("data_format") + if data_format != b"NCHW": + return wrap(nn_ops.bias_add(t, bias, data_format=data_format), True) + shape = array_ops.shape(t) + flattened_shape = array_ops.concat([[-1], shape[2:]], axis=0) + t = array_ops.reshape(t, flattened_shape) + t = nn_ops.bias_add(t, bias, data_format=b"NCHW") + t = array_ops.reshape(t, shape) + return wrap(t, True) + + +@RegisterPFor("UnsortedSegmentSum") +def _convert_unsortedsegmentsum(pfor_input): + data, data_stacked, _ = pfor_input.input(0) + # TODO(agarwal): handle unstacked? + segment_ids = pfor_input.stacked_input(1) + # TODO(agarwal): handle stacked? + num_segments = pfor_input.unstacked_input(2) + if not data_stacked: + data = _stack(data, pfor_input.pfor.loop_len_vector).t + segment_shape = array_ops.shape(segment_ids) + n = segment_shape[0] + ones = array_ops.ones_like(segment_shape)[1:] + segment_offset = num_segments * math_ops.range(n) + segment_offset = array_ops.reshape(segment_offset, + array_ops.concat([[n], ones], axis=0)) + segment_ids += segment_offset + num_segments *= n + output = math_ops.unsorted_segment_sum(data, segment_ids, num_segments) + new_output_shape = array_ops.concat( + [[n, -1], array_ops.shape(output)[1:]], axis=0) + output = array_ops.reshape(output, new_output_shape) + return wrap(output, True) + + +@RegisterPFor("Cast") +def _convert_cast(pfor_input): + inp = pfor_input.stacked_input(0) + dtype = pfor_input.get_attr("DstT") + return wrap(math_ops.cast(inp, dtype), True) + + +# Note that ops handled here do not have attributes except "T", and hence don't +# need extra arguments passed to the cwise_op call below. 
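+# For binary ops among these, _convert_cwise relies on
+# expanddim_inputs_for_broadcast (called below) to line the operands up for
+# broadcasting across the extra leading loop dimension.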
+@RegisterPForWithArgs("Add", math_ops.add) +@RegisterPForWithArgs("Ceil", math_ops.ceil) +@RegisterPForWithArgs("Equal", math_ops.equal) +@RegisterPForWithArgs("NotEqual", math_ops.not_equal) +@RegisterPForWithArgs("Floor", math_ops.floor) +@RegisterPForWithArgs("Greater", math_ops.greater) +@RegisterPForWithArgs("GreaterEqual", math_ops.greater_equal) +@RegisterPForWithArgs("Less", math_ops.less) +@RegisterPForWithArgs("LessEqual", math_ops.less_equal) +@RegisterPForWithArgs("LogicalOr", math_ops.logical_or) +@RegisterPForWithArgs("LogicalAnd", math_ops.logical_and) +@RegisterPForWithArgs("LogicalNot", math_ops.logical_not) +@RegisterPForWithArgs("LogicalXor", math_ops.logical_xor) +@RegisterPForWithArgs("Maximum", math_ops.maximum) +@RegisterPForWithArgs("Minimum", math_ops.minimum) +@RegisterPForWithArgs("Mul", math_ops.multiply) +@RegisterPForWithArgs("Neg", math_ops.negative) +@RegisterPForWithArgs("RealDiv", math_ops.divide) +@RegisterPForWithArgs("Relu", nn_ops.relu) +@RegisterPForWithArgs("Sigmoid", math_ops.sigmoid) +@RegisterPForWithArgs("Square", math_ops.square) +@RegisterPForWithArgs("Sub", math_ops.subtract) +@RegisterPForWithArgs("Tanh", math_ops.tanh) +def _convert_cwise(pfor_input, op_type, op_func): + del op_type + pfor_input.expanddim_inputs_for_broadcast() + return wrap(op_func(*[x.t for x in pfor_input.inputs]), True) + + +@RegisterPFor("Shape") +def _convert_shape(pfor_input): + out_type = pfor_input.get_attr("out_type") + return wrap( + array_ops.shape(pfor_input.stacked_input(0), out_type=out_type)[1:], + False) + + +@RegisterPFor("ShapeN") +def _convert_shape_n(pfor_input): + out_type = pfor_input.get_attr("out_type") + shapes = [ + array_ops.shape(x, out_type=out_type)[1:] + if stacked else array_ops.shape(x) for x, stacked, _ in pfor_input.inputs + ] + return [wrap(x, False) for x in shapes] + + +@RegisterPFor("Size") +def _convert_size(pfor_input): + out_type = pfor_input.get_attr("out_type") + n = math_ops.cast(pfor_input.pfor.loop_len_vector[0], out_type) + return wrap( + array_ops.size(pfor_input.stacked_input(0), out_type=out_type) // n, + False) + + +@RegisterPFor("Rank") +def _convert_rank(pfor_input): + return wrap(array_ops.rank(pfor_input.stacked_input(0)) - 1, False) + + +@RegisterPFor("AddN") +def _convert_addn(pfor_input): + # AddN does not support broadcasting. + pfor_input.stack_inputs() + return wrap(math_ops.add_n([x.t for x in pfor_input.inputs]), True) + + +@RegisterPFor("BiasAddGrad") +def _convert_biasaddgrad(pfor_input): + grad = pfor_input.stacked_input(0) + fmt = pfor_input.get_attr("data_format") + if fmt == b"NCHW": + output = math_ops.reduce_sum(grad, axis=[1, 3, 4], keepdims=False) + else: + grad_shape = array_ops.shape(grad) + last_dim_shape = grad_shape[-1] + first_dim_shape = grad_shape[0] + output = array_ops.reshape(grad, [first_dim_shape, -1, last_dim_shape]) + output = math_ops.reduce_sum(output, axis=[1], keepdims=False) + return wrap(output, True) + + +# Some required ops are not exposed under the tf namespace. Hence relying on +# _create_op to create them. +@RegisterPForWithArgs("ReluGrad") +@RegisterPForWithArgs("TanhGrad") +@RegisterPForWithArgs("SigmoidGrad") +def _convert_grads(pfor_input, op_type, *args, **kw_args): + del args + del kw_args + # TODO(agarwal): Looks like these ops don't support broadcasting. Hence we + # have to use tiling here. 
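+  # stack_inputs() tiles any loop-invariant input along a new leading
+  # dimension so that every input carries the loop dimension before the raw
+  # gradient kernel is created below.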
+ pfor_input.stack_inputs() + outputs = _create_op( + op_type, [x.t for x in pfor_input.inputs], + [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + return [wrap(x, True) for x in outputs] + + +@RegisterPFor("Select") +def _convert_select(pfor_input): + pfor_input.stack_inputs() + cond = pfor_input.stacked_input(0) + t = pfor_input.stacked_input(1) + e = pfor_input.stacked_input(2) + cond_rank = array_ops.rank(cond) + cond, t, e = control_flow_ops.cond( + cond_rank > 1, lambda: _inputs_with_flattening(pfor_input, [0, 1, 2]), + lambda: [cond, t, e]) + outputs = _create_op( + pfor_input.op_type, [cond, t, e], [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + n = pfor_input.pfor.loop_len_vector + out = control_flow_ops.cond(cond_rank > 1, + lambda: _unflatten_first_dim(outputs[0], n), + lambda: outputs[0]) + return [wrap(out, True) for x in outputs] + + +# random_ops + + +@RegisterPForWithArgs("RandomUniform") +@RegisterPForWithArgs("RandomUniformInt") +@RegisterPForWithArgs("RandomStandardNormal") +@RegisterPForWithArgs("TruncatedNormal") +@RegisterPForWithArgs("RandomGamma") +@RegisterPForWithArgs("RandomPoissonV2") +def _convert_random(pfor_input, op_type, *args, **kw_args): + del args + del kw_args + inputs = [pfor_input.unstacked_input(i) for i in range(pfor_input.num_inputs)] + # inputs[0] is "shape" + inputs[0] = array_ops.concat( + [pfor_input.pfor.loop_len_vector, inputs[0]], axis=0) + logging.warning( + "Note that %s inside pfor op may not give same output as " + "inside a sequential loop.", op_type) + outputs = _create_op( + op_type, + inputs, [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + return [wrap(x, True) for x in outputs] + + +# logging_ops + + +@RegisterPFor("Assert") +def _convert_assert(pfor_input): + cond, cond_stacked, _ = pfor_input.input(0) + if cond_stacked: + cond = math_ops.reduce_all(cond) + + data_list = [x.t for x in pfor_input.inputs][1:] + return _create_op("Assert", [cond] + data_list, [], + attrs=pfor_input.op.node_def.attr) + + +@RegisterPFor("Print") +def _convert_print(pfor_input): + # Note that we don't stack all the inputs. Hence unstacked values are printed + # once here vs multiple times in a while_loop. + pfor_input.stack_inputs([0]) + outputs = _create_op( + "Print", [x.t for x in pfor_input.inputs], + [x.dtype for x in pfor_input.outputs], + attrs=pfor_input.op.node_def.attr).outputs + return [wrap(x, True) for x in outputs] + + +# data_flow_ops + +# TensorArray conversion is tricky since we don't support arrays of +# TensorArrays. For converting them, we consider two distinct cases: +# +# 1. The array is constructed outside the pfor call, and read/written inside the +# loop. +# This is an easier case since we don't need to make an array of TensorArrays. +# A correctness requirement is that these parallel iterations shouldn't attempt +# to write to the same location. Hence at conversion time we disallow indices to +# be loop-invariant as that would guarantee a collision. Even if the indices are +# not loop-invariant, they could conflict and that shall trigger runtime errors. +# +# 2. The array is constructed and used entirely inside each pfor iteration. +# For simplicity, here we require that the indices used for write/scatter are +# "unstacked". Otherwise it becomes hard to merge the TensorArrays created in +# different pfor iterations. 
We consider two sub_cases: +# +# 2a Elements written to the array are "stacked" +# To simulate multiple TensorArrays, we may increase the dimension of each +# element of the array. i.e. the i_th row of the j_th entry of the converted +# TensorArray corresponds to to the j_th entry of the TensorArray in the i_th +# pfor iteration. +# +# 2b Elements written to the array are "unstacked" +# In this case we don't increase the dimensions to avoid redundant tiling. Each +# iteration is trying to write the same value. So we convert that to a single +# write. +# +# Here are some tricks used to implement the above: +# - TensorArrayV3 constructor encodes the element shape as an attr. Instead of +# trying to trace whether future writes are stacked or unstacked in order to set +# this attr, we set it to correspond to unknown shape. +# - We use the "flow" output of the different ops to track whether the array +# elements are stacked or unstacked. If a stacked write/scatter is done, we make +# the flow stacked as well. +# - We use some heuristic traversal of the graph to track whether the +# TensorArray handle was created inside or outside the pfor loop. + + +@RegisterPFor("TensorArrayV3") +def _convert_tensor_array_v3(pfor_input): + size = pfor_input.unstacked_input(0) + dtype = pfor_input.get_attr("dtype") + dynamic_size = pfor_input.get_attr("dynamic_size") + clear_after_read = pfor_input.get_attr("clear_after_read") + identical_element_shapes = pfor_input.get_attr("identical_element_shapes") + tensor_array_name = pfor_input.get_attr("tensor_array_name") + handle, flow = data_flow_ops.tensor_array_v3( + size, + dtype=dtype, + # We don't set element shape since we don't know if writes are stacked or + # not yet. + element_shape=None, + dynamic_size=dynamic_size, + clear_after_read=clear_after_read, + identical_element_shapes=identical_element_shapes, + tensor_array_name=tensor_array_name) + # Note we keep flow unstacked for now since we don't know if writes will be + # stacked or not. + return wrap(handle, False), wrap(flow, False) + + +@RegisterPFor("TensorArraySizeV3") +def _convert_tensor_array_size_v3(pfor_input): + handle = pfor_input.unstacked_input(0) + flow, flow_stacked, _ = pfor_input.input(1) + if flow_stacked: + flow = _unstack_flow(flow) + size = data_flow_ops.tensor_array_size_v3(handle, flow) + return wrap(size, False) + + +def _handle_inside_pfor(pfor_input, handle): + """Returns True if handle was created inside the pfor loop.""" + # We use some heuristic to find the original TensorArray creation op. + # The logic should handle the common cases (except cond based subgraphs). + # In theory the user could perform different operations on the handle (like + # Reshape, stack multiple handles, etc) which could break this logic. + # TODO(agarwal): handle Switch/Merge. + while handle.op.type in ("Enter", "Identity"): + handle = handle.op.inputs[0] + if handle.op.type not in [ + "TensorArrayV3", "TensorArrayGradV3", "TensorArrayGradWithShape"]: + raise ValueError("Unable to find source for handle %s" % handle) + else: + return pfor_input.pfor.op_is_inside_loop(handle.op) + + +def _unstack_flow(value): + # TODO(agarwal): consider looking if this is a Tile op then get its input. + # This may avoid running the Tile operations. 
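+  # A stacked flow is just the scalar flow value tiled to the loop length, so
+  # reading any one element (index 0 here) recovers the underlying flow.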
+ return array_ops.gather(value, 0) + + +@RegisterPFor("TensorArrayReadV3") +def _convert_tensor_array_read_v3(pfor_input): + handle = pfor_input.unstacked_input(0) + index, index_stacked, _ = pfor_input.input(1) + dtype = pfor_input.get_attr("dtype") + flow, flow_stacked, _ = pfor_input.input(2) + if flow_stacked: + flow = _unstack_flow(flow) + + is_inside_pfor = _handle_inside_pfor(pfor_input, pfor_input.op.inputs[0]) + if is_inside_pfor: + # Note that if we are inside a control flow construct inside the pfor, and + # only some of the iterations are doing the read (i.e. + # `all_indices_partitioned` is True), then the read operation should only + # return values for the currently active pfor iterations (`all_indices` + # below). Hence, whenever the returned value is stacked (i.e. `flow` is + # stacked), we may need to do an extra gather after reading the values. Also + # note that if `is_inside` is false, then values in the tensor array are + # unstacked. So the check is only needed in this branch. + all_indices = pfor_input.pfor.all_indices + all_indices_partitioned = pfor_input.pfor.all_indices_partitioned + # Note: flow_stacked indicates if values in the TensorArray are stacked or + # not. + if index_stacked: + if flow_stacked: + raise ValueError( + "It looks like TensorArrayReadV3 was called on a TensorArray whose" + " values are not loop-invariant, and the read indices were also" + " not loop invariant. This is currently unsupported.") + value = data_flow_ops.tensor_array_gather_v3( + handle, index, flow, dtype=dtype) + return wrap(value, True) + value = data_flow_ops.tensor_array_read_v3( + handle, index, flow, dtype=dtype) + if flow_stacked and all_indices_partitioned: + value = array_ops.gather(value, all_indices) + return wrap(value, flow_stacked) + # Values in the TensorArray should be unstacked (since different iterations + # couldn't write to the same location). So whether output is stacked or not + # depends on index_stacked. + if index_stacked: + value = data_flow_ops.tensor_array_gather_v3( + handle, index, flow, dtype=dtype) + else: + value = data_flow_ops.tensor_array_read_v3( + handle, index, flow, dtype=dtype) + return wrap(value, index_stacked) + + +@RegisterPFor("TensorArrayWriteV3") +def _convert_tensor_array_write_v3(pfor_input): + handle = pfor_input.unstacked_input(0) + index, index_stacked, _ = pfor_input.input(1) + value, value_stacked, _ = pfor_input.input(2) + flow, flow_stacked, _ = pfor_input.input(3) + if value_stacked and pfor_input.pfor.all_indices_partitioned: + # Looks like we are in a control flow in a pfor where not all iterations are + # active now. We don't allow that since that could lead to different indices + # having different shapes which will be hard to merge later. + raise ValueError("Writing non loop invariant values to TensorArray from " + "inside a while_loop/cond not supported.") + if flow_stacked: + flow = _unstack_flow(flow) + is_inside = _handle_inside_pfor(pfor_input, pfor_input.op.inputs[0]) + if is_inside: + if index_stacked: + raise ValueError("Need indices for %s to be loop invariant" % handle) + if not flow_stacked and not value_stacked: + flow_out = data_flow_ops.tensor_array_write_v3(handle, index, value, flow) + return wrap(flow_out, False) + else: + if not value_stacked: + value = _stack(value, pfor_input.pfor.loop_len_vector).t + # TODO(agarwal): Note that if flow is unstacked and value is stacked, then + # this may or may not be a safe situation. 
flow is unstacked both for a + # freshly created TensorArray, as well as after unstacked values are + # written to it. If it is the latter, then we cannot write a stacked value + # now since that may cause runtime errors due to different shapes in the + # array. At the moment we are not able to handle this gracefully and + # distinguish between the two cases. That would require some heuristic + # traversal of the graph to figure out whether all the writes are + # unstacked or not. + flow_out = data_flow_ops.tensor_array_write_v3(handle, index, value, flow) + return _stack(flow_out, pfor_input.pfor.loop_len_vector) + else: + if not index_stacked: + raise ValueError("Need indices for %s to be not loop invariant" % handle) + # Note that even when index_stacked is true, actual values in index may + # still not be unique. However that will cause runtime error when executing + # the scatter operation below. + if not value_stacked: + value = _stack(value, pfor_input.pfor.loop_len_vector).t + flow_out = data_flow_ops.tensor_array_scatter_v3(handle, index, value, flow) + return _stack(flow_out, pfor_input.pfor.loop_len_vector) + + +def _transpose_first_two_dims(value): + # TODO(agarwal): optimize if one of the dims == 1. + value_shape = array_ops.shape(value) + v0 = value_shape[0] + v1 = value_shape[1] + value = array_ops.reshape(value, [v0, v1, -1]) + value = array_ops.transpose(value, [1, 0, 2]) + new_shape = array_ops.concat([[v1, v0], value_shape[2:]], axis=0) + return array_ops.reshape(value, new_shape) + + +@RegisterPFor("TensorArrayGatherV3") +def _convert_tensor_array_gather_v3(pfor_input): + handle = pfor_input.unstacked_input(0) + indices, indices_stacked, _ = pfor_input.input(1) + indices = array_ops.reshape(indices, [-1]) + flow, flow_stacked, _ = pfor_input.input(2) + if flow_stacked: + flow = _unstack_flow(flow) + dtype = pfor_input.get_attr("dtype") + # TODO(agarwal): support element_shape attr? + + n = pfor_input.pfor.loop_len_vector + value = data_flow_ops.tensor_array_gather_v3( + handle, indices, flow, dtype=dtype) + is_inside = _handle_inside_pfor(pfor_input, pfor_input.op.inputs[0]) + if is_inside: + # flow_stacked indicates if values in the TensorArray are stacked or not. + if indices_stacked: + if flow_stacked: + raise ValueError( + "It looks like TensorArrayGatherV3 was called on a TensorArray " + "whose values are not loop-invariant, and the indices were also " + "not loop invariant. This is currently unsupported.") + else: + value = _unflatten_first_dim(value, n) + return wrap(value, True) + else: + if flow_stacked: + # Since elements in this array are stacked and `value` was produced by + # gather, its first two dims are "gathered elements" and "stack + # dimension". Our semantics require these two to be flipped. + value = _transpose_first_two_dims(value) + return wrap(value, flow_stacked) + else: + # Values in the TensorArray should be unstacked (since different iterations + # couldn't write to the same location). So whether output is stacked or not + # depends on indices_stacked. 
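+    # Since `indices` was flattened to a vector above, the gather returned all
+    # iterations' elements concatenated along the first dimension; split that
+    # dimension back into [num_iterations, -1] before returning the value as
+    # stacked.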
+ if indices_stacked: + value = _unflatten_first_dim(value, n) + return wrap(value, indices_stacked) + + +@RegisterPFor("TensorArrayScatterV3") +def _convert_tensor_array_scatter_v3(pfor_input): + handle = pfor_input.unstacked_input(0) + indices, indices_stacked, _ = pfor_input.input(1) + indices = array_ops.reshape(indices, [-1]) + value, value_stacked, _ = pfor_input.input(2) + flow, flow_stacked, _ = pfor_input.input(3) + + if flow_stacked: + flow = _unstack_flow(flow) + + is_inside = _handle_inside_pfor(pfor_input, pfor_input.op.inputs[0]) + if is_inside: + if indices_stacked: + raise ValueError("Need indices for %s to be loop invariant" % handle) + # Note that flow_stacked indicates if existing values in the array are + # stacked or not. + if not flow_stacked and not value_stacked: + flow_out = data_flow_ops.tensor_array_scatter_v3(handle, indices, value, + flow) + return wrap(flow_out, False) + if not value_stacked: + # TODO(agarwal): tile in the second dimension directly instead of + # transposing below. + value = _stack(value, pfor_input.pfor.loop_len_vector).t + + value = _transpose_first_two_dims(value) + # TODO(agarwal): Note that if a previous write was unstacked, flow will be + # unstacked, and a stacked value may be written here which may cause + # runtime error due to different elements having different shape. We do + # not try to prevent that. + flow_out = data_flow_ops.tensor_array_scatter_v3(handle, indices, value, + flow) + return _stack(flow_out, pfor_input.pfor.loop_len_vector) + if not indices_stacked: + raise ValueError("Need indices for %s to be not loop invariant" % handle) + if not value_stacked: + value = _stack(value, pfor_input.pfor.loop_len_vector).t + value = _flatten_first_two_dims(value) + flow_out = data_flow_ops.tensor_array_scatter_v3(handle, indices, value, + flow) + return _stack(flow_out, pfor_input.pfor.loop_len_vector) + + +@RegisterPFor("TensorArrayGradV3") +def _convert_tensor_array_grad_v3(pfor_input): + handle = pfor_input.unstacked_input(0) + flow, flow_stacked, _ = pfor_input.input(1) + if flow_stacked: + flow = _unstack_flow(flow) + source = pfor_input.get_attr("source") + # TODO(agarwal): For now, we assume that gradients are stacked if the + # TensorArrayGradV3 call is being done inside the pfor. Getting that wrong + # will give runtime error due to incorrect shape being written to the + # accumulator. It is difficult to know in advance if gradients written will be + # stacked or not. Note that flow being stacked is not indicative of the + # gradient being stacked or not. Revisit this later. + shape_to_prepend = pfor_input.pfor.loop_len_vector + grad_handle, flow_out = data_flow_ops.tensor_array_grad_with_shape( + handle=handle, + flow_in=flow, + shape_to_prepend=shape_to_prepend, + source=source) + flow_out = _stack(flow_out, pfor_input.pfor.loop_len_vector).t + return [wrap(grad_handle, False), wrap(flow_out, True)] + + +# StackV2 conversion is tricky since we don't have arrays of StackV2. So similar +# to TensorArrays, we convert them by changing the dimension of the elements +# inside the stack. +# +# We consider two cases: +# +# 1. StackV2 is constructed and used entirely inside the pfor loop. +# We keep a single Stack and perform the push/pop operations of all the +# iterations in lock-step. We also assume that all the iterations perform these +# operations. In case of dynamic control flow, if only some of the iterations +# try to perform a push/pop, then the conversion may not work correctly and may +# cause undefined behavior. 
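+# (For example, a push executed under a cond that only some iterations take
+# would leave the lock-step pushes and pops misaligned across iterations.)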
+# TODO(agarwal): test StackV2 with dynamic control flow. +# +# 2. StackV2 is constructed outside the pfor loop. +# Performing stack push/pop in a parallel fashion is ill-defined. However given +# that reading stacks created externally is a common operation when computing +# jacobians, we provide some special semantics here as follows. +# - disallow push operations to the stack +# - pop operations are performed in lock step by all iterations, similar to the +# case when the stack is created inside. A single value is popped during the +# lock-step operation and broadcast to all the iterations. Values in the stack +# are assumed to be loop-invariant. +# +# Some other implementation details: +# We use an ugly logic to find whether values in Stack data structure are +# loop invariant or not. When converting push/pop operations, we keep track of +# whether the last conversion used a stacked value or not (see _stack_cache +# below). As a result if an unstacked value is written first, subsequent stacked +# writes are disallowed when they could have been allowed in theory. + +# Map from cache key based on StackV2 handle to a bool indicating whether values +# are stacked or not. +# TODO(agarwal): move _stack_cache inside pfor? +_stack_cache = {} + + +def _stack_cache_key(pfor_input): + """Create cache key corresponding to a stack handle.""" + op_type = pfor_input.op_type + assert op_type in ["StackPushV2", "StackPopV2"], op_type + orig_handle = pfor_input.op.inputs[0] + while orig_handle.op.type in ["Identity", "Enter"]: + orig_handle = orig_handle.op.inputs[0] + assert orig_handle.op.type == "StackV2", orig_handle.op + return ops.get_default_graph(), pfor_input.pfor, orig_handle + + +def _stack_handle_inside_pfor(handle, pfor_input): + while handle.op.type in ["Identity", "Enter"]: + handle = handle.op.inputs[0] + assert handle.op.type == "StackV2", ( + "Unable to find StackV2 op. Got %s" % handle.op) + return pfor_input.pfor.op_is_inside_loop(handle.op) + + +@RegisterPFor("StackPushV2") +def _convert_stack_push_v2(pfor_input): + handle = pfor_input.unstacked_input(0) + elem, elem_stacked, _ = pfor_input.input(1) + swap_memory = pfor_input.get_attr("swap_memory") + + if not _stack_handle_inside_pfor(pfor_input.op.inputs[0], pfor_input): + raise ValueError("StackPushV2 not allowed on stacks created outside pfor") + stack_cache_key = _stack_cache_key(pfor_input) + stacked = _stack_cache.get(stack_cache_key, None) + if stacked is None: + stacked = elem_stacked + _stack_cache[stack_cache_key] = stacked + else: + # If we previously made it unstacked then we can't revert to being stacked. + if not stacked and elem_stacked: + raise ValueError( + "It looks like the stack was previously determined to be loop" + " invariant, but we are now trying to push a loop dependent value" + " to it. This is currently unsupported.") + if stacked and not elem_stacked: + elem = _stack(elem, pfor_input.pfor.loop_len_vector).t + out = data_flow_ops.stack_push_v2(handle, elem, swap_memory=swap_memory) + return wrap(out, stacked) + + +# Note that inputs to this convertor will be unstacked. However it should get +# called since it is a stateful op. 
+@RegisterPFor("StackPopV2") +def _convert_stack_pop_v2(pfor_input): + handle = pfor_input.unstacked_input(0) + stack_cache_key = _stack_cache_key(pfor_input) + stacked = _stack_cache.get(stack_cache_key, None) + # If a StackPushV2 has not been converted yet, we default to unstacked since + # the push could be outside of pfor, or the covertor may not be called if the + # inputs are unconverted. + if stacked is None: + stacked = False + _stack_cache[stack_cache_key] = False + elem_type = pfor_input.get_attr("elem_type") + out = data_flow_ops.stack_pop_v2(handle, elem_type) + return wrap(out, stacked) diff --git a/tensorflow/stream_executor/cuda/cuda_dnn.cc b/tensorflow/stream_executor/cuda/cuda_dnn.cc index d4f2fd2625..84916385a8 100644 --- a/tensorflow/stream_executor/cuda/cuda_dnn.cc +++ b/tensorflow/stream_executor/cuda/cuda_dnn.cc @@ -3074,6 +3074,22 @@ port::Status CudnnSupport::DoConvolveBackwardDataImpl( } } + // Cudnn 7.1.4 has a bug if the workspace of the following convolution is not + // zero-initialized. + // TODO(timshen): Add an nvbugs/ link. + if (CUDNN_VERSION >= 7000 && + algorithm_config.algorithm().algo_id() == + CUDNN_CONVOLUTION_BWD_DATA_ALGO_1 && + cudnn_type == CUDNN_DATA_HALF && + algorithm_config.algorithm().tensor_ops_enabled() && + input_descriptor.layout() == dnn::DataLayout::kBatchYXDepth && + filter_descriptor.layout() == dnn::FilterLayout::kOutputInputYX && + output_descriptor.layout() == dnn::DataLayout::kBatchDepthYX && + (convolution_descriptor.vertical_filter_stride() > 1 || + convolution_descriptor.horizontal_filter_stride() > 1)) { + stream->ThenMemZero(&scratch, scratch.size()); + } + RETURN_IF_CUDNN_ERROR( cudnnConvolutionBackwardData(cudnn.handle(), /*alpha=*/alpha, diff --git a/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-classifier.pbtxt b/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-classifier.pbtxt index 111914f643..0c6b7e4a82 100644 --- a/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-classifier.pbtxt +++ b/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-classifier.pbtxt @@ -21,7 +21,7 @@ tf_class { } member_method { name: "__init__" - argspec: "args=[\'self\', \'hidden_units\', \'feature_columns\', \'model_dir\', \'n_classes\', \'weight_column\', \'label_vocabulary\', \'optimizer\', \'activation_fn\', \'dropout\', \'input_layer_partitioner\', \'config\', \'warm_start_from\', \'loss_reduction\'], varargs=None, keywords=None, defaults=[\'None\', \'2\', \'None\', \'None\', \'Adagrad\', \'<function relu instance>\', \'None\', \'None\', \'None\', \'None\', \'weighted_sum\'], " + argspec: "args=[\'self\', \'hidden_units\', \'feature_columns\', \'model_dir\', \'n_classes\', \'weight_column\', \'label_vocabulary\', \'optimizer\', \'activation_fn\', \'dropout\', \'input_layer_partitioner\', \'config\', \'warm_start_from\', \'loss_reduction\', \'batch_norm\'], varargs=None, keywords=None, defaults=[\'None\', \'2\', \'None\', \'None\', \'Adagrad\', \'<function relu instance>\', \'None\', \'None\', \'None\', \'None\', \'weighted_sum\', \'False\'], " } member_method { name: "eval_dir" diff --git a/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-linear-combined-classifier.pbtxt b/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-linear-combined-classifier.pbtxt index 67e4ee02d0..49a3e898c5 100644 --- a/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-linear-combined-classifier.pbtxt +++ b/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-linear-combined-classifier.pbtxt @@ -21,7 +21,7 @@ 
tf_class { } member_method { name: "__init__" - argspec: "args=[\'self\', \'model_dir\', \'linear_feature_columns\', \'linear_optimizer\', \'dnn_feature_columns\', \'dnn_optimizer\', \'dnn_hidden_units\', \'dnn_activation_fn\', \'dnn_dropout\', \'n_classes\', \'weight_column\', \'label_vocabulary\', \'input_layer_partitioner\', \'config\', \'warm_start_from\', \'loss_reduction\'], varargs=None, keywords=None, defaults=[\'None\', \'None\', \'Ftrl\', \'None\', \'Adagrad\', \'None\', \'<function relu instance>\', \'None\', \'2\', \'None\', \'None\', \'None\', \'None\', \'None\', \'weighted_sum\'], " + argspec: "args=[\'self\', \'model_dir\', \'linear_feature_columns\', \'linear_optimizer\', \'dnn_feature_columns\', \'dnn_optimizer\', \'dnn_hidden_units\', \'dnn_activation_fn\', \'dnn_dropout\', \'n_classes\', \'weight_column\', \'label_vocabulary\', \'input_layer_partitioner\', \'config\', \'warm_start_from\', \'loss_reduction\', \'batch_norm\'], varargs=None, keywords=None, defaults=[\'None\', \'None\', \'Ftrl\', \'None\', \'Adagrad\', \'None\', \'<function relu instance>\', \'None\', \'2\', \'None\', \'None\', \'None\', \'None\', \'None\', \'weighted_sum\', \'False\'], " } member_method { name: "eval_dir" diff --git a/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-linear-combined-regressor.pbtxt b/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-linear-combined-regressor.pbtxt index e1289b975e..4b81613c92 100644 --- a/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-linear-combined-regressor.pbtxt +++ b/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-linear-combined-regressor.pbtxt @@ -21,7 +21,7 @@ tf_class { } member_method { name: "__init__" - argspec: "args=[\'self\', \'model_dir\', \'linear_feature_columns\', \'linear_optimizer\', \'dnn_feature_columns\', \'dnn_optimizer\', \'dnn_hidden_units\', \'dnn_activation_fn\', \'dnn_dropout\', \'label_dimension\', \'weight_column\', \'input_layer_partitioner\', \'config\', \'warm_start_from\', \'loss_reduction\'], varargs=None, keywords=None, defaults=[\'None\', \'None\', \'Ftrl\', \'None\', \'Adagrad\', \'None\', \'<function relu instance>\', \'None\', \'1\', \'None\', \'None\', \'None\', \'None\', \'weighted_sum\'], " + argspec: "args=[\'self\', \'model_dir\', \'linear_feature_columns\', \'linear_optimizer\', \'dnn_feature_columns\', \'dnn_optimizer\', \'dnn_hidden_units\', \'dnn_activation_fn\', \'dnn_dropout\', \'label_dimension\', \'weight_column\', \'input_layer_partitioner\', \'config\', \'warm_start_from\', \'loss_reduction\', \'batch_norm\'], varargs=None, keywords=None, defaults=[\'None\', \'None\', \'Ftrl\', \'None\', \'Adagrad\', \'None\', \'<function relu instance>\', \'None\', \'1\', \'None\', \'None\', \'None\', \'None\', \'weighted_sum\', \'False\'], " } member_method { name: "eval_dir" diff --git a/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-regressor.pbtxt b/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-regressor.pbtxt index d030b2f51f..f50e375f7c 100644 --- a/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-regressor.pbtxt +++ b/tensorflow/tools/api/golden/tensorflow.estimator.-d-n-n-regressor.pbtxt @@ -21,7 +21,7 @@ tf_class { } member_method { name: "__init__" - argspec: "args=[\'self\', \'hidden_units\', \'feature_columns\', \'model_dir\', \'label_dimension\', \'weight_column\', \'optimizer\', \'activation_fn\', \'dropout\', \'input_layer_partitioner\', \'config\', \'warm_start_from\', \'loss_reduction\'], varargs=None, keywords=None, defaults=[\'None\', \'1\', 
\'None\', \'Adagrad\', \'<function relu instance>\', \'None\', \'None\', \'None\', \'None\', \'weighted_sum\'], " + argspec: "args=[\'self\', \'hidden_units\', \'feature_columns\', \'model_dir\', \'label_dimension\', \'weight_column\', \'optimizer\', \'activation_fn\', \'dropout\', \'input_layer_partitioner\', \'config\', \'warm_start_from\', \'loss_reduction\', \'batch_norm\'], varargs=None, keywords=None, defaults=[\'None\', \'1\', \'None\', \'Adagrad\', \'<function relu instance>\', \'None\', \'None\', \'None\', \'None\', \'weighted_sum\', \'False\'], " } member_method { name: "eval_dir" diff --git a/tensorflow/tools/docker/Dockerfile.devel-mkl b/tensorflow/tools/docker/Dockerfile.devel-mkl index de44ba2173..6dca0e393f 100755 --- a/tensorflow/tools/docker/Dockerfile.devel-mkl +++ b/tensorflow/tools/docker/Dockerfile.devel-mkl @@ -1,6 +1,6 @@ FROM ubuntu:16.04 -LABEL maintainer="Clayne Robison <clayne.b.robison@intel.com>"
+LABEL maintainer="Clayne Robison <clayne.b.robison@intel.com>"

 # These parameters can be overridden by parameterized_docker_build.sh
 ARG TF_BUILD_VERSION=r1.9
diff --git a/third_party/eigen3/BUILD b/third_party/eigen3/BUILD
index 9d9c27b180..203991b50f 100644
--- a/third_party/eigen3/BUILD
+++ b/third_party/eigen3/BUILD
@@ -80,5 +80,5 @@ genrule(
       mkdir -p "$@/$${d}"
       cp "$${f}" "$@/$${d}/"
     done
-    """
+    """,
 )