author     A. Unique TensorFlower <gardener@tensorflow.org>  2017-02-27 11:32:44 -0800
committer  TensorFlower Gardener <gardener@tensorflow.org>   2017-02-27 12:09:10 -0800
commit     db8ea4ff07ad75cf5f0220428fbe4b84fcf68f4a (patch)
tree       fb6288c5b6dcc6d28167c745fa5b7688417cbaf8 /tensorflow/core/kernels/sparse_matmul_op.cc
parent     332ee5051fc38babb53cc8cbf3c3120e5651f4e8 (diff)
- Upgraded libxsmm to 1.7.1.
- Applied LLVM optimization patch to libxsmm (https://github.com/hfp/libxsmm/commit/0e412d5d2769a8754cace64e56e26e14093f887d.patch).
- Limited outstanding libxsmm sparse matrix multiply handle counts to limit memory usage for temporary space.
- Added extra logging to libxsmm handle management in TensorFlow.
- Added support for running multiple sparse matrix multiplies simultaneously in performance benchmark to match some practical use cases.
- Added more size combinations to sparse matrix multiply benchmark.
- Fixed dependencies for xsmm_conv2d_test.

Change: 148672973
Diffstat (limited to 'tensorflow/core/kernels/sparse_matmul_op.cc')
-rw-r--r--  tensorflow/core/kernels/sparse_matmul_op.cc | 250
1 file changed, 198 insertions(+), 52 deletions(-)
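The central change in this diff is a bounded, LRU-ordered cache of libxsmm handles with take/return semantics: concurrent matrix multiplies never share a handle, and the number of cached plus outstanding handles (and thus the temporary scratch memory) is capped per graph node. The following is a minimal standalone sketch of that pattern, with illustrative names (Handle, HandleCache, Take, Return, kMaxEntries) rather than the TensorFlow/libxsmm types; the real code additionally stores libxsmm_spmdm handles and CSR scratch buffers and calls libxsmm_spmdm_init/libxsmm_spmdm_destroy.

#include <list>
#include <memory>
#include <mutex>
#include <tuple>
#include <utility>

struct Handle {  // stand-in for a libxsmm handle plus its scratch space
  int M, K, N, max_threads;
};

class HandleCache {
 public:
  using Key = std::tuple<int, int, int, int>;

  // Remove and return a matching entry, creating one if none is cached.
  // An entry that is "out" can never be handed to a second thread.
  std::unique_ptr<Handle> Take(int M, int K, int N, int max_threads) {
    std::lock_guard<std::mutex> l(mu_);
    Key key(M, K, N, max_threads);
    // Search from the back so the most-recently-returned match wins.
    for (auto it = entries_.rbegin(); it != entries_.rend(); ++it) {
      if (it->first == key) {
        std::unique_ptr<Handle> e = std::move(it->second);
        entries_.erase(std::next(it).base());
        ++outstanding_;
        return e;
      }
    }
    // Evict least-recently-used entries so cached + outstanding stays capped.
    while (!entries_.empty() &&
           entries_.size() + outstanding_ + 1 > kMaxEntries) {
      entries_.pop_front();
    }
    ++outstanding_;
    return std::unique_ptr<Handle>(new Handle{M, K, N, max_threads});
  }

  // Give an entry back; it becomes the most-recently-used cached entry.
  void Return(std::unique_ptr<Handle> e) {
    std::lock_guard<std::mutex> l(mu_);
    Key key(e->M, e->K, e->N, e->max_threads);
    entries_.emplace_back(key, std::move(e));
    --outstanding_;
  }

 private:
  static const int kMaxEntries = 40;  // mirrors max_entries_per_graph_node
  std::mutex mu_;
  std::list<std::pair<Key, std::unique_ptr<Handle>>> entries_;  // LRU -> MRU
  int outstanding_ = 0;
};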
diff --git a/tensorflow/core/kernels/sparse_matmul_op.cc b/tensorflow/core/kernels/sparse_matmul_op.cc
index a9ea7261cc..06734a6a2a 100644
--- a/tensorflow/core/kernels/sparse_matmul_op.cc
+++ b/tensorflow/core/kernels/sparse_matmul_op.cc
@@ -837,6 +837,15 @@ class SparseMatMul {
};
#ifdef TENSORFLOW_USE_LIBXSMM
+#ifdef EXTRA_CACHE_LOGGING
+static tensorflow::mutex global_cache_stats_lock;
+static int total_num_entries_outstanding GUARDED_BY(global_cache_stats_lock) =
+ 0;
+static int total_num_entries_in_cache GUARDED_BY(global_cache_stats_lock) = 0;
+#endif // EXTRA_CACHE_LOGGING
+
+static const int max_entries_per_graph_node = 40;
+
template <typename TL, typename TR>
class LibxsmmSparseMatMul {
typedef Eigen::Tensor<TL, 2, Eigen::RowMajor> MatrixL;
@@ -852,6 +861,7 @@ class LibxsmmSparseMatMul {
MatrixMapR;
public:
+#if 1
// This structure contains a set of libxsmm kernels for sizes that have been
// encountered previously by this operator so that libxsmm does not need to
// reallocate its scratchpad memory each time (which hurts performance
@@ -870,57 +880,181 @@ class LibxsmmSparseMatMul {
// useful (it is an empty struct right now)
typename SparseMatMul<TL, TR>::TensorInfoCache
non_libxsmm_cache; // Currently not used
+ TF_DISALLOW_COPY_AND_ASSIGN(TensorInfoCacheEntry);
+ ~TensorInfoCacheEntry() {
+#ifdef EXTRA_CACHE_LOGGING
+ LOG(INFO) << "Deleting tensor cache entry at " << (void*)this;
+#endif // EXTRA_CACHE_LOGGING
+ libxsmm_spmdm_destroy(&handle);
+ }
};
- // protects entries; invariant: entries is a valid std::multimap
+ // protects entries; invariant: entries is a valid std::list.
tensorflow::mutex lock;
// Because there could be multiple matrix multiplies with the same sizes
// going on at the same time, we need to allow multiple cache entries for a
// given set of parameters. Taking and returning entries is used to make
// sure the same cache entry is not used from two threads at a time.
- std::multimap<std::tuple<int, int, int, int>,
- std::unique_ptr<TensorInfoCacheEntry>>
- entries GUARDED_BY(lock);
-
- TensorInfoCache() : lock(), entries() {}
+ using entries_map_type = std::list<std::pair<
+ std::tuple<int, int, int, int>,
+ std::unique_ptr<TensorInfoCacheEntry>>>; // multimap in LRU order
+ entries_map_type entries GUARDED_BY(
+ lock); // MRU element at end so reverse search will find it first
+ int num_entries_outstanding GUARDED_BY(lock);
+
+ TensorInfoCache() : lock(), entries(), num_entries_outstanding(0) {}
// Look up and remove first entry with these parameters, creating one if
// there isn't one
std::unique_ptr<TensorInfoCacheEntry> take_cache_entry(int M, int K, int N,
int max_threads)
- LOCKS_EXCLUDED(lock) {
+#ifdef EXTRA_CACHE_LOGGING
+ LOCKS_EXCLUDED(lock, global_cache_stats_lock)
+#else
+ LOCKS_EXCLUDED(lock)
+#endif
+ {
tensorflow::mutex_lock ml(lock);
+#ifdef EXTRA_CACHE_LOGGING
+ tensorflow::mutex_lock ml2(global_cache_stats_lock);
+#endif
auto key = std::make_tuple(M, K, N, max_threads);
- auto it = entries.find(key);
+ auto it_rev =
+ std::find_if(entries.rbegin(), entries.rend(),
+ [&](const typename entries_map_type::value_type& e) {
+ return e.first == key;
+ });
+ auto it =
+ (it_rev == entries.rend() ? entries.end() : std::next(it_rev).base());
if (it != entries.end()) {
auto val = std::move(it->second);
entries.erase(it);
+ ++num_entries_outstanding;
+#ifdef EXTRA_CACHE_LOGGING
+ ++total_num_entries_outstanding;
+ --total_num_entries_in_cache;
+ LOG(INFO) << "Used existing cache entry at " << (void*)val.get()
+ << " for " << M << "x" << K << "x" << N << " max_threads "
+ << max_threads
+ << ", num_entries_outstanding = " << num_entries_outstanding
+ << ", new cache size = " << entries.size()
+ << ", total num_entries_outstanding = "
+ << total_num_entries_outstanding
+ << ", total cache size = " << total_num_entries_in_cache;
+#endif
return val;
} else {
+ while (!entries.empty() &&
+ entries.size() + num_entries_outstanding + 1 >
+ max_entries_per_graph_node) {
+#ifdef EXTRA_CACHE_LOGGING
+ LOG(INFO) << "Removing old cache entry at "
+ << (void*)entries.front().second.get();
+#endif
+ entries.pop_front();
+ }
std::unique_ptr<TensorInfoCacheEntry> e{
new TensorInfoCacheEntry{M, K, N, max_threads, {}, nullptr}};
// setup scoped allocator, which uses cpu_allocator() for this scope
const libxsmm_tf_allocator<libxsmm_scratch_allocator> tf_allocator;
libxsmm_spmdm_init(M, N, K, max_threads, &e->handle, &e->output_csr);
+ ++num_entries_outstanding;
+#ifdef EXTRA_CACHE_LOGGING
+ ++total_num_entries_outstanding;
+ LOG(INFO) << "Created cache entry at " << (void*)e.get() << " for " << M
+ << "x" << K << "x" << N << " max_threads " << max_threads
+ << ", num_entries_outstanding = " << num_entries_outstanding
+ << ", new cache size = " << entries.size()
+ << ", total num_entries_outstanding = "
+ << total_num_entries_outstanding
+ << ", total cache size = " << total_num_entries_in_cache;
+#endif
return e;
}
}
// Add a cache entry with certain parameters
void return_cache_entry(std::unique_ptr<TensorInfoCacheEntry> e)
- LOCKS_EXCLUDED(lock) {
+#ifdef EXTRA_CACHE_LOGGING
+ LOCKS_EXCLUDED(lock, global_cache_stats_lock)
+#else
+ LOCKS_EXCLUDED(lock)
+#endif
+ {
tensorflow::mutex_lock ml(lock);
+#ifdef EXTRA_CACHE_LOGGING
+ tensorflow::mutex_lock ml2(global_cache_stats_lock);
+#endif
auto key = std::make_tuple(e->M, e->K, e->N, e->max_threads);
- entries.insert(std::make_pair(key, std::move(e)));
+ --num_entries_outstanding;
+#ifdef EXTRA_CACHE_LOGGING
+ --total_num_entries_outstanding;
+ LOG(INFO) << "Returned cache entry at " << (void*)e.get() << " for "
+ << e->M << "x" << e->K << "x" << e->N << " max_threads "
+ << e->max_threads
+ << ", num_entries_outstanding = " << num_entries_outstanding
+ << ", prev cache size = " << entries.size()
+ << ", total num_entries_outstanding = "
+ << total_num_entries_outstanding
+ << ", total cache size = " << total_num_entries_in_cache;
+#endif
+ entries.push_back(std::make_pair(key, std::move(e)));
+#ifdef EXTRA_CACHE_LOGGING
+ ++total_num_entries_in_cache;
+#endif
}
~TensorInfoCache() {
tensorflow::mutex_lock ml(lock);
- for (auto& p : entries) {
- libxsmm_spmdm_destroy(&p.second->handle);
- }
+#ifdef EXTRA_CACHE_LOGGING
+ tensorflow::mutex_lock ml2(global_cache_stats_lock);
+ LOG(INFO) << "Deleting TensorInfoCache, cache size = " << entries.size()
+ << ", total num_entries_outstanding = "
+ << total_num_entries_outstanding
+ << ", total cache size = " << total_num_entries_in_cache;
+#endif
+ CHECK_EQ(num_entries_outstanding, 0);
entries.clear();
}
private:
TF_DISALLOW_COPY_AND_ASSIGN(TensorInfoCache);
};
+#else
+ // This structure contains a set of libxsmm kernels for sizes that have been
+ // encountered previously by this operator so that libxsmm does not need to
+ // reallocate its scratchpad memory each time (which hurts performance
+ // substantially).
+ struct TensorInfoCache {
+ struct TensorInfoCacheEntry {
+ // Parameters for kernel
+ int M;
+ int K;
+ int N;
+ int max_threads;
+ // libxsmm handle and matrix data
+ libxsmm_spmdm_handle handle;
+ libxsmm_CSR_sparseslice* output_csr;
+ // Chain to non-libxsmm implementation's cache in case that ever becomes
+ // useful (it is an empty struct right now)
+ typename SparseMatMul<TL, TR>::TensorInfoCache
+ non_libxsmm_cache; // Currently not used
+ };
+ TensorInfoCache() {}
+ // Look up and remove first entry with these parameters, creating one if
+ // there isn't one
+ std::unique_ptr<TensorInfoCacheEntry> take_cache_entry(int M, int K, int N,
+ int max_threads) {
+ std::unique_ptr<TensorInfoCacheEntry> e{
+ new TensorInfoCacheEntry{M, K, N, max_threads, {}, nullptr}};
+ libxsmm_spmdm_init(M, N, K, max_threads, &e->handle, &e->output_csr);
+ return e;
+ }
+ // Add a cache entry with certain parameters
+ void return_cache_entry(std::unique_ptr<TensorInfoCacheEntry> e) {
+ libxsmm_spmdm_destroy(&e->handle);
+ }
+
+ private:
+ TF_DISALLOW_COPY_AND_ASSIGN(TensorInfoCache);
+ };
+#endif
// Perform matrix multiplication of "left" and "right", and store the result
// in *"output".
@@ -1345,21 +1479,21 @@ inline void SparseMatMul<TL, TR>::ComputeBlockSizes(
template <typename F>
void do_on_all_threads(const DeviceBase::CpuWorkerThreads* thread_pool,
- const F& f) {
+ ptrdiff_t max_thread_count, const F& f) {
int num_threads = thread_pool->num_threads;
if (num_threads == 0) {
LOG(FATAL) << "Have 0 threads in thread pool";
} else if (num_threads == 1) {
- f(0);
+ f(0, 1);
} else {
BlockingCounter counter(num_threads - 1);
for (int i = 1; i < num_threads; ++i) {
thread_pool->workers->Schedule([&, i]() {
- f(i);
+ f(i, num_threads);
counter.DecrementCount();
});
}
- f(0);
+ f(0, num_threads);
counter.Wait();
}
}
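With this change, do_on_all_threads hands the closure both its thread index and the number of participating threads. Below is a simplified standalone sketch of the same shape, using std::thread in place of the TensorFlow thread pool and BlockingCounter (the real helper also receives the max_thread_count hint shown above):

#include <thread>
#include <vector>

template <typename F>
void do_on_all_threads_sketch(int num_threads, const F& f) {
  if (num_threads <= 1) {
    f(0, 1);  // run inline; a single participant
    return;
  }
  std::vector<std::thread> workers;
  workers.reserve(num_threads - 1);
  for (int i = 1; i < num_threads; ++i) {
    workers.emplace_back([&, i]() { f(i, num_threads); });
  }
  f(0, num_threads);                  // the calling thread also participates
  for (auto& t : workers) t.join();   // plays the role of BlockingCounter
}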
@@ -1453,11 +1587,13 @@ inline void LibxsmmSparseMatMul<TL, TR>::Compute(
const int left_dim1 = transpose_left ? left.dimension(0) : left.dimension(1);
const int right_dim0 = right.dimension(0);
const int right_dim1 = right.dimension(1);
+ const int output_dim0 =
+ transpose_output ? output->dimension(1) : output->dimension(0);
+ const int output_dim1 =
+ transpose_output ? output->dimension(0) : output->dimension(1);
CHECK_EQ(left_dim1, right_dim0);
- CHECK_EQ(left_dim0,
- (transpose_output ? output->dimension(1) : output->dimension(0)));
- CHECK_EQ(right_dim1,
- (transpose_output ? output->dimension(0) : output->dimension(1)));
+ CHECK_EQ(left_dim0, output_dim0);
+ CHECK_EQ(right_dim1, output_dim1);
if (left_dim0 < 32 || left_dim1 < 32 || right_dim1 < 32) {
// Causes problems in libxsmm
SparseMatMul<TL, TR>::Compute(
@@ -1475,42 +1611,52 @@ inline void LibxsmmSparseMatMul<TL, TR>::Compute(
// Convert the left matrix to compressed sparse row (CSR) format
ptrdiff_t total_num_creation_blocks =
libxsmm_spmdm_get_num_createSparseSlice_blocks(&entry->handle);
+ ptrdiff_t total_num_mult_blocks =
+ libxsmm_spmdm_get_num_compute_blocks(&entry->handle);
+ bool use_libxsmm =
+ !(total_num_creation_blocks + total_num_mult_blocks < num_threads &&
+ !transpose_left && !transpose_output);
+ if (!use_libxsmm) {
+ // Avoid some performance issues in libxsmm (FIXME)
+ cache->return_cache_entry(std::move(entry));
+ SparseMatMul<TL, TR>::Compute(
+ nullptr /* Assumes no cached data for fallback */, left, right,
+ transpose_left, thread_pool, transpose_output, output);
+ return;
+ }
std::atomic<int> cur_create_block_number;
cur_create_block_number.store(0);
- do_on_all_threads(thread_pool, [&](int i) {
- PinnedToCurrentCPU pin;
- while (true) {
- int work_item = cur_create_block_number.fetch_add(1);
- if (work_item >= total_num_creation_blocks) break;
- wrapper_libxsmm_spmdm_createSparseSlice_generic_thread(
- empty_type_wrapper<TL>{}, &entry->handle,
- (transpose_left ? 'Y' : 'N'), left_data, entry->output_csr, work_item,
- i, num_threads);
- }
- });
+ do_on_all_threads(thread_pool, total_num_creation_blocks,
+ [&](int i, int actual_num_threads) {
+ PinnedToCurrentCPU pin;
+ while (true) {
+ int work_item = cur_create_block_number.fetch_add(1);
+ if (work_item >= total_num_creation_blocks) break;
+ wrapper_libxsmm_spmdm_createSparseSlice_generic_thread(
+ empty_type_wrapper<TL>{}, &entry->handle,
+ (transpose_left ? 'T' : 'N'), left_data,
+ entry->output_csr, work_item, i,
+ actual_num_threads);
+ }
+ });
// Do matrix-matrix multiplication
- // TODO(jewillco): libxsmm doesn't support beta != 1 yet -- remove when
- // release
- // includes beta handling
- memset(output_data, 0, left_dim0 * right_dim1 * sizeof(TR));
- ptrdiff_t total_num_mult_blocks =
- libxsmm_spmdm_get_num_compute_blocks(&entry->handle);
std::atomic<int> cur_mult_block_number;
cur_mult_block_number.store(0);
- do_on_all_threads(thread_pool, [&](int i) {
- PinnedToCurrentCPU pin;
- while (true) {
- int work_item = cur_mult_block_number.fetch_add(1);
- if (work_item >= total_num_mult_blocks) break;
- const TL alpha(1.0); // Stored in a variable so we can get a pointer
- const TL beta(0.0); // Stored in a variable so we can get a pointer
- wrapper_libxsmm_spmdm_compute_generic_thread(
- empty_type_wrapper<TL>{}, &entry->handle,
- (transpose_left ? 'Y' : 'N'), 'N', &alpha, entry->output_csr,
- right_data, (transpose_output ? 'Y' : 'N'), &beta, output_data,
- work_item, i, num_threads);
- }
- });
+ do_on_all_threads(
+ thread_pool, total_num_mult_blocks, [&](int i, int actual_num_threads) {
+ PinnedToCurrentCPU pin;
+ while (true) {
+ int work_item = cur_mult_block_number.fetch_add(1);
+ if (work_item >= total_num_mult_blocks) break;
+ const TL alpha(1.0); // Stored in a variable so we can get a pointer
+ const TL beta(0.0); // Stored in a variable so we can get a pointer
+ wrapper_libxsmm_spmdm_compute_generic_thread(
+ empty_type_wrapper<TL>{}, &entry->handle,
+ (transpose_left ? 'T' : 'N'), 'N', &alpha, entry->output_csr,
+ right_data, (transpose_output ? 'T' : 'N'), &beta, output_data,
+ work_item, i, actual_num_threads);
+ }
+ });
// Put handle + CSR storage back into cache
cache->return_cache_entry(std::move(entry));
}
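Both phases above (CSR slice creation and the multiply itself) distribute libxsmm block indices dynamically: every participating thread repeatedly claims the next block from a shared std::atomic counter until all blocks are taken. A self-contained sketch of that pattern, with a hypothetical process_block standing in for the wrapper_libxsmm_spmdm_*_generic_thread calls:

#include <atomic>
#include <cstdio>
#include <thread>

void process_block(int block, int tid, int num_threads) {
  // Stand-in for the per-block libxsmm work.
  std::printf("thread %d/%d handles block %d\n", tid, num_threads, block);
}

void worker(std::atomic<int>* next_block, int total_blocks, int tid,
            int num_threads) {
  while (true) {
    int work_item = next_block->fetch_add(1);  // claim the next block
    if (work_item >= total_blocks) break;      // every block claimed once
    process_block(work_item, tid, num_threads);
  }
}

int main() {
  const int kThreads = 2, kBlocks = 8;
  std::atomic<int> next_block{0};
  std::thread t(worker, &next_block, kBlocks, 1, kThreads);
  worker(&next_block, kBlocks, 0, kThreads);  // calling thread participates too
  t.join();
  return 0;
}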