Apache Spark

Overview

whylogs profiles are mergeable, which makes them a good fit for Spark's map-reduce style of processing. Because whylogs needs only a single pass over the data, the integration is highly efficient: no shuffling is required to build whylogs profiles with Spark.
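To make "mergeable" concrete, here is a minimal toy sketch in plain Python. The `ColumnSummary` class and `summarize_partition` helper are hypothetical illustrations, not part of whylogs, and real whylogs profiles track far richer statistics. The sketch only shows why per-partition summaries that can be merged give the same result as one pass over all the data.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Iterable


@dataclass(frozen=True)
class ColumnSummary:
    """Toy mergeable summary of one numeric column (hypothetical, not a whylogs class)."""
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def track(self, value: float) -> "ColumnSummary":
        # Fold a single value into the summary (the "map" side, one pass per partition).
        return ColumnSummary(
            self.count + 1,
            self.total + value,
            min(self.minimum, value),
            max(self.maximum, value),
        )

    def merge(self, other: "ColumnSummary") -> "ColumnSummary":
        # Combine two partial summaries (the "reduce" side); no raw rows are needed.
        return ColumnSummary(
            self.count + other.count,
            self.total + other.total,
            min(self.minimum, other.minimum),
            max(self.maximum, other.maximum),
        )


def summarize_partition(values: Iterable[float]) -> ColumnSummary:
    # Build a summary for one partition in a single pass.
    return reduce(ColumnSummary.track, values, ColumnSummary())


# Three partitions, as Spark might split a column across executors.
partitions = [[1.0, 4.0], [2.0], [8.0, 3.0, 6.0]]

partials = [summarize_partition(p) for p in partitions]
merged = reduce(ColumnSummary.merge, partials, ColumnSummary())

# Summarizing all rows in one pass yields the same summary as merging the partials.
single_pass = summarize_partition(v for p in partitions for v in p)
```

Because each partition is summarized where it lives and only the small summaries are combined, rows never have to be moved between partitions.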

Build from Source

To get started, build the JAR bundle from our GitHub repository:

git clone https://github.com/whylabs/whylogs-java
cd whylogs-java
./gradlew shadowJar

The JAR bundle is under whylogs-java/spark-bundle/build/libs. You'll need this JAR bundle for the following examples.

Configure Your Spark Session

  • Add the JAR bundle to your Spark session in one of two ways:
    • Via the --jars parameter of your spark-submit script (see documentation)
    • By setting spark.jars in your Spark configuration
  • [Python only] Configure your Spark session:
import pyspark

# whylogs_jar is the path to the JAR bundle built in the previous section
spark = pyspark.sql.SparkSession.builder \
    .config("spark.submit.pyFiles", whylogs_jar) \
    .config("spark.jars", whylogs_jar) \
    ... \
    .getOrCreate()
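The `whylogs_jar` value used above has to point at the bundle produced by the build step. One possible way to resolve it is sketched below. The `find_whylogs_jar` helper is hypothetical (it is not part of whylogs or PySpark), and the sketch assumes you run it from the directory that contains the `whylogs-java` checkout.

```python
from pathlib import Path
from typing import Optional


def find_whylogs_jar(libs_dir: str) -> Optional[str]:
    """Return the absolute path of the newest .jar in libs_dir, or None if there is none.

    Hypothetical helper for illustration; adjust the selection rule if the
    build directory contains several bundles.
    """
    jars = sorted(Path(libs_dir).glob("*.jar"), key=lambda p: p.stat().st_mtime)
    return str(jars[-1].resolve()) if jars else None


# Assumed location, relative to the directory holding the whylogs-java checkout.
whylogs_jar = find_whylogs_jar("whylogs-java/spark-bundle/build/libs")
```

If the helper returns `None`, the bundle has not been built yet or the path is different on your machine.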

Examples

PySpark Example

This example shows how to use whylogs to profile a dataset based on time and categorical information using PySpark.

whylogs v1 users will need to first install the PySpark module. This is done as a separate installation to keep the core whylogs library as lightweight as possible and minimize dependencies.

whylogs v1
pip install "whylogs[spark]"
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from whylogs.api.pyspark.experimental import collect_dataset_profile_view

spark = SparkSession.builder.appName('whylogs-testing').getOrCreate()

# Enable Arrow-based data transfer between Spark and pandas
arrow_config_key = "spark.sql.execution.arrow.pyspark.enabled"
spark.conf.set(arrow_config_key, "true")

# Download the wine quality dataset and make it available to Spark
data_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
spark.sparkContext.addFile(data_url)

spark_dataframe = spark.read.option("delimiter", ";") \
    .option("inferSchema", "true") \
    .csv(SparkFiles.get("winequality-red.csv"), header=True)

# Profile the Spark DataFrame
dataset_profile_view = collect_dataset_profile_view(input_df=spark_dataframe)

You can then generate a Pandas DataFrame from the profiled dataset.

dataset_profile_view.to_pandas()

Scala Example

This example shows how to use whylogs to profile the Lending Club dataset.

// Tested on Databricks cluster running as scala notebook:
// * cluster version: 8.3 (includes Apache Spark 3.1.1, Scala 2.12)
// * installed whylogs jar: whylogs_spark_bundle_3_1_1_scala_2_12_0_1_21aeb7b2_20210903_224257_1_all-d1b20.jar
// * from: https://oss.sonatype.org/content/repositories/snapshots/ai/whylabs/whylogs-spark-bundle_3.1.1-scala_2.12/0.1-21aeb7b2-SNAPSHOT/whylogs-spark-bundle_3.1.1-scala_2.12-0.1-21aeb7b2-20210903.224257-1-all.jar
import java.time.LocalDateTime
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.whylogs.spark.WhyLogs._
// COMMAND ----------
// For demo purposes we create a time column with yesterday's date, so that WhyLabs ingestion sees this as a recent dataset profile
// and it shows up in the default dashboard view of the last 7 days on WhyLabs.
def unixEpochTimeForNumberOfDaysAgo(numDaysAgo: Int): Long = {
  import java.time._
  val numDaysAgoDateTime: LocalDateTime = LocalDateTime.now().minusDays(numDaysAgo)
  val zdt: ZonedDateTime = numDaysAgoDateTime.atZone(ZoneId.of("America/Los_Angeles"))
  val numDaysAgoDateTimeInMillis = zdt.toInstant.toEpochMilli
  val unixEpochTime = numDaysAgoDateTimeInMillis / 1000L
  unixEpochTime
}
val timestamp_yesterday = unixEpochTimeForNumberOfDaysAgo(1)
println(timestamp_yesterday)
val timeColumn = "dataset_timestamp"
// COMMAND ----------
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes
val spark = SparkSession
  .builder()
  .master("local[*, 3]")
  .appName("SparkTesting-" + LocalDateTime.now().toString)
  .config("spark.ui.enabled", "false")
  .getOrCreate()
// the file location below is using the lending_club_1000.csv uploaded onto a Databricks dbfs
// e.g. from here https://github.com/whylabs/whylogs/blob/mainline/testdata/lending_club_1000.csv
// you will need to update that location based on a dataset you use or follow this example.
val input_dataset_location = "dbfs:/FileStore/tables/lending_club_1000.csv"
val raw_df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(input_dataset_location)
// Here we add an artificial column for time. It is required that there is a TimestampType column for profiling with this API
val df = raw_df.withColumn(timeColumn, lit(timestamp_yesterday).cast(DataTypes.TimestampType))
df.printSchema()
// COMMAND ----------
val session = df.newProfilingSession("LendingClubScala") // start a new whylogs profiling job
  .withTimeColumn(timeColumn)
val profiles = session
  .aggProfiles() // runs the aggregation; returns a DataFrame of <dataset_timestamp, datasetProfile> entries
// COMMAND ----------
// Optionally, write the dataset profiles out somewhere before uploading to WhyLabs
profiles.write
  .mode(SaveMode.Overwrite)
  .parquet("dbfs:/FileStore/tables/whylogs_demo_profiles_parquet")
// COMMAND ----------
// Replace the parameters below with your own values after signing up for an account at https://whylabs.ai/
// You can find your organization ID at https://hub.whylabsapp.com/settings/access-tokens; the value looks something like: org-123abc
// The same settings page lets you create the API key you need to upload to your WhyLabs account
// The modelId below specifies which model this profile is for. By default an initial model-1 is created, but you will need to update this
// if you create a new model at https://hub.whylabsapp.com/settings/model-management
session.log(
  orgId = "replace_with_your_orgId",
  modelId = "model-1",
  apiKey = "replace_with_your_api_key")

This example can also be found in our whylogs library here
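For readers working in Python, the timestamp helper from the Scala example can be sketched with the standard library as follows. This is an illustrative translation, not code from the whylogs library. The function name and its `now` and `tz` parameters are assumptions introduced here, and the default time zone is UTC rather than America/Los_Angeles so that the sketch does not depend on a time zone database being installed.

```python
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Optional


def unix_epoch_seconds_for_days_ago(
    num_days_ago: int,
    now: Optional[datetime] = None,
    tz: tzinfo = timezone.utc,
) -> int:
    """Return the Unix epoch time, in seconds, for the wall-clock time num_days_ago days back.

    Mirrors the Scala helper: take the local wall-clock time, subtract whole
    days, then interpret the result in the given time zone. To match the Scala
    example exactly, pass tz=zoneinfo.ZoneInfo("America/Los_Angeles").
    """
    # `now` is injectable for testing; by default use the naive local wall clock.
    local_now = now if now is not None else datetime.now()
    wall_clock = local_now - timedelta(days=num_days_ago)
    # Attach the time zone without shifting the wall-clock fields.
    zoned = wall_clock.replace(tzinfo=tz)
    return int(zoned.timestamp())


timestamp_yesterday = unix_epoch_seconds_for_days_ago(1)
```

The resulting value is in seconds, which is the same unit the Scala example casts to a timestamp column.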
