Batch Process Logging


Most batch data processing pipelines have some of these characteristics:

  • Runs on a regular cadence
  • Runs at large scale or in a distributed manner
  • Transforms data into a final format before loading (ETL) or stores it in raw form in data lakes (ELT)

It is good practice to track data-related metrics for batch processing jobs. Basic metrics that teams often track include:

  • Input count
  • Output count
  • Schema
  • Sum, min, max, mean, stddev
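As a hedged illustration (plain Python, not the whylogs API; the sample values are made up), these per-batch metrics can be computed for a single numeric column like this:

```python
import statistics

# Hypothetical batch of values for one numeric column
batch = [3.2, 7.5, 1.1, 9.8, 4.4]

# Basic metrics commonly tracked per batch
metrics = {
    "count": len(batch),
    "sum": sum(batch),
    "min": min(batch),
    "max": max(batch),
    "mean": statistics.mean(batch),
    "stddev": statistics.stdev(batch),
}

print(metrics)
```

In practice these numbers would be emitted per run (and per partition) so they can be compared across batches over time.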

Tracking these metrics over time can help teams detect issues. One common pattern is to run the analysis on top of a storage-, ELT-, or SQL-based system. However, these processes tend to be manual and painstaking, so managing data quality is a common challenge for teams dealing with data at large scale.

Benefits of whylogs

whylogs provides a lightweight mechanism to track complex data insights for batch processing systems. It integrates naturally with distributed batch systems such as Apache Spark and Apache Flink.

The output of whylogs is multiple orders of magnitude smaller than the dataset, while retaining a significant amount of characteristics for the data. The data profiling technique used in whylogs is mergeable, which means that profiles generated from multiple batches of data or across a distributed system can easily be combined to produce a holistic view of a dataset's properties. This allows teams to detect common issues such as data drift much more effectively.
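The mergeability property can be illustrated with a toy summary (a conceptual sketch only; real whylogs profiles use statistical data sketches internally, not these simple dictionaries):

```python
def summarize(batch):
    """Build a mergeable summary for one batch of values."""
    return {"count": len(batch), "sum": sum(batch),
            "min": min(batch), "max": max(batch)}

def merge(a, b):
    """Combine two batch summaries without revisiting the raw data."""
    return {
        "count": a["count"] + b["count"],
        "sum": a["sum"] + b["sum"],
        "min": min(a["min"], b["min"]),
        "max": max(a["max"], b["max"]),
    }

# Summaries from two batches, e.g. two partitions of a distributed job
s1 = summarize([1.0, 4.0, 2.0])
s2 = summarize([10.0, 3.0])

combined = merge(s1, s2)
# Merging partial summaries gives the same result as
# summarizing the full dataset in one pass
assert combined == summarize([1.0, 4.0, 2.0, 10.0, 3.0])
```

Because each worker only ships a small summary, the merged view of a large distributed dataset stays cheap to compute and store.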

Example: Apache Spark

A common use of whylogs with batch datasets is to run profiling in Apache Spark. The integration can be used from either Scala or Python.

In the following example, we read a public dataset into a Spark Dataset (a.k.a. DataFrame) and then run whylogs profiling on it.


import java.time.LocalDateTime

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SaveMode, SparkSession}
// this import adds newProfilingSession to Spark's Dataset
import com.whylogs.spark.WhyLogs._

object WhyLogsDemo extends App {
  // Create a Spark session
  val spark = SparkSession
    .builder()
    .master("local[*, 3]")
    .appName("SparkTesting-" + LocalDateTime.now().toString)
    .config("spark.ui.enabled", "false")
    .getOrCreate()

  // Create a dataset from a CSV file (replace the placeholder path with your copy of the data)
  val raw_df = spark.read
    .option("header", "true")
    .csv("path/to/fire_department_calls.csv")

  // Parse the call_date string into a timestamp
  // (lowercase "yyyy" — "YYYY" means week-based year in Java date patterns)
  val df = raw_df.withColumn("call_date", to_timestamp(col("Call Date"), "MM/dd/yyyy"))

  // Run profiling on the Dataset
  val profiles = df
    .newProfilingSession("FireDepartment") // start a new WhyLogs profiling job
    .withTimeColumn("call_date") // split dataset by call_date
    .groupBy("City", "Priority") // tag and group the data with categorical information
    .aggProfiles() // runs the aggregation. Returns a dataframe of <timestamp, datasetProfile> entries

  // Write output to Parquet
  profiles.write.mode(SaveMode.Overwrite).parquet("profiles_parquet")
}

whylogs v1 users will first need to install the PySpark extra. It ships as a separate installation to keep the core whylogs library as lightweight as possible and minimize dependencies.

whylogs v1
pip install "whylogs[spark]"
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from whylogs.api.pyspark.experimental import collect_dataset_profile_view

spark = SparkSession.builder.appName('whylogs-testing').getOrCreate()
arrow_config_key = "spark.sql.execution.arrow.pyspark.enabled"
spark.conf.set(arrow_config_key, "true")

# URL of the public wine-quality dataset (left blank in the original)
data_url = ""
spark.sparkContext.addFile(data_url)

spark_dataframe = spark.read \
    .option("delimiter", ";") \
    .option("inferSchema", "true") \
    .csv(SparkFiles.get("winequality-red.csv"), header=True)

dataset_profile_view = collect_dataset_profile_view(input_df=spark_dataframe)


Next Steps
