Kafka Container

This solution works well for anyone who already has a Kafka cluster set up to manage their data and doesn't want to integrate whylogs-specific code into their application. The container is configured to listen to one or more topics in a cluster, with one or more consumers on each instance of the container. The container can be configured to send profiles to S3, disk, or WhyLabs.

How it Works#

REST Container Sequence Diagram

The Kafka Container is technically the same container that we use for our REST offering, but configured to consume from a Kafka cluster (optionally in addition to functioning as a REST service). It takes the same configuration options as the REST container as well. The options that are important for running it as a Kafka consumer are highlighted below. See the REST container page for full configuration docs, and check out the run instructions for details on running it.

# Tells the container that it should set up Kafka consumer(s) on initialization.
KAFKA_ENABLED=true
# A JSON formatted list of the topics that the container should consume from.
KAFKA_TOPICS=["test"]
# The dataset ids to use in the generated whylogs profiles for each of the topics.
# This is required for sending profiles to WhyLabs because we need to know what dataset to
# log the profile under. It's optional otherwise.
KAFKA_TOPIC_DATASET_IDS={"test": "model-1"}
# The Kafka group id to use (https://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups)
# If multiple group ids are listed here then each created consumer is subscribed to the entire list
# of topics. If KAFKA_CONSUMER_THREADS > 1 (2, for example) then each container will create 2 consumers
# with 2 dedicated threads, each consumer subscribed to all of the group ids here.
KAFKA_GROUP_ID=["whylogs"]
# The Kafka bootstrap server (https://kafka.apache.org/11/documentation.html)
KAFKA_BOOTSTRAP_SERVERS=["127.0.0.1:9093"]
# The number of consumers to create within the container. Setting this value to 3 will
# dedicate 3 threads to 3 Kafka consumers (1:1), each one operating independently of the others.
# This behaves similarly to 3 separate containers configured to run a single thread each.
# How many containers and threads you use will depend on the hardware you run the containers on,
# the partitions that exist in your Kafka cluster, and your throughput requirements. Just make
# sure you don't have more consumers than you have partitions or some will be idle.
KAFKA_CONSUMER_THREADS=3
# How to treat nested values in the Kafka messages. FLATTEN will log nested keys as `a.b.foo` while IGNORE
# will just drop them.
KAFKA_MESSAGE_NESTING_BEHAVIOR=FLATTEN
# Keys to ignore in each message. You can ignore nested keys by using their fully qualified key paths separated
# by `.` in the nested message.
IGNORED_KEYS=["foo", "bar.baz"]
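Taken together, the options above can be passed to the container as environment variables. A minimal sketch of such a run is shown below; the image name is a placeholder, and any additional REST container options (for example, WhyLabs credentials when uploading to WhyLabs) come from the full configuration docs.

```shell
# Sketch of running the container as a Kafka consumer.
# "whylabs/whylogs-container:latest" is a placeholder image name;
# substitute the image and remaining options from your actual setup.
docker run -d \
  -e KAFKA_ENABLED=true \
  -e KAFKA_TOPICS='["test"]' \
  -e KAFKA_TOPIC_DATASET_IDS='{"test": "model-1"}' \
  -e KAFKA_GROUP_ID='["whylogs"]' \
  -e KAFKA_BOOTSTRAP_SERVERS='["127.0.0.1:9093"]' \
  -e KAFKA_CONSUMER_THREADS=3 \
  whylabs/whylogs-container:latest
```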

Data Format#

The container expects each Kafka message to be a possibly nested JSON map of key-value pairs. Nested values will either be ignored or flattened based on configuration. Take the following payload as an example.

{
  "col_a": 10,
  "col_b": "June",
  "group": {
    "col_c": 2020
  }
}

If the container is configured to flatten data then it would effectively end up executing the following code against a whylogs profile.

profile.track("col_a", 10)
profile.track("col_b", "June")
profile.track("group.col_c", 2020) // Would be omitted if configured to ignore nesting
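The flattening and key-filtering behavior can be sketched in a few lines of Python. This is an illustrative re-implementation, not the container's actual code; the function name and signature are made up for the example.

```python
def flatten(message, parent_key="", behavior="FLATTEN", ignored_keys=()):
    """Flatten a nested JSON map the way the container's FLATTEN/IGNORE
    settings and IGNORED_KEYS are described above (illustrative sketch)."""
    flat = {}
    for key, value in message.items():
        full_key = f"{parent_key}.{key}" if parent_key else key
        if full_key in ignored_keys:
            continue  # IGNORED_KEYS uses fully qualified, dot-separated paths
        if isinstance(value, dict):
            if behavior == "FLATTEN":
                flat.update(flatten(value, full_key, behavior, ignored_keys))
            # behavior == "IGNORE": nested maps are dropped entirely
        else:
            flat[full_key] = value
    return flat

message = {"col_a": 10, "col_b": "June", "group": {"col_c": 2020}}
flatten(message)                     # {'col_a': 10, 'col_b': 'June', 'group.col_c': 2020}
flatten(message, behavior="IGNORE")  # {'col_a': 10, 'col_b': 'June'}
```

Each key in the flattened result then becomes one `track` call against the profile, as in the pseudocode above.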

Dataset Profile Management#

The purpose of the container is to group data into buckets of profiles for you outside of your application, so that you don't have to embed whylogs-specific code in your main application path. If your container is configured to group data into hourly profiles then the timestamp that Kafka stores for each message will be used to determine which profile a data point gets rolled into, rather than any timestamp field that might be present in the message itself.
