
Apache Spark


whylogs profiles are mergeable and therefore suitable for Spark's map-reduce style processing. Since whylogs requires only a single pass of data, the integration is highly efficient: no shuffling is required to build whylogs profiles with Spark.



PySpark Examples


The following examples show how to use whylogs to profile a dataset using PySpark.

Please note that whylogs v1 users will need to first install the PySpark module. This is done as a separate installation to keep the core whylogs library as lightweight as possible and minimize dependencies.

whylogs v1
pip install "whylogs[spark]"
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from whylogs.api.pyspark.experimental import collect_dataset_profile_view

spark = SparkSession.builder.appName('whylogs-testing').getOrCreate()
arrow_config_key = "spark.sql.execution.arrow.pyspark.enabled"
spark.conf.set(arrow_config_key, "true")

data_url = ""
spark.sparkContext.addFile(data_url)

spark_dataframe = spark.read.option("delimiter", ";") \
    .option("inferSchema", "true") \
    .csv(SparkFiles.get("winequality-red.csv"), header=True)

dataset_profile_view = collect_dataset_profile_view(input_df=spark_dataframe)

Profiling with Segments

If you want to create segments, you'll need to initialize a segmented schema object and pass it to the profiling function as shown in the code snippet below:

from whylogs.core.segmentation_partition import segment_on_column
from whylogs.core.resolvers import STANDARD_RESOLVER
from whylogs.core.schema import DeclarativeSchema
from whylogs.api.pyspark.experimental import collect_segmented_results

column_segments = segment_on_column("quality")
segmented_schema = DeclarativeSchema(STANDARD_RESOLVER, segments=column_segments)

segmented_results = collect_segmented_results(spark_dataframe, schema=segmented_schema)

Inspecting the Profiles

Standard profiles

For standard (unsegmented) profiles, you can generate a pandas DataFrame from the profiled dataset as shown below:


Segmented profiles

Segmented profiles have a more complex structure, with one profile per segment. The number of segments profiled can be checked as follows:

print(f"After profiling the result set has: {segmented_results.count} segments")

You can inspect each of the profiles within a segmented profile as shown below:

first_segment = segmented_results.segments()[0]
segmented_profile = segmented_results.profile(first_segment)

print("Profile view for segment {}".format(first_segment.key))

Uploading Profiles to WhyLabs

You can then upload the profile to the WhyLabs platform with the following code:
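A sketch of the standard whylogs v1 WhyLabs writer call. The organization ID, dataset ID, and API key below are placeholders you must replace with your own values; whylogs typically picks them up from environment variables:

```python
import os

# placeholder credentials -- replace with your own WhyLabs values
os.environ["WHYLABS_DEFAULT_ORG_ID"] = "org-0"        # hypothetical
os.environ["WHYLABS_DEFAULT_DATASET_ID"] = "model-1"  # hypothetical
os.environ["WHYLABS_API_KEY"] = "<your api key>"      # hypothetical

# upload the collected profile view to the WhyLabs platform
dataset_profile_view.writer("whylabs").write()
```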


In case you wish to upload your data as a reference profile, use the code below instead:

dataset_profile_view.writer("whylabs").option(reference_profile_name="<reference profile alias>").write()

Scala Examples

This example shows how to use WhyLogs to profile the Lending Club dataset.

// Tested on Databricks cluster running as scala notebook:
// * cluster version: 8.3 (includes Apache Spark 3.1.1, Scala 2.12)
// * installed whylogs jar: whylogs_spark_bundle_3_1_1_scala_2_12_0_1_21aeb7b2_20210903_224257_1_all-d1b20.jar
// * from:

import java.time.LocalDateTime

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.whylogs.spark.WhyLogs._

// COMMAND ----------

// For demo purposes we will create a time column with yesterday's date, so that WhyLabs ingestion
// sees this as a recent dataset profile and it shows up in the default 7-day dashboard on WhyLabs.
def unixEpochTimeForNumberOfDaysAgo(numDaysAgo: Int): Long = {
  import java.time._
  val numDaysAgoDateTime: LocalDateTime = LocalDateTime.now().minusDays(numDaysAgo)
  val zdt: ZonedDateTime = numDaysAgoDateTime.atZone(ZoneId.of("America/Los_Angeles"))
  val numDaysAgoDateTimeInMillis = zdt.toInstant.toEpochMilli
  val unixEpochTime = numDaysAgoDateTimeInMillis / 1000L
  unixEpochTime
}

val timestamp_yesterday = unixEpochTimeForNumberOfDaysAgo(1)
val timeColumn = "dataset_timestamp"

// COMMAND ----------

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes
val spark = SparkSession
  .builder()
  .master("local[*, 3]")
  .appName("SparkTesting-" + LocalDateTime.now().toString)
  .config("spark.ui.enabled", "false")
  .getOrCreate()

// The file location below uses lending_club_1000.csv uploaded onto the Databricks DBFS.
// You will need to update the location based on the dataset you use.
val input_dataset_location = "dbfs:/FileStore/tables/lending_club_1000.csv"

val raw_df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(input_dataset_location)

// Here we add an artificial column for time. It is required that there is a TimestampType column for profiling with this API
val df = raw_df.withColumn(timeColumn, lit(timestamp_yesterday).cast(DataTypes.TimestampType))

// COMMAND ----------

val session = df.newProfilingSession("LendingClubScala") // start a new WhyLogs profiling job
val profiles = session
.aggProfiles() // runs the aggregation. returns a dataframe of <dataset_timestamp, datasetProfile> entries

// COMMAND ----------

// optionally you might write the dataset profiles out somewhere before uploading to Whylabs

// COMMAND ----------

// Replace the parameters below with your own values after signing up for a WhyLabs account.
// You can find your Organization ID on the WhyLabs settings page; the value looks something like: org-123abc.
// The settings page also lets you create new API keys; you will need an API key to upload to your account.
// The modelId below specifies which model this profile is for; by default an initial model-1 is created,
// but update it if you create a new model.
orgId = "replace_with_your_orgId",
modelId = "model-1",
apiKey = "replace_with_your_api_key")

This example can also be found in the whylogs library repository.
