diff options
Diffstat (limited to 'tensorflow/core/kernels/queue_op.h')
-rw-r--r-- | tensorflow/core/kernels/queue_op.h | 233 |
1 files changed, 212 insertions, 21 deletions
diff --git a/tensorflow/core/kernels/queue_op.h b/tensorflow/core/kernels/queue_op.h index 6c19f9841c..2efd838a5f 100644 --- a/tensorflow/core/kernels/queue_op.h +++ b/tensorflow/core/kernels/queue_op.h @@ -13,12 +13,13 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#ifndef TENSORFLOW_KERNELS_QUEUE_OP_H_ -#define TENSORFLOW_KERNELS_QUEUE_OP_H_ +#ifndef TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_ +#define TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_ #include <deque> #include "tensorflow/core/framework/op_kernel.h" +#include "tensorflow/core/framework/queue_interface.h" #include "tensorflow/core/framework/resource_op_kernel.h" #include "tensorflow/core/framework/tensor.h" #include "tensorflow/core/framework/types.h" @@ -32,22 +33,9 @@ namespace tensorflow { // Defines a QueueOp, an abstract class for Queue construction ops. class QueueOp : public ResourceOpKernel<QueueInterface> { public: - QueueOp(OpKernelConstruction* context) : ResourceOpKernel(context) { - OP_REQUIRES_OK(context, context->GetAttr("capacity", &capacity_)); - if (capacity_ < 0) { - capacity_ = QueueBase::kUnbounded; - } - OP_REQUIRES_OK(context, - context->GetAttr("component_types", &component_types_)); - } + QueueOp(OpKernelConstruction* context); - void Compute(OpKernelContext* context) override { - ResourceOpKernel<QueueInterface>::Compute(context); - mutex_lock l(mu_); - if (resource_ && context->track_allocations()) { - context->record_persistent_memory_allocation(resource_->MemoryUsed()); - } - } + void Compute(OpKernelContext* context) override; protected: // Variables accessible by subclasses @@ -55,9 +43,7 @@ class QueueOp : public ResourceOpKernel<QueueInterface> { DataTypeVector component_types_; private: - Status VerifyResource(QueueInterface* queue) override { - return queue->MatchesNodeDef(def()); - } + Status VerifyResource(QueueInterface* queue) override; }; class TypedQueueOp : public QueueOp { @@ -75,6 +61,211 @@ class TypedQueueOp : public QueueOp { } }; +// Queue manipulator kernels + +class QueueOpKernel : public AsyncOpKernel { + public: + explicit QueueOpKernel(OpKernelConstruction* context); + + void ComputeAsync(OpKernelContext* ctx, DoneCallback callback) final; + + protected: + virtual void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) = 0; +}; + +class QueueAccessOpKernel : public QueueOpKernel { + public: + explicit QueueAccessOpKernel(OpKernelConstruction* context); + + protected: + int64 timeout_; +}; + +// Defines an EnqueueOp, the execution of which enqueues a tuple of +// tensors in the given Queue. +// +// The op has 1 + k inputs, where k is the number of components in the +// tuples stored in the given Queue: +// - Input 0: queue handle. +// - Input 1: 0th element of the tuple. +// - ... +// - Input (1+k): kth element of the tuple. +class EnqueueOp : public QueueAccessOpKernel { + public: + explicit EnqueueOp(OpKernelConstruction* context); + + protected: + void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) override; + + private: + TF_DISALLOW_COPY_AND_ASSIGN(EnqueueOp); +}; + +// Defines an EnqueueManyOp, the execution of which slices each +// component of a tuple of tensors along the 0th dimension, and +// enqueues tuples of slices in the given Queue. +// +// The op has 1 + k inputs, where k is the number of components in the +// tuples stored in the given Queue: +// - Input 0: queue handle. +// - Input 1: 0th element of the tuple. +// - ... +// - Input (1+k): kth element of the tuple. +// +// N.B. All tuple components must have the same size in the 0th +// dimension. +class EnqueueManyOp : public QueueAccessOpKernel { + public: + explicit EnqueueManyOp(OpKernelConstruction* context); + + protected: + void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) override; + + ~EnqueueManyOp() override; + + private: + TF_DISALLOW_COPY_AND_ASSIGN(EnqueueManyOp); +}; + +// Defines a DequeueOp, the execution of which dequeues a tuple of +// tensors from the given Queue. +// +// The op has one input, which is the handle of the appropriate +// Queue. The op has k outputs, where k is the number of components in +// the tuples stored in the given Queue, and output i is the ith +// component of the dequeued tuple. +class DequeueOp : public QueueAccessOpKernel { + public: + explicit DequeueOp(OpKernelConstruction* context); + + protected: + void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) override; + + ~DequeueOp() override; + + private: + TF_DISALLOW_COPY_AND_ASSIGN(DequeueOp); +}; + +// Defines a DequeueManyOp, the execution of which concatenates the +// requested number of elements from the given Queue along the 0th +// dimension, and emits the result as a single tuple of tensors. +// +// The op has two inputs: +// - Input 0: the handle to a queue. +// - Input 1: the number of elements to dequeue. +// +// The op has k outputs, where k is the number of components in the +// tuples stored in the given Queue, and output i is the ith component +// of the dequeued tuple. +class DequeueManyOp : public QueueAccessOpKernel { + public: + explicit DequeueManyOp(OpKernelConstruction* context); + + protected: + void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) override; + + ~DequeueManyOp() override; + + private: + TF_DISALLOW_COPY_AND_ASSIGN(DequeueManyOp); +}; + +// Defines a DequeueUpToOp, the execution of which concatenates the +// requested number of elements from the given Queue along the 0th +// dimension, and emits the result as a single tuple of tensors. +// +// The difference between this op and DequeueMany is the handling when +// the Queue is closed. While the DequeueMany op will return if there +// an error when there are less than num_elements elements left in the +// closed queue, this op will return between 1 and +// min(num_elements, elements_remaining_in_queue), and will not block. +// If there are no elements left, then the standard DequeueMany error +// is returned. +// +// This op only works if the underlying Queue implementation accepts +// the allow_small_batch = true parameter to TryDequeueMany. +// If it does not, an errors::Unimplemented exception is returned. +// +// The op has two inputs: +// - Input 0: the handle to a queue. +// - Input 1: the number of elements to dequeue. +// +// The op has k outputs, where k is the number of components in the +// tuples stored in the given Queue, and output i is the ith component +// of the dequeued tuple. +// +// The op has one attribute: allow_small_batch. If the Queue supports +// it, setting this to true causes the queue to return smaller +// (possibly zero length) batches when it is closed, up to however +// many elements are available when the op executes. In this case, +// the Queue does not block when closed. +class DequeueUpToOp : public QueueAccessOpKernel { + public: + explicit DequeueUpToOp(OpKernelConstruction* context); + + protected: + void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) override; + + ~DequeueUpToOp() override; + + private: + TF_DISALLOW_COPY_AND_ASSIGN(DequeueUpToOp); +}; + +// Defines a QueueCloseOp, which closes the given Queue. Closing a +// Queue signals that no more elements will be enqueued in it. +// +// The op has one input, which is the handle of the appropriate Queue. +class QueueCloseOp : public QueueOpKernel { + public: + explicit QueueCloseOp(OpKernelConstruction* context); + + protected: + void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) override; + + private: + bool cancel_pending_enqueues_; + TF_DISALLOW_COPY_AND_ASSIGN(QueueCloseOp); +}; + +// Defines a QueueSizeOp, which computes the number of elements in the +// given Queue, and emits it as an output tensor. +// +// The op has one input, which is the handle of the appropriate Queue; +// and one output, which is a single-element tensor containing the current +// size of that Queue. +class QueueSizeOp : public QueueOpKernel { + public: + explicit QueueSizeOp(OpKernelConstruction* context); + + protected: + void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) override; + + private: + TF_DISALLOW_COPY_AND_ASSIGN(QueueSizeOp); +}; + +class QueueIsClosedOp : public QueueOpKernel { + public: + explicit QueueIsClosedOp(OpKernelConstruction* context); + + protected: + void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, + DoneCallback callback) override; + + private: + TF_DISALLOW_COPY_AND_ASSIGN(QueueIsClosedOp); +}; + } // namespace tensorflow -#endif // TENSORFLOW_KERNELS_QUEUE_OP_H_ +#endif // TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_ |