From 33879661ed221167f3987bcc2fdc5298986d29fb Mon Sep 17 00:00:00 2001 From: Todd Wang Date: Wed, 25 Jul 2018 23:53:16 -0700 Subject: Ensure failed sub-streams are not re-used. Streams have a monotonic state machine; if a stream encounters any error, it will remain in an error state forever. Without this change, a previously failed sub-stream will be put back on sub_streams_, only to cause the next usage of the sub-stream to trivially fail. PiperOrigin-RevId: 206112024 --- tensorflow/stream_executor/stream.cc | 9 +- tensorflow/stream_executor/stream.h | 2 +- tensorflow/stream_executor/stream_test.cc | 139 ++++++++++++++++++++++++++++++ 3 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 tensorflow/stream_executor/stream_test.cc (limited to 'tensorflow/stream_executor') diff --git a/tensorflow/stream_executor/stream.cc b/tensorflow/stream_executor/stream.cc index 2c495c99e1..6248aa2d01 100644 --- a/tensorflow/stream_executor/stream.cc +++ b/tensorflow/stream_executor/stream.cc @@ -1941,7 +1941,14 @@ void Stream::ReturnSubStream(Stream *sub_stream) { mutex_lock lock(mu_); for (auto &stream : sub_streams_) { if (stream.first.get() == sub_stream) { - stream.second = true; + // Streams have a monotonic state machine; if a stream + // encounters an error, it will remain in an error state + // forever. Only allow re-use of ok streams. + // + // TODO(toddw): Improve this mechanism, if necessary, to drop + // failed streams completely. + const bool ready_to_reuse = sub_stream->ok(); + stream.second = ready_to_reuse; return; } } diff --git a/tensorflow/stream_executor/stream.h b/tensorflow/stream_executor/stream.h index 63d64947c8..706442a666 100644 --- a/tensorflow/stream_executor/stream.h +++ b/tensorflow/stream_executor/stream.h @@ -125,7 +125,7 @@ class Stream { Stream *GetOrCreateSubStream() LOCKS_EXCLUDED(mu_); // Return the sub-stream back to the host stream so that it can be reused - // later. + // later. Sub-streams that are !ok() will not be reused. void ReturnSubStream(Stream *sub_stream) LOCKS_EXCLUDED(mu_); // Allocate temporary memories. The stream will deallocate them when blocked diff --git a/tensorflow/stream_executor/stream_test.cc b/tensorflow/stream_executor/stream_test.cc new file mode 100644 index 0000000000..47dd675834 --- /dev/null +++ b/tensorflow/stream_executor/stream_test.cc @@ -0,0 +1,139 @@ +/* Copyright 2018 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/stream_executor/stream_executor.h" + +#include "tensorflow/core/platform/test.h" + +namespace stream_executor { +namespace { + +class StreamTest : public ::testing::Test { + protected: + std::unique_ptr NewStreamExecutor() { + Platform* platform = + MultiPlatformManager::PlatformWithName("Host").ConsumeValueOrDie(); + StreamExecutorConfig config(/*ordinal=*/0); + return platform->GetUncachedExecutor(config).ConsumeValueOrDie(); + } +}; + +TEST_F(StreamTest, NoInitNotOk) { + std::unique_ptr executor = NewStreamExecutor(); + Stream stream(executor.get()); + EXPECT_FALSE(stream.ok()); +} + +TEST_F(StreamTest, InitOk) { + std::unique_ptr executor = NewStreamExecutor(); + Stream stream(executor.get()); + stream.Init(); + EXPECT_TRUE(stream.ok()); +} + +TEST_F(StreamTest, OneSubStream) { + std::unique_ptr executor = NewStreamExecutor(); + Stream stream(executor.get()); + stream.Init(); + EXPECT_TRUE(stream.ok()); + + // Get and return a sub-stream. Sub-streams are always initialized. + Stream* sub_stream1 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream1->ok()); + stream.ReturnSubStream(sub_stream1); + + // Get and return another sub-stream. + Stream* sub_stream2 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream2->ok()); + stream.ReturnSubStream(sub_stream1); + + // The underlying sub-streams should be the same, since sub_stream1 + // was returned before we tried to get sub_stream2. + EXPECT_EQ(sub_stream1, sub_stream2); +} + +TEST_F(StreamTest, TwoSubStreams) { + std::unique_ptr executor = NewStreamExecutor(); + Stream stream(executor.get()); + stream.Init(); + EXPECT_TRUE(stream.ok()); + + // Get two sub-streams. + Stream* sub_stream1 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream1->ok()); + Stream* sub_stream2 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream2->ok()); + + // The underlying sub-streams should be different, since neither + // sub-stream has been returned. + EXPECT_NE(sub_stream1, sub_stream2); + + // Return sub_stream1 and get sub_stream3, which should be the same. + stream.ReturnSubStream(sub_stream1); + Stream* sub_stream3 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream3->ok()); + EXPECT_EQ(sub_stream1, sub_stream3); + EXPECT_NE(sub_stream2, sub_stream3); + + // Return sub_stream2 and get sub_stream4, which should be the same. + stream.ReturnSubStream(sub_stream2); + Stream* sub_stream4 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream4->ok()); + EXPECT_EQ(sub_stream2, sub_stream4); + EXPECT_NE(sub_stream3, sub_stream4); +} + +TEST_F(StreamTest, FailedSubStreamNotReused) { + std::unique_ptr executor = NewStreamExecutor(); + Stream stream(executor.get()); + stream.Init(); + EXPECT_TRUE(stream.ok()); + + // Get a sub-stream. + Stream* sub_stream1 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream1->ok()); + + // Force an error on the stream; here we call a method that requires + // DNN support, which we know the Host platform doesn't support. + sub_stream1->ThenDepthConcatenate({}, {}, nullptr); + EXPECT_FALSE(sub_stream1->ok()); + + // Return sub_stream1 and get sub_stream2. + stream.ReturnSubStream(sub_stream1); + Stream* sub_stream2 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream2->ok()); + + // The underlying streams should be different. They would have been + // the same, but since we forced an error on sub_stream1, it will + // not be re-used. Sadly we can't just check: + // EXPECT_NE(sub_stream1, sub_stream2); + // + // The above should hold logically, but it may fail if the new + // stream instance allocated for sub_stream2 happens to reside in + // the same memory address as sub_stream1. + // + // The check that sub_stream2->ok() serves as a good-enough check. + + // Return sub_stream2 and get sub_stream3. The previous error on + // sub_stream1 has no effect on these streams, and they are the + // same. + stream.ReturnSubStream(sub_stream2); + Stream* sub_stream3 = stream.GetOrCreateSubStream(); + EXPECT_TRUE(sub_stream3->ok()); + EXPECT_EQ(sub_stream2, sub_stream3); +} + +} // namespace +} // namespace stream_executor -- cgit v1.2.3