aboutsummaryrefslogtreecommitdiffhomepage
path: root/third_party/kafka
diff options
context:
space:
mode:
authorGravatar Yong Tang <yong.tang.github@outlook.com>2018-01-26 12:45:35 -0800
committerGravatar Rasmus Munk Larsen <rmlarsen@google.com>2018-01-26 12:45:35 -0800
commit16a2a9f6bb9bf4421119b591fa56d58ee95c9a0e (patch)
tree6fe4b11f40cfa08e96970350fe69f6c30f746a25 /third_party/kafka
parent995378c4c9ff156cae7a365cfdc1480a3ee6d0bf (diff)
Add KafkaReader for processing streaming data with Apache Kafka (#14098)
* Add KafkaReader for processing streaming data with Apache Kafka. Apache Kafka is a widely used distributed streaming platform in the open source community. The goal of this fix is to create a contrib Reader op (which inherits from ReaderBase and is similar to TextLineReader/TFRecordReader) so that it is possible to read Kafka streaming data from TensorFlow in a similar fashion. This fix uses librdkafka, a C/C++ Apache Kafka client library which is released under the 2-clause BSD license and is widely used in a number of Kafka bindings such as Go, Python, C#/.Net, etc. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add KafkaReader Python wrapper. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add BUILD file and op registration for KafkaReader. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add C++ Kernel for KafkaReader Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add librdkafka to third_party packages in Bazel Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add contrib/kafka to part of the contrib bazel file. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Update workspace.bzl Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Comment out clean_deps of `tensorflow/core:framework` and `tensorflow/core:lib` so that it is possible to build with ReaderBase. See 1419 for details. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add group id flag. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Sync offset Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add test cases and script to start and stop Kafka server (with docker) Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Convert to KafkaConsumer from the legacy Consumer with librdkafka so that thread join does not hang. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Only output offset as the key. 
Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add timeout attr so that the Kafka Consumer can use it Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Build Kafka kernels by default, so as to get around the linkage issue. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Convert KafkaReader to KafkaDataset. Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Fix workspace.bzl for kafka with tf_http_archive Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Add public visibility Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Address review feedback Signed-off-by: Yong Tang <yong.tang.github@outlook.com> * Optionally select Kafka support through ./configure Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
Diffstat (limited to 'third_party/kafka')
-rw-r--r-- third_party/kafka/BUILD 147
-rw-r--r-- third_party/kafka/config.patch 44
2 files changed, 191 insertions, 0 deletions
diff --git a/third_party/kafka/BUILD b/third_party/kafka/BUILD
new file mode 100644
index 0000000000..a61a9e1f6c
--- /dev/null
+++ b/third_party/kafka/BUILD
@@ -0,0 +1,147 @@
+# Description:
+# Kafka C/C++ (librdkafka) client library
+
+licenses(["notice"]) # 2-clause BSD license
+
+exports_files(["LICENSE"])
+
+cc_library(
+ name = "kafka",
+ srcs = [
+ "config.h",
+ "src-cpp/ConfImpl.cpp",
+ "src-cpp/ConsumerImpl.cpp",
+ "src-cpp/HandleImpl.cpp",
+ "src-cpp/KafkaConsumerImpl.cpp",
+ "src-cpp/MessageImpl.cpp",
+ "src-cpp/MetadataImpl.cpp",
+ "src-cpp/QueueImpl.cpp",
+ "src-cpp/RdKafka.cpp",
+ "src-cpp/TopicImpl.cpp",
+ "src-cpp/TopicPartitionImpl.cpp",
+ "src/crc32c.c",
+ "src/crc32c.h",
+ "src/lz4.c",
+ "src/lz4.h",
+ "src/lz4frame.c",
+ "src/lz4frame.h",
+ "src/lz4frame_static.h",
+ "src/lz4hc.c",
+ "src/lz4hc.h",
+ "src/lz4opt.h",
+ "src/queue.h",
+ "src/rd.h",
+ "src/rdaddr.c",
+ "src/rdaddr.h",
+ "src/rdatomic.h",
+ "src/rdavg.h",
+ "src/rdavl.c",
+ "src/rdavl.h",
+ "src/rdbuf.c",
+ "src/rdbuf.h",
+ "src/rdcrc32.h",
+ "src/rddl.h",
+ "src/rdendian.h",
+ "src/rdgz.c",
+ "src/rdgz.h",
+ "src/rdinterval.h",
+ "src/rdkafka.c",
+ "src/rdkafka.h",
+ "src/rdkafka_assignor.c",
+ "src/rdkafka_assignor.h",
+ "src/rdkafka_broker.c",
+ "src/rdkafka_broker.h",
+ "src/rdkafka_buf.c",
+ "src/rdkafka_buf.h",
+ "src/rdkafka_cgrp.c",
+ "src/rdkafka_cgrp.h",
+ "src/rdkafka_conf.c",
+ "src/rdkafka_conf.h",
+ "src/rdkafka_event.h",
+ "src/rdkafka_feature.c",
+ "src/rdkafka_feature.h",
+ "src/rdkafka_int.h",
+ "src/rdkafka_interceptor.c",
+ "src/rdkafka_interceptor.h",
+ "src/rdkafka_lz4.c",
+ "src/rdkafka_lz4.h",
+ "src/rdkafka_metadata.c",
+ "src/rdkafka_metadata.h",
+ "src/rdkafka_metadata_cache.c",
+ "src/rdkafka_msg.c",
+ "src/rdkafka_msg.h",
+ "src/rdkafka_msgset.h",
+ "src/rdkafka_msgset_reader.c",
+ "src/rdkafka_msgset_writer.c",
+ "src/rdkafka_offset.c",
+ "src/rdkafka_offset.h",
+ "src/rdkafka_op.c",
+ "src/rdkafka_op.h",
+ "src/rdkafka_partition.c",
+ "src/rdkafka_partition.h",
+ "src/rdkafka_pattern.c",
+ "src/rdkafka_pattern.h",
+ "src/rdkafka_proto.h",
+ "src/rdkafka_queue.c",
+ "src/rdkafka_queue.h",
+ "src/rdkafka_range_assignor.c",
+ "src/rdkafka_request.c",
+ "src/rdkafka_request.h",
+ "src/rdkafka_roundrobin_assignor.c",
+ "src/rdkafka_sasl.c",
+ "src/rdkafka_sasl.h",
+ "src/rdkafka_sasl_int.h",
+ "src/rdkafka_sasl_plain.c",
+ "src/rdkafka_subscription.c",
+ "src/rdkafka_subscription.h",
+ "src/rdkafka_timer.c",
+ "src/rdkafka_timer.h",
+ "src/rdkafka_topic.c",
+ "src/rdkafka_topic.h",
+ "src/rdkafka_transport.c",
+ "src/rdkafka_transport.h",
+ "src/rdkafka_transport_int.h",
+ "src/rdlist.c",
+ "src/rdlist.h",
+ "src/rdlog.c",
+ "src/rdlog.h",
+ "src/rdports.c",
+ "src/rdports.h",
+ "src/rdposix.h",
+ "src/rdrand.c",
+ "src/rdrand.h",
+ "src/rdregex.c",
+ "src/rdregex.h",
+ "src/rdstring.c",
+ "src/rdstring.h",
+ "src/rdsysqueue.h",
+ "src/rdtime.h",
+ "src/rdtypes.h",
+ "src/rdunittest.c",
+ "src/rdunittest.h",
+ "src/rdvarint.c",
+ "src/rdvarint.h",
+ "src/snappy.c",
+ "src/snappy.h",
+ "src/tinycthread.c",
+ "src/tinycthread.h",
+ "src/xxhash.c",
+ "src/xxhash.h",
+ ],
+ hdrs = [
+ "config.h",
+ ],
+ defines = [
+ ],
+ includes = [
+ "src",
+ "src-cpp",
+ ],
+ linkopts = [
+ "-lpthread",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "@boringssl//:ssl",
+ ],
+)
diff --git a/third_party/kafka/config.patch b/third_party/kafka/config.patch
new file mode 100644
index 0000000000..fa5c2d35b4
--- /dev/null
+++ b/third_party/kafka/config.patch
@@ -0,0 +1,44 @@
+diff -Naur a/config.h b/config.h
+--- a/config.h 1970-01-01 00:00:00.000000000 +0000
++++ b/config.h 2017-10-28 00:57:03.316957390 +0000
+@@ -0,0 +1,40 @@
++#pragma once
++#define WITHOUT_OPTIMIZATION 0
++#define ENABLE_DEVEL 0
++#define ENABLE_REFCNT_DEBUG 0
++#define ENABLE_SHAREDPTR_DEBUG 0
++
++#define HAVE_ATOMICS_32 1
++#define HAVE_ATOMICS_32_SYNC 1
++
++#if (HAVE_ATOMICS_32)
++# if (HAVE_ATOMICS_32_SYNC)
++# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL)
++# else
++# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
++# endif
++#endif
++
++#define HAVE_ATOMICS_64 1
++#define HAVE_ATOMICS_64_SYNC 1
++
++#if (HAVE_ATOMICS_64)
++# if (HAVE_ATOMICS_64_SYNC)
++# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL)
++# else
++# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
++# endif
++#endif
++
++
++#define WITH_ZLIB 1
++#define WITH_LIBDL 1
++#define WITH_PLUGINS 0
++#define WITH_SNAPPY 1
++#define WITH_SOCKEM 1
++#define WITH_SSL 1
++#define WITH_SASL 0
++#define WITH_SASL_SCRAM 0
++#define WITH_SASL_CYRUS 0
++#define HAVE_REGEX 1
++#define HAVE_STRNDUP 1