diff options
author | Yong Tang <yong.tang.github@outlook.com> | 2018-01-26 12:45:35 -0800 |
---|---|---|
committer | Rasmus Munk Larsen <rmlarsen@google.com> | 2018-01-26 12:45:35 -0800 |
commit | 16a2a9f6bb9bf4421119b591fa56d58ee95c9a0e (patch) | |
tree | 6fe4b11f40cfa08e96970350fe69f6c30f746a25 /third_party/kafka | |
parent | 995378c4c9ff156cae7a365cfdc1480a3ee6d0bf (diff) |
Add KafkaReader for processing streaming data with Apache Kafka (#14098)
* Add KafkaReader for processing streaming data with Apache Kafka
Apache Kafka is a widely used distributed streaming platform in
open source community. The goal of this fix is to create a contrib
Reader ops (inherits ReaderBase and is similiar to
TextLineReader/TFRecordReader) so that it is possible to reader
Kafka streaming data from TensorFlow in a similiar fashion.
This fix uses a C/C++ Apache Kafka client library librdkafka which
is released under the 2-clause BSD license, and is widely used in
a number of Kafka bindings such as Go, Python, C#/.Net, etc.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add KafkaReader Python wrapper.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add BUILD file and op registration for KafkaReader.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add C++ Kernel for KafkaReader
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add librdkafka to third_party packages in Bazel
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add contrib/kafka to part of the contrib bazel file.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Update workspace.bzl
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Comment out clean_deps of `tensorflow/core:framework` and `tensorflow/core:lib`
so that it is possible to build with ReaderBase.
See 1419 for details.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add group id flag.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Sync offset
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add test cases and scipt to start and stop Kafka server (with docker)
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Convert to KafkaConsumer from the legacy Consumer with librdkafka
so that thread join does not hang.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Only output offset as the key.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add timeout attr so that Kafka Consumer could use
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Build Kafka kernels by default, so that to get around the linkage issue.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Convert KafkaReader to KafkaDataset.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Fix workspace.bzl for kafka with tf_http_archive
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add public visibility
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Address review feedbacks
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Optionally select Kafka support through ./configure
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
Diffstat (limited to 'third_party/kafka')
-rw-r--r-- | third_party/kafka/BUILD | 147 | ||||
-rw-r--r-- | third_party/kafka/config.patch | 44 |
2 files changed, 191 insertions, 0 deletions
diff --git a/third_party/kafka/BUILD b/third_party/kafka/BUILD new file mode 100644 index 0000000000..a61a9e1f6c --- /dev/null +++ b/third_party/kafka/BUILD @@ -0,0 +1,147 @@ +# Description: +# Kafka C/C++ (librdkafka) client library + +licenses(["notice"]) # 2-clause BSD license + +exports_files(["LICENSE"]) + +cc_library( + name = "kafka", + srcs = [ + "config.h", + "src-cpp/ConfImpl.cpp", + "src-cpp/ConsumerImpl.cpp", + "src-cpp/HandleImpl.cpp", + "src-cpp/KafkaConsumerImpl.cpp", + "src-cpp/MessageImpl.cpp", + "src-cpp/MetadataImpl.cpp", + "src-cpp/QueueImpl.cpp", + "src-cpp/RdKafka.cpp", + "src-cpp/TopicImpl.cpp", + "src-cpp/TopicPartitionImpl.cpp", + "src/crc32c.c", + "src/crc32c.h", + "src/lz4.c", + "src/lz4.h", + "src/lz4frame.c", + "src/lz4frame.h", + "src/lz4frame_static.h", + "src/lz4hc.c", + "src/lz4hc.h", + "src/lz4opt.h", + "src/queue.h", + "src/rd.h", + "src/rdaddr.c", + "src/rdaddr.h", + "src/rdatomic.h", + "src/rdavg.h", + "src/rdavl.c", + "src/rdavl.h", + "src/rdbuf.c", + "src/rdbuf.h", + "src/rdcrc32.h", + "src/rddl.h", + "src/rdendian.h", + "src/rdgz.c", + "src/rdgz.h", + "src/rdinterval.h", + "src/rdkafka.c", + "src/rdkafka.h", + "src/rdkafka_assignor.c", + "src/rdkafka_assignor.h", + "src/rdkafka_broker.c", + "src/rdkafka_broker.h", + "src/rdkafka_buf.c", + "src/rdkafka_buf.h", + "src/rdkafka_cgrp.c", + "src/rdkafka_cgrp.h", + "src/rdkafka_conf.c", + "src/rdkafka_conf.h", + "src/rdkafka_event.h", + "src/rdkafka_feature.c", + "src/rdkafka_feature.h", + "src/rdkafka_int.h", + "src/rdkafka_interceptor.c", + "src/rdkafka_interceptor.h", + "src/rdkafka_lz4.c", + "src/rdkafka_lz4.h", + "src/rdkafka_metadata.c", + "src/rdkafka_metadata.h", + "src/rdkafka_metadata_cache.c", + "src/rdkafka_msg.c", + "src/rdkafka_msg.h", + "src/rdkafka_msgset.h", + "src/rdkafka_msgset_reader.c", + "src/rdkafka_msgset_writer.c", + "src/rdkafka_offset.c", + "src/rdkafka_offset.h", + "src/rdkafka_op.c", + "src/rdkafka_op.h", + "src/rdkafka_partition.c", + "src/rdkafka_partition.h", + "src/rdkafka_pattern.c", + "src/rdkafka_pattern.h", + "src/rdkafka_proto.h", + "src/rdkafka_queue.c", + "src/rdkafka_queue.h", + "src/rdkafka_range_assignor.c", + "src/rdkafka_request.c", + "src/rdkafka_request.h", + "src/rdkafka_roundrobin_assignor.c", + "src/rdkafka_sasl.c", + "src/rdkafka_sasl.h", + "src/rdkafka_sasl_int.h", + "src/rdkafka_sasl_plain.c", + "src/rdkafka_subscription.c", + "src/rdkafka_subscription.h", + "src/rdkafka_timer.c", + "src/rdkafka_timer.h", + "src/rdkafka_topic.c", + "src/rdkafka_topic.h", + "src/rdkafka_transport.c", + "src/rdkafka_transport.h", + "src/rdkafka_transport_int.h", + "src/rdlist.c", + "src/rdlist.h", + "src/rdlog.c", + "src/rdlog.h", + "src/rdports.c", + "src/rdports.h", + "src/rdposix.h", + "src/rdrand.c", + "src/rdrand.h", + "src/rdregex.c", + "src/rdregex.h", + "src/rdstring.c", + "src/rdstring.h", + "src/rdsysqueue.h", + "src/rdtime.h", + "src/rdtypes.h", + "src/rdunittest.c", + "src/rdunittest.h", + "src/rdvarint.c", + "src/rdvarint.h", + "src/snappy.c", + "src/snappy.h", + "src/tinycthread.c", + "src/tinycthread.h", + "src/xxhash.c", + "src/xxhash.h", + ], + hdrs = [ + "config.h", + ], + defines = [ + ], + includes = [ + "src", + "src-cpp", + ], + linkopts = [ + "-lpthread", + ], + visibility = ["//visibility:public"], + deps = [ + "@boringssl//:ssl", + ], +) diff --git a/third_party/kafka/config.patch b/third_party/kafka/config.patch new file mode 100644 index 0000000000..fa5c2d35b4 --- /dev/null +++ b/third_party/kafka/config.patch @@ -0,0 +1,44 @@ +diff -Naur a/config.h b/config.h +--- a/config.h 1970-01-01 00:00:00.000000000 +0000 ++++ b/config.h 2017-10-28 00:57:03.316957390 +0000 +@@ -0,0 +1,40 @@ ++#pragma once ++#define WITHOUT_OPTIMIZATION 0 ++#define ENABLE_DEVEL 0 ++#define ENABLE_REFCNT_DEBUG 0 ++#define ENABLE_SHAREDPTR_DEBUG 0 ++ ++#define HAVE_ATOMICS_32 1 ++#define HAVE_ATOMICS_32_SYNC 1 ++ ++#if (HAVE_ATOMICS_32) ++# if (HAVE_ATOMICS_32_SYNC) ++# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL) ++# else ++# define ATOMIC_OP32(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST) ++# endif ++#endif ++ ++#define HAVE_ATOMICS_64 1 ++#define HAVE_ATOMICS_64_SYNC 1 ++ ++#if (HAVE_ATOMICS_64) ++# if (HAVE_ATOMICS_64_SYNC) ++# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __sync_ ## OP1 ## _and_ ## OP2(PTR, VAL) ++# else ++# define ATOMIC_OP64(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST) ++# endif ++#endif ++ ++ ++#define WITH_ZLIB 1 ++#define WITH_LIBDL 1 ++#define WITH_PLUGINS 0 ++#define WITH_SNAPPY 1 ++#define WITH_SOCKEM 1 ++#define WITH_SSL 1 ++#define WITH_SASL 0 ++#define WITH_SASL_SCRAM 0 ++#define WITH_SASL_CYRUS 0 ++#define HAVE_REGEX 1 ++#define HAVE_STRNDUP 1 |