diff options
10 files changed, 180 insertions, 49 deletions
diff --git a/tensorflow/compiler/xla/client/computation_builder.h b/tensorflow/compiler/xla/client/computation_builder.h index f11d769746..18ab24d9e6 100644 --- a/tensorflow/compiler/xla/client/computation_builder.h +++ b/tensorflow/compiler/xla/client/computation_builder.h @@ -422,8 +422,12 @@ class ComputationBuilder { // Enqueues an outfeed instruction onto the computation. This instruction // generates outgoing data transfers for the given data. - void Outfeed(const ComputationDataHandle& operand, const Shape& shape, - const string& outfeed_config); + // + // shape_with_layout communicates the laid out shape that we want to outfeed + // -- if !ShapeUtil::Compatible(GetShape(operand), shape_with_layout) an error + // will occur. + void Outfeed(const ComputationDataHandle& operand, + const Shape& shape_with_layout, const string& outfeed_config); // Enqueues a call instruction onto the computation. ComputationDataHandle Call( diff --git a/tensorflow/compiler/xla/python/local_computation_builder.cc b/tensorflow/compiler/xla/python/local_computation_builder.cc index 2fd8936b0f..06f80cc91d 100644 --- a/tensorflow/compiler/xla/python/local_computation_builder.cc +++ b/tensorflow/compiler/xla/python/local_computation_builder.cc @@ -63,20 +63,32 @@ LocalClient* GetOrCreateLocalClient() { } Status TransferToInfeedLocal(const Literal& literal) { - VLOG(1) << "Infeeding literal without replica number."; + VLOG(1) << "Infeeding literal without replica number; shape: " + << literal.shape(); LocalClient* client = GetOrCreateLocalClient(); return client->TransferToInfeedLocal(literal, /*device_ordinal=*/0); } Status TransferToInfeedLocalReplica(const Literal& literal, int replica_number) { - VLOG(1) << "Infeeding literal to replica number: " << replica_number; + VLOG(1) << "Infeeding shape " << literal.shape() + << " to replica number: " << replica_number; LocalClient* client = GetOrCreateLocalClient(); TF_ASSIGN_OR_RETURN(int device_ordinal, 
client->ReplicaNumberToDeviceOrdinal(replica_number)); return client->TransferToInfeedLocal(literal, device_ordinal); } +StatusOr<std::unique_ptr<Literal>> TransferFromOutfeedLocalReplica( + const Shape& shape, int replica_number) { + VLOG(1) << "Outfeeding literal from replica number: " << replica_number + << " shape: " << shape; + LocalClient* client = GetOrCreateLocalClient(); + TF_ASSIGN_OR_RETURN(int device_ordinal, + client->ReplicaNumberToDeviceOrdinal(replica_number)); + return client->TransferFromOutfeedLocal(shape, device_ordinal); +} + LocalShapedBuffer::LocalShapedBuffer( std::unique_ptr<ScopedShapedBuffer> shaped_buffer) : shaped_buffer_(std::move(shaped_buffer)) {} @@ -111,6 +123,8 @@ StatusOr<std::unique_ptr<Literal>> CompiledLocalComputation::Execute( const std::vector<Literal>& arguments) { LocalClient* client = GetOrCreateLocalClient(); + VLOG(1) << "Execution requested with " << GetReplicaCount() << " replicas."; + // Each replica populates a StatusOr result, but only replica zero actually // retrieves its literal value. 
std::vector<StatusOr<std::unique_ptr<Literal>>> results(GetReplicaCount()); @@ -261,6 +275,12 @@ ComputationDataHandle LocalComputationBuilder::Infeed(const Shape& shape) { return builder_.Infeed(shape); } +void LocalComputationBuilder::Outfeed(const ComputationDataHandle& operand, + const Shape& shape, + const string& outfeed_config) { + builder_.Outfeed(operand, shape, outfeed_config); +} + ComputationDataHandle LocalComputationBuilder::ConstantLiteral( const Literal& literal) { return builder_.ConstantLiteral(literal); diff --git a/tensorflow/compiler/xla/python/local_computation_builder.h b/tensorflow/compiler/xla/python/local_computation_builder.h index 1104d7f408..61bf209834 100644 --- a/tensorflow/compiler/xla/python/local_computation_builder.h +++ b/tensorflow/compiler/xla/python/local_computation_builder.h @@ -47,6 +47,12 @@ Status TransferToInfeedLocal(const Literal& literal); // The replica number is resolved to an appropriate device ordinal. Status TransferToInfeedLocalReplica(const Literal& literal, int replica_number); +// Transfers a literal of the given shape from the outfeed of the given replica. +// +// The replica number is resolved to an appropriate device ordinal. +StatusOr<std::unique_ptr<Literal> > TransferFromOutfeedLocalReplica( + const Shape& shape, int replica_number); + // Wraps a ScopedShapedBuffer produced by copying a literal "to // device," i.e. copying a literal to a scoped buffer via the local // client. 
@@ -115,6 +121,9 @@ class LocalComputationBuilder { ComputationDataHandle Infeed(const Shape& shape); + void Outfeed(const ComputationDataHandle& operand, const Shape& shape, + const string& outfeed_config); + ComputationDataHandle ConstantLiteral(const Literal& literal); ComputationDataHandle Broadcast( diff --git a/tensorflow/compiler/xla/python/local_computation_builder.i b/tensorflow/compiler/xla/python/local_computation_builder.i index 7d1f057101..3204fc63fd 100644 --- a/tensorflow/compiler/xla/python/local_computation_builder.i +++ b/tensorflow/compiler/xla/python/local_computation_builder.i @@ -286,9 +286,13 @@ tensorflow::ImportNumpy(); // Literal -%typemap(in) const Literal& (std::unique_ptr<Literal> temp) { - temp = numpy::XlaLiteralFromPyObject($input); - $1 = &*temp; +%typemap(in) const Literal& (StatusOr< std::unique_ptr<Literal> > literal_status) { + literal_status = numpy::XlaLiteralFromPyObject($input); + if (!literal_status.ok()) { + PyErr_SetString(PyExc_RuntimeError, literal_status.status().ToString().c_str()); + return NULL; + } + $1 = literal_status.ValueOrDie().get(); } %typemap(out) std::unique_ptr<Literal> { @@ -311,7 +315,13 @@ tensorflow::ImportNumpy(); const int size = PySequence_Size($input); for (int i = 0; i < size; ++i) { PyObject* o = PySequence_GetItem($input, i); - temps.push_back(std::move(*numpy::XlaLiteralFromPyObject(o))); + StatusOr< std::unique_ptr<Literal> > literal_status = numpy::XlaLiteralFromPyObject(o); + if (!literal_status.ok()) { + PyErr_SetString(PyExc_RuntimeError, literal_status.status().ToString().c_str()); + Py_DECREF(o); + return NULL; + } + temps.push_back(std::move(*literal_status.ConsumeValueOrDie())); Py_DECREF(o); } $1 = &temps; @@ -320,7 +330,9 @@ tensorflow::ImportNumpy(); // Shape %typemap(in) const Shape& (Shape temp) { - if (!numpy::CheckPyShapeInfo($input)) { + Status shape_status = numpy::CheckPyShapeInfo($input); + if (!shape_status.ok()) { + PyErr_SetString(PyExc_RuntimeError, 
shape_status.ToString().c_str()); return NULL; } temp = numpy::XlaShapeFromPyShapeInfo($input); @@ -339,7 +351,9 @@ tensorflow::ImportNumpy(); const int size = PySequence_Size($input); for (int i = 0; i < size; ++i) { PyObject* o = PySequence_GetItem($input, i); - if (!numpy::CheckPyShapeInfo(o)) { + Status shape_status = numpy::CheckPyShapeInfo(o); + if (!shape_status.ok()) { + PyErr_SetString(PyExc_RuntimeError, shape_status.ToString().c_str()); Py_DECREF(o); return NULL; } @@ -561,6 +575,7 @@ tensorflow::ImportNumpy(); %unignore xla::swig::GetReplicaCount; %unignore xla::swig::TransferToInfeedLocal; %unignore xla::swig::TransferToInfeedLocalReplica; +%unignore xla::swig::TransferFromOutfeedLocalReplica; %unignore xla::swig::LocalShapedBuffer; %unignore xla::swig::LocalShapedBuffer::FromLiteral; %unignore xla::swig::LocalShapedBuffer::ToLiteral; @@ -575,6 +590,7 @@ tensorflow::ImportNumpy(); %unignore xla::swig::LocalComputationBuilder::Parameter; %unignore xla::swig::LocalComputationBuilder::GetShape; %unignore xla::swig::LocalComputationBuilder::Infeed; +%unignore xla::swig::LocalComputationBuilder::Outfeed; %unignore xla::swig::LocalComputationBuilder::ConstantLiteral; %unignore xla::swig::LocalComputationBuilder::ConstantR0; %unignore xla::swig::LocalComputationBuilder::Broadcast; diff --git a/tensorflow/compiler/xla/python/numpy_bridge.cc b/tensorflow/compiler/xla/python/numpy_bridge.cc index d88d78e474..ae283db2fd 100644 --- a/tensorflow/compiler/xla/python/numpy_bridge.cc +++ b/tensorflow/compiler/xla/python/numpy_bridge.cc @@ -139,62 +139,84 @@ static int NumpyTypenum(PyObject* o) { return reinterpret_cast<PyArray_Descr*>(o)->type_num; } -bool CheckPyShapeInfo(PyObject* o) { +// Safely returns a repr of the given Python object o as a C++ string. 
+static string PyObjectCppRepr(PyObject* o) { + PyObject* r = PyObject_Repr(o); + auto error = [r] { + return tensorflow::strings::Printf("<repr-failed object %p>", r); + }; + if (r == nullptr) { + return error(); + } +#if PY_MAJOR_VERSION < 3 + string result = PyString_AsString(r); +#else + PyObject* bytes = PyUnicode_AsEncodedString(r, 0, 0); + if (bytes == nullptr) { + return error(); + } + CHECK(PyBytes_Check(bytes)); + string result = PyBytes_AsString(bytes); + Py_DECREF(bytes); +#endif + Py_DECREF(r); + return result; +} + +Status CheckPyShapeInfo(PyObject* o) { + auto error = [o](const string& prefix) { + return InvalidArgument("%s; got %s", prefix.c_str(), + PyObjectCppRepr(o).c_str()); + }; // The object is a tuple (a pair) if (!PyTuple_Check(o)) { - PyErr_SetString(PyExc_TypeError, "Shape record must be a tuple"); - return false; + return error("Shape record must be a tuple"); } if (PyTuple_Size(o) != 2) { - PyErr_SetString(PyExc_ValueError, "Shape record tuple must be of length 2"); - return false; + return error("Shape record tuple must be of length 2"); } // It has a first element, which is a numpy dtype object PyObject* first = PyTuple_GetItem(o, 0); - if (!first) { - return false; + if (first == nullptr) { + return error("Tuple has no item 0 (shape dtype)"); } if (first->ob_type != &PyArrayDescr_Type) { - PyErr_SetString( - PyExc_TypeError, + return error( "Shape record does not have a numpy dtype as its first element"); - return false; } const int np_type = NumpyTypenum(first); if (!NumpyTypeIsValid(np_type)) { - PyErr_SetString(PyExc_ValueError, - "Shape record has an invalid integer dtype"); - return false; + return error("Shape record has an invalid integer dtype"); } // It has a second element, which is a tuple, either of shape // records or of Python ints PyObject* second = PyTuple_GetItem(o, 1); if (!second) { - return false; + return error("Tuple has no item 1 (shape dimensions)"); } if (!PyTuple_Check(second)) { - 
PyErr_SetString(PyExc_TypeError, - "Shape record does not have a tuple as its second element"); - return false; + return error("Shape record does not have a tuple as its second element"); } const int length = PyTuple_Size(second); const PrimitiveType element_type = NumpyTypeToPrimitiveType(np_type); for (int i = 0; i < length; i++) { PyObject* dimension = PyTuple_GetItem(second, i); if (element_type == TUPLE) { - if (!CheckPyShapeInfo(dimension)) { - return false; + VLOG(3) << "element_type is tuple, checking member: " << i; + Status result = CheckPyShapeInfo(dimension); + if (!result.ok()) { + return AddStatus( + result, tensorflow::strings::StrCat("Validating tuple member ", i, + " of ", PyObjectCppRepr(o))); } } else if (!CheckPyIntOrLong(dimension)) { - PyErr_SetString(PyExc_TypeError, - "Non-tuple shape record has a non-integer dimension"); - return false; + return error("Non-tuple shape record has a non-integer dimension"); } } - return true; + return Status::OK(); } // Precondition: CheckPyShapeInfo(o) @@ -247,14 +269,15 @@ PyObject* PyObjectFromXlaLiteral(const Literal& literal) { } } -std::unique_ptr<Literal> XlaLiteralFromPyObject(PyObject* o) { +StatusOr<std::unique_ptr<Literal>> XlaLiteralFromPyObject(PyObject* o) { if (PyTuple_Check(o)) { int num_elements = PyTuple_Size(o); std::vector<std::unique_ptr<Literal>> elements; elements.reserve(num_elements); for (int i = 0; i < num_elements; i++) { PyObject* element = PyTuple_GetItem(o, i); - elements.push_back(XlaLiteralFromPyObject(element)); + TF_ASSIGN_OR_RETURN(auto literal, XlaLiteralFromPyObject(element)); + elements.push_back(std::move(literal)); } return Literal::MakeTupleOwned(std::move(elements)); } else if (PyArray_Check(o)) { @@ -267,16 +290,17 @@ std::unique_ptr<Literal> XlaLiteralFromPyObject(PyObject* o) { int np_type = PyArray_TYPE(py_array); auto literal = Literal::CreateFromDimensions( NumpyTypeToPrimitiveType(np_type), dimensions); - CopyNumpyArrayToLiteral(np_type, py_array, 
literal.get()); - return literal; + TF_RETURN_IF_ERROR( + CopyNumpyArrayToLiteral(np_type, py_array, literal.get())); + return std::move(literal); } else { - LOG(FATAL) - << "Non-tuple or Numpy array encountered in conversion to XLA literal"; + return InvalidArgument( + "Non-tuple or Numpy array encountered in conversion to XLA literal."); } } -void CopyNumpyArrayToLiteral(int np_type, PyArrayObject* py_array, - Literal* literal) { +Status CopyNumpyArrayToLiteral(int np_type, PyArrayObject* py_array, + Literal* literal) { switch (np_type) { case NPY_BOOL: CopyNumpyArrayToLiteral<bool>(py_array, literal); @@ -306,8 +330,10 @@ void CopyNumpyArrayToLiteral(int np_type, PyArrayObject* py_array, CopyNumpyArrayToLiteral<double>(py_array, literal); break; default: - LOG(FATAL) << "No XLA literal container for Numpy type" << np_type; + return InvalidArgument( + "No XLA literal container for Numpy type number: %d", np_type); } + return Status::OK(); } void CopyLiteralToNumpyArray(int np_type, const Literal& literal, diff --git a/tensorflow/compiler/xla/python/numpy_bridge.h b/tensorflow/compiler/xla/python/numpy_bridge.h index 3f39869765..554fc84ffe 100644 --- a/tensorflow/compiler/xla/python/numpy_bridge.h +++ b/tensorflow/compiler/xla/python/numpy_bridge.h @@ -59,7 +59,7 @@ PyObject* PyShapeInfoFromXlaShape(const Shape& shape); // Returns the outcome of a best-effort check that the Python object // is a pair of the form (numpy dtype, dimensions), as produced by // PyShapeInfoFromXlaShape. -bool CheckPyShapeInfo(PyObject* o); +Status CheckPyShapeInfo(PyObject* o); // Performs the inverse conversion to that of PyShapeInfoFromXlaShape. // @@ -82,13 +82,13 @@ PyObject* PyObjectFromXlaLiteral(const Literal& literal); // To avoid transferring ownership of the data buffers that underlie // PyArrays and XLA literals, this function makes deep copies of all // array data. 
-std::unique_ptr<Literal> XlaLiteralFromPyObject(PyObject* o); +StatusOr<std::unique_ptr<Literal> > XlaLiteralFromPyObject(PyObject* o); // The following functions copy array data from the buffers underlying Numpy // ndarrays into those underlying XLA literals, and vice versa. -void CopyNumpyArrayToLiteral(int np_type, PyArrayObject* py_array, - Literal* literal); +Status CopyNumpyArrayToLiteral(int np_type, PyArrayObject* py_array, + Literal* literal); void CopyLiteralToNumpyArray(int np_type, const Literal& literal, PyArrayObject* py_array); diff --git a/tensorflow/compiler/xla/python/xla_client.py b/tensorflow/compiler/xla/python/xla_client.py index fead7d6259..60573c9bb3 100644 --- a/tensorflow/compiler/xla/python/xla_client.py +++ b/tensorflow/compiler/xla/python/xla_client.py @@ -139,6 +139,10 @@ class Shape(object): self.np_dtype = np_dtype self._dimensions = dimensions + def __repr__(self): + return 'xla_client.Shape(np_dtype={!r}, dimensions={!r})'.format( + self.np_dtype, self._dimensions) + def element_type(self): return DTYPE_TO_XLA_ELEMENT_TYPE[str(self.np_dtype)] @@ -229,6 +233,21 @@ def transfer_to_infeed(value, replica_number=None): require_numpy_array_layout(value), replica_number) +def transfer_from_outfeed(shape, replica_number=None): + """Transfers a literal of the given shape from replica_number's outfeed. + + Args: + shape: The shape of the value to transfer from outfeed. + replica_number: The replica number ordinal to transfer the outfeed value + from. (Each replica has a distinct outfeed queue.) + + Returns: + The literal value that is produced from the outfeed queue. + """ + return c_api.TransferFromOutfeedLocalReplica( + _unwrap_shape(shape), replica_number or 0) + + class LocalComputation(object): """Python wrapper for a local XLA Computation. 
@@ -312,6 +331,16 @@ class ComputationBuilder(object): """ return _wrap_data_handle(self._client.Infeed(_unwrap_shape(shape))) + def Outfeed(self, operand): + """Enqueues an outfeed op onto the computation. + + Outfeed operations enqueue data, using the given operand, onto the XLA + outfeed queue for subsequent dequeue via the client API. + """ + self._client.Outfeed( + _unwrap_data_handle(operand), _unwrap_shape(self.GetShape(operand)), + ''.encode('utf-8')) + def Constant(self, value): """Enqueues a constant op onto the computation. diff --git a/tensorflow/compiler/xla/python/xla_client_test.py b/tensorflow/compiler/xla/python/xla_client_test.py index b195256769..12f689ff2e 100644 --- a/tensorflow/compiler/xla/python/xla_client_test.py +++ b/tensorflow/compiler/xla/python/xla_client_test.py @@ -19,6 +19,7 @@ from __future__ import division from __future__ import print_function import itertools +import threading import numpy as np @@ -1053,6 +1054,23 @@ class EmbeddedComputationsTest(LocalComputationTest): result = compiled_c.Execute() self.assertEqual(result, item) + def testInfeedThenOutfeedS32(self): + to_round_trip = NumpyArrayS32([1, 2, 3, 4]) + c = self._NewComputation() + x = c.Infeed(xla_client.Shape.from_numpy(to_round_trip[0])) + c.Outfeed(x) + + compiled_c = c.Build().CompileWithExampleArguments() + + for want in to_round_trip: + execution = threading.Thread(target=compiled_c.Execute) + execution.start() + xla_client.transfer_to_infeed(want) + got = xla_client.transfer_from_outfeed( + xla_client.Shape.from_numpy(to_round_trip[0])) + execution.join() + self.assertEqual(want, got) + class ErrorTest(LocalComputationTest): diff --git a/tensorflow/compiler/xla/service/cpu/xfeed_manager.cc b/tensorflow/compiler/xla/service/cpu/xfeed_manager.cc index d0f2142029..47543b2082 100644 --- a/tensorflow/compiler/xla/service/cpu/xfeed_manager.cc +++ b/tensorflow/compiler/xla/service/cpu/xfeed_manager.cc @@ -41,6 +41,8 @@ void 
XfeedQueueManager::EnqueueBuffersAtomically( tensorflow::mutex_lock l(mu_); bool was_empty = enqueued_buffers_.empty(); for (XfeedBuffer* b : buffers) { + VLOG(3) << "Enqueueing " << queue_name_ << " buffer (of " << buffers.size() + << " buffers) with length: " << b->length(); enqueued_buffers_.push_back(b); } if (was_empty && !buffers.empty()) { @@ -54,9 +56,11 @@ void XfeedQueueManager::EnqueueBuffersAtomically( XfeedBuffer* XfeedQueueManager::BlockingDequeueBuffer() { tensorflow::mutex_lock l(mu_); + VLOG(3) << "Waiting for an available buffer."; while (enqueued_buffers_.empty()) { cv_.wait(l); } + VLOG(3) << "A buffer is available!"; CHECK(current_buffer_ == nullptr); current_buffer_ = enqueued_buffers_.front(); enqueued_buffers_.pop_front(); @@ -65,6 +69,9 @@ XfeedBuffer* XfeedQueueManager::BlockingDequeueBuffer() { void XfeedQueueManager::ReleaseCurrentBuffer(int32 length, void* data, StatusOr<Shape> shape) { + VLOG(3) << "Releasing buffer with shape: " + << (shape.ok() ? ShapeUtil::HumanString(shape.ValueOrDie()) + : "<error status>"); tensorflow::mutex_lock l(mu_); CHECK(current_buffer_ != nullptr); CHECK_EQ(length, current_buffer_->length()); diff --git a/tensorflow/compiler/xla/service/cpu/xfeed_manager.h b/tensorflow/compiler/xla/service/cpu/xfeed_manager.h index 6af5570005..b4ace23260 100644 --- a/tensorflow/compiler/xla/service/cpu/xfeed_manager.h +++ b/tensorflow/compiler/xla/service/cpu/xfeed_manager.h @@ -50,7 +50,7 @@ class XfeedBuffer { // Reusable component for managing the infeed and outfeed queue state. 
class XfeedQueueManager { public: - XfeedQueueManager() = default; + XfeedQueueManager(string queue_name) : queue_name_(queue_name) {} // Calls the completion callback for any enqueued buffers that have // not been dequeued by the runtime, and empties the @@ -86,6 +86,8 @@ class XfeedQueueManager { void ReleaseCurrentBuffer(int32 length, void* data, StatusOr<Shape> shape); private: + const string queue_name_; + tensorflow::mutex mu_; // Condition variable that is signaled every time a buffer is @@ -112,8 +114,8 @@ class XfeedManager { XfeedQueueManager* outfeed() { return &outfeed_; } private: - XfeedQueueManager infeed_; - XfeedQueueManager outfeed_; + XfeedQueueManager infeed_ = {"infeed"}; + XfeedQueueManager outfeed_ = {"outfeed"}; }; } // namespace runtime |