diff options
Diffstat (limited to 'tensorflow/compiler/xla/service/cpu/infeed_manager.cc')
-rw-r--r-- | tensorflow/compiler/xla/service/cpu/infeed_manager.cc | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/tensorflow/compiler/xla/service/cpu/infeed_manager.cc b/tensorflow/compiler/xla/service/cpu/infeed_manager.cc new file mode 100644 index 0000000000..23a2dfcc32 --- /dev/null +++ b/tensorflow/compiler/xla/service/cpu/infeed_manager.cc @@ -0,0 +1,72 @@ +/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/compiler/xla/service/cpu/infeed_manager.h" + +#include "tensorflow/core/platform/logging.h" + +namespace xla { +namespace cpu { +namespace runtime { + +InfeedBuffer::~InfeedBuffer() = default; + +InfeedManager::InfeedManager() : current_buffer_(nullptr) {} + +void InfeedManager::Reset() { + std::unique_lock<std::mutex> l(mu_); + CHECK(!current_buffer_); + for (auto buffer : enqueued_buffer_) { + buffer->Done(); + } + enqueued_buffer_.clear(); +} + +void InfeedManager::EnqueueBuffer(InfeedBuffer* buffer) { + std::unique_lock<std::mutex> l(mu_); + bool was_empty = enqueued_buffer_.empty(); + enqueued_buffer_.push_back(buffer); + if (was_empty) { + // This has the potential to suffer from the notified thread + // immediately trying and failing to acquire mu_, but seems + // preferable to the alternative of notifying outside the lock + // on every enqueue. + cv_.notify_one(); + } +} + +InfeedBuffer* InfeedManager::BlockingDequeueBuffer() { + std::unique_lock<std::mutex> l(mu_); + while (enqueued_buffer_.empty()) { + cv_.wait(l); + } + CHECK(!current_buffer_); + current_buffer_ = enqueued_buffer_.front(); + enqueued_buffer_.pop_front(); + return current_buffer_; +} + +void InfeedManager::ReleaseCurrentBuffer(int32 length, void* data) { + std::unique_lock<std::mutex> l(mu_); + CHECK(current_buffer_); + CHECK_EQ(length, current_buffer_->length()); + CHECK_EQ(data, current_buffer_->data()); + current_buffer_->Done(); + current_buffer_ = nullptr; +} + +} // namespace runtime +} // namespace cpu +} // namespace xla |