From 16a2a9f6bb9bf4421119b591fa56d58ee95c9a0e Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Fri, 26 Jan 2018 12:45:35 -0800 Subject: Add KafkaReader for processing streaming data with Apache Kafka (#14098) * Add KafkaReader for processing streaming data with Apache Kafka Apache Kafka is a widely used distributed streaming platform in open source community. The goal of this fix is to create a contrib Reader ops (inherits ReaderBase and is similar to TextLineReader/TFRecordReader) so that it is possible to read Kafka streaming data from TensorFlow in a similar fashion. This fix uses a C/C++ Apache Kafka client library librdkafka which is released under the 2-clause BSD license, and is widely used in a number of Kafka bindings such as Go, Python, C#/.Net, etc. Signed-off-by: Yong Tang * Add KafkaReader Python wrapper. Signed-off-by: Yong Tang * Add BUILD file and op registration for KafkaReader. Signed-off-by: Yong Tang * Add C++ Kernel for KafkaReader Signed-off-by: Yong Tang * Add librdkafka to third_party packages in Bazel Signed-off-by: Yong Tang * Add contrib/kafka to part of the contrib bazel file. Signed-off-by: Yong Tang * Update workspace.bzl Signed-off-by: Yong Tang * Comment out clean_deps of `tensorflow/core:framework` and `tensorflow/core:lib` so that it is possible to build with ReaderBase. See 1419 for details. Signed-off-by: Yong Tang * Add group id flag. Signed-off-by: Yong Tang * Sync offset Signed-off-by: Yong Tang * Add test cases and script to start and stop Kafka server (with docker) Signed-off-by: Yong Tang * Convert to KafkaConsumer from the legacy Consumer with librdkafka so that thread join does not hang. Signed-off-by: Yong Tang * Only output offset as the key. Signed-off-by: Yong Tang * Add timeout attr so that Kafka Consumer could use Signed-off-by: Yong Tang * Build Kafka kernels by default, so that to get around the linkage issue. Signed-off-by: Yong Tang * Convert KafkaReader to KafkaDataset. 
Signed-off-by: Yong Tang * Fix workspace.bzl for kafka with tf_http_archive Signed-off-by: Yong Tang * Add public visibility Signed-off-by: Yong Tang * Address review feedbacks Signed-off-by: Yong Tang * Optionally select Kafka support through ./configure Signed-off-by: Yong Tang --- configure.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'configure.py') diff --git a/configure.py b/configure.py index 083fed1710..16763b8c0d 100644 --- a/configure.py +++ b/configure.py @@ -1354,6 +1354,7 @@ def main(): environ_cp['TF_NEED_GCP'] = '0' environ_cp['TF_NEED_HDFS'] = '0' environ_cp['TF_NEED_JEMALLOC'] = '0' + environ_cp['TF_NEED_KAFKA'] = '0' environ_cp['TF_NEED_OPENCL_SYCL'] = '0' environ_cp['TF_NEED_COMPUTECPP'] = '0' environ_cp['TF_NEED_OPENCL'] = '0' @@ -1372,6 +1373,8 @@ def main(): 'with_hdfs_support', True, 'hdfs') set_build_var(environ_cp, 'TF_NEED_S3', 'Amazon S3 File System', 'with_s3_support', True, 's3') + set_build_var(environ_cp, 'TF_NEED_KAFKA', 'Apache Kafka Platform', + 'with_kafka_support', False, 'kafka') set_build_var(environ_cp, 'TF_ENABLE_XLA', 'XLA JIT', 'with_xla_support', False, 'xla') set_build_var(environ_cp, 'TF_NEED_GDR', 'GDR', 'with_gdr_support', -- cgit v1.2.3 From 6b23b788980787684fd9fcc716660d06c5e02ad6 Mon Sep 17 00:00:00 2001 From: elilienstein Date: Mon, 29 Jan 2018 15:27:57 -0800 Subject: Fix typos 'followings' 'optionanl' (#16549) --- configure.py | 4 ++-- tensorflow/contrib/coder/README.md | 2 +- tensorflow/core/kernels/fractional_pool_common.h | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) (limited to 'configure.py') diff --git a/configure.py b/configure.py index 16763b8c0d..27519b4aba 100644 --- a/configure.py +++ b/configure.py @@ -298,7 +298,7 @@ def get_var(environ_cp, System". enabled_by_default: boolean for default behavior. question: optional string for how to ask for user input. - yes_reply: optionanl string for reply when feature is enabled. + yes_reply: optional string for reply when feature is enabled. 
no_reply: optional string for reply when feature is disabled. Returns: @@ -411,7 +411,7 @@ def set_action_env_var(environ_cp, System". enabled_by_default: boolean for default behavior. question: optional string for how to ask for user input. - yes_reply: optionanl string for reply when feature is enabled. + yes_reply: optional string for reply when feature is enabled. no_reply: optional string for reply when feature is disabled. """ var = int( diff --git a/tensorflow/contrib/coder/README.md b/tensorflow/contrib/coder/README.md index e1e867db5a..c6c379c458 100644 --- a/tensorflow/contrib/coder/README.md +++ b/tensorflow/contrib/coder/README.md @@ -30,7 +30,7 @@ following sense: around, - The number of CDF axes does not extend, i.e., `CDF.ndim == data.ndim + 1`. -In the previous example where data has shape (10, 10), the followings are +In the previous example where data has shape (10, 10), the following are acceptable CDF shapes: - (10, 10, 65) diff --git a/tensorflow/core/kernels/fractional_pool_common.h b/tensorflow/core/kernels/fractional_pool_common.h index df0bbbfa06..2d7a230fc0 100644 --- a/tensorflow/core/kernels/fractional_pool_common.h +++ b/tensorflow/core/kernels/fractional_pool_common.h @@ -57,7 +57,7 @@ static inline void RandomShuffle(Iter first, Iter last, const Random& uniform) { // * sum(generated_diff_pooling_sequence) = input_length // * Let's define floor(input_length / output_length) = K, then // K <= generated_diff_pooling_sequence[i] <= K+1 -// For example, when input_length = 10, output_length = 6, the followings are +// For example, when input_length = 10, output_length = 6, the following are // valid pooling sequence: // * [1, 2, 2, 1, 2, 2] // * [1, 1, 2, 2, 2, 2] -- cgit v1.2.3