Batch Process Logging
Motivation
Most batch data processing pipelines have some of these characteristics:
- Running on a regular cadence
- Large scale or in a distributed manner
- Data can be transformed to a final format (ETL) or stored in an unprocessed/raw forms in data lakes (ELT)
It is a good practice to track data-related metrics for batch processing jobs. Basic metrics that are often built include:
- Input count
- Output count
- Schema
- Sum, min, max, mean, stddev
Tracking these metrics over time can help teams detect issues. One common pattern to track these metrics is to run analysis on top of a storage/ELT/SQL-based systems. However, these process tend to be manual and painstaking, and thus managing data quality is a common challenge in teams dealing with data at large scale.
Benefits of whylogs
whylogs provides a lightweight mechanism to track complex data insights for batch processing systems. whylogs integrates naturally with these distributed batch systems such as Spark or Flink.
The output of whylogs is multiple orders of magnitude smaller than the dataset, while retaining a significant amount of characteristics for the data. The data profiling technique used in whylogs is mergeable, which means that profiles generated from multiple batches of data or across a distributed system can easily be combined to produce a holistic view of a dataset's properties. This allows teams to detect common issues such as data drift much more effectively.
Example: Apache Spark
A common use of whylogs
with batch datasets is to run profiling with batch datasets in Apache Spark.
Users can run integration in either Scala or Python mode.
In the following example, we are reading in a public dataset as a Spark's Dataset (a.k.a DataFrame) object, and then perform whylogs profiling on this dataset.
Scala
import java.time.LocalDateTime
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SaveMode, SparkSession}
// this import adds newProfilingSession to Spark's Dataset
import com.whylogs.spark.WhyLogs._
object WhyLogsDemo extends App {
// creating a Spark session val spark = SparkSession
.builder()
.master("local[*, 3]")
.appName("SparkTesting-" + LocalDateTime.now().toString)
.config("spark.ui.enabled", "false")
.getOrCreate()
// Creating a dataset
val raw_df = spark.read
.option("header", "true")
.csv("Fire_Department_Calls_for_Service.csv")
// Parse in the call_date string
val df = raw_df.withColumn("call_date", to_timestamp(col("Call Date"), "MM/dd/YYYY"))
df.printSchema()
// Run profiling on the Dataeset
val profiles = df
.newProfilingSession("FireDepartment") // start a new WhyLogs profiling job
.withTimeColumn("call_date") // split dataset by call_date
.groupBy("City", "Priority") // tag and group the data with categorical information
.aggProfiles() // runs the aggregation. returns a dataframe of <timestamp, datasetProfile> entries
// Write output to Parquet
profiles.write
.mode(SaveMode.Overwrite)
.parquet("profiles_parquet")
}
PySpark
whylogs v1 users will need to first install the PySpark module. This is done as a separate installation to keep the core whylogs library as lightweight as possible and minimize dependencies.
pip install "whylogs[spark]"
- whylogs v0
- whylogs v1
import pandas as pd
whylogs_jar = "/path/to/whylogs/bundle.jar"
spark = pyspark.sql.SparkSession.builder
.appName("whylogs")
.config("spark.pyspark.driver.python", sys.executable)
.config("spark.pyspark.python", sys.executable)
.config("spark.executor.userClassPathFirst", "true")
.config("spark.submit.pyFiles", whylogs_jar)
.config("spark.jars", whylogs_jar)
.getOrCreate()
# this comes from whylogs bundle jar
import whyspark
pdf = pd.read_parquet("demo.csv")
df = spark.createDataFrame(pdf)
session = whyspark.new_profiling_session(df, "my-dataset-name").withTimeColumn('date')
profile_df = session.aggProfiles().cache()
profile_df.write.parquet('profiles.parquet')
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from whylogs.api.pyspark.experimental import collect_dataset_profile_view
spark = SparkSession.builder.appName('whylogs-testing').getOrCreate()
arrow_config_key = "spark.sql.execution.arrow.pyspark.enabled"
spark.conf.set(arrow_config_key, "true")
data_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
spark.sparkContext.addFile(data_url)
spark_dataframe = spark.read.option("delimiter", ";") \
.option("inferSchema", "true") \
.csv(SparkFiles.get("winequality-red.csv"), header=True)
dataset_profile_view = collect_dataset_profile_view(input_df=spark_dataframe)
dataset_profile_view.write(path="path/to/profile.bin")
Next Steps
- Check out the full Spark example on GitHub with the example dataset.