Java
The whylogs library includes both a Java and a Python version. This page is specific to the Java version, which includes support for Apache Spark integration.
Usage
To get started, add WhyLogs to your Maven POM:
<dependency>
<groupId>ai.whylabs</groupId>
<artifactId>whylogs-core</artifactId>
<version>0.1.0</version>
</dependency>
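If you build with Gradle instead of Maven, the same coordinates apply; a minimal sketch of the equivalent declaration:
dependencies {
    implementation 'ai.whylabs:whylogs-core:0.1.0'
}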
For the full Java API signature, see the Java Documentation.
Spark package (Scala 2.11 or 2.12 only):
<dependency>
<groupId>ai.whylabs</groupId>
<artifactId>whylogs-spark_2.11</artifactId>
<version>0.1.0</version>
</dependency>
For the full Scala API signature, see the Scala API Documentation.
Examples Repo
For examples in different languages, please check out our whylogs-examples repository.
Simple tracking
The following code is a simple tracking example that does not output data to disk:
import com.whylogs.core.DatasetProfile;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import com.google.common.collect.ImmutableMap;

public class Demo {
    public void demo() {
        // tags are attached to the profile's metadata
        final Map<String, String> tags = ImmutableMap.of("tag", "tagValue");
        final DatasetProfile profile = new DatasetProfile("test-session", Instant.now(), tags);

        // track individual values; a single feature can receive mixed types
        profile.track("my_feature", 1);
        profile.track("my_feature", "stringValue");
        profile.track("my_feature", 1.0);

        // track a map of feature name -> value in a single call
        final HashMap<String, Object> dataMap = new HashMap<>();
        dataMap.put("feature_1", 1);
        dataMap.put("feature_2", "text");
        dataMap.put("double_type_feature", 3.0);
        profile.track(dataMap);
    }
}
Serialization and deserialization
WhyLogs uses Protobuf as the backing storage format. To write the data to disk, use the standard Protobuf serialization API as follows.
import com.whylogs.core.DatasetProfile;
import com.whylogs.core.message.DatasetProfileMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

class SerializationDemo {
    public void demo(DatasetProfile profile) throws IOException {
        // write the profile to disk as a length-delimited Protobuf message
        try (final OutputStream fos = Files.newOutputStream(Paths.get("profile.bin"))) {
            profile.toProtobuf().build().writeDelimitedTo(fos);
        }
        // read the message back and restore the DatasetProfile object
        try (final InputStream is = Files.newInputStream(Paths.get("profile.bin"))) {
            final DatasetProfileMessage msg = DatasetProfileMessage.parseDelimitedFrom(is);
            final DatasetProfile restored = DatasetProfile.fromProtobuf(msg);

            // continue tracking
            restored.track("feature_1", 1);
        }
    }
}
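Because writeDelimitedTo prefixes each message with its length, several profiles can share a single file. A minimal sketch under that assumption, using only the Protobuf calls shown above (the profiles.bin path is illustrative):
import com.whylogs.core.DatasetProfile;
import com.whylogs.core.message.DatasetProfileMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class MultiProfileDemo {
    public void write(List<DatasetProfile> profiles) throws IOException {
        // append each profile as a length-delimited message to one file
        try (final OutputStream os = Files.newOutputStream(Paths.get("profiles.bin"))) {
            for (final DatasetProfile p : profiles) {
                p.toProtobuf().build().writeDelimitedTo(os);
            }
        }
    }

    public List<DatasetProfile> read() throws IOException {
        final List<DatasetProfile> result = new ArrayList<>();
        try (final InputStream is = Files.newInputStream(Paths.get("profiles.bin"))) {
            // parseDelimitedFrom returns null at end of stream
            DatasetProfileMessage msg;
            while ((msg = DatasetProfileMessage.parseDelimitedFrom(is)) != null) {
                result.add(DatasetProfile.fromProtobuf(msg));
            }
        }
        return result;
    }
}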
Merging dataset profiles
In enterprise systems, data is often partitioned across multiple machines for distributed processing. Online systems may also process data on multiple machines, forcing engineers to run ad hoc analyses in an ETL-based system to build complex metrics, such as the number of unique visitors to a website.
WhyLogs resolves this by allowing users to merge sketches from different machines. To merge two WhyLogs DatasetProfile files, those files must:
- Have the same name
- Have the same session ID
- Have the same data timestamp
- Have the same tags

The following is an example of the code for merging files that meet these requirements.
import com.whylogs.core.DatasetProfile;
import com.whylogs.core.message.DatasetProfileMessage;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

class MergeDemo {
    public void demo() throws IOException {
        try (final InputStream is1 = Files.newInputStream(Paths.get("profile1.bin"));
             final InputStream is2 = Files.newInputStream(Paths.get("profile2.bin"))) {
            final DatasetProfile profile1 = DatasetProfile.fromProtobuf(DatasetProfileMessage.parseDelimitedFrom(is1));
            final DatasetProfile profile2 = DatasetProfile.fromProtobuf(DatasetProfileMessage.parseDelimitedFrom(is2));

            // merge the two profiles; merge returns the combined profile
            final DatasetProfile merged = profile1.merge(profile2);
        }
    }
}
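The same pattern extends to any number of partitions. A minimal sketch that folds a list of profile files into a single profile, assuming every file meets the requirements listed above:
import com.whylogs.core.DatasetProfile;
import com.whylogs.core.message.DatasetProfileMessage;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

class MergeManyDemo {
    public DatasetProfile mergeAll(List<Path> files) throws IOException {
        DatasetProfile merged = null;
        for (final Path file : files) {
            try (final InputStream is = Files.newInputStream(file)) {
                final DatasetProfile profile =
                    DatasetProfile.fromProtobuf(DatasetProfileMessage.parseDelimitedFrom(is));
                // fold each profile into the running result
                merged = (merged == null) ? profile : merged.merge(profile);
            }
        }
        return merged;
    }
}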
Apache Spark integration
Our integration is compatible with Apache Spark 2.x (3.0 support is forthcoming). This example shows how to use WhyLogs to profile a dataset based on time and categorical information. The data comes from the public Fire Department Calls for Service dataset.
import org.apache.spark.sql.functions._
// implicit import for WhyLogs to enable newProfilingSession API
import com.whylogs.spark.WhyLogs._
// load the data
val raw_df = spark.read.option("header", "true").csv("/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv")
val df = raw_df.withColumn("call_date", to_timestamp(col("Call Date"), "MM/dd/yyyy"))
val profiles = df.newProfilingSession("profilingSession") // start a new WhyLogs profiling job
.withTimeColumn("call_date") // split dataset by call_date
.groupBy("City", "Priority") // tag and group the data with categorical information
.aggProfiles() // runs the aggregation. returns a dataframe of <timestamp, datasetProfile> entries
For further analysis, the resulting DataFrame can be stored as a Parquet file, or collected to the driver if the number of entries is small enough.
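For example, continuing the Scala session above (the output path is illustrative):
// write the <timestamp, datasetProfile> entries to Parquet for later analysis
profiles.write.mode("overwrite").parquet("/tmp/whylogs_profiles.parquet")

// or, when the result is small, collect the entries to the driver
val localProfiles = profiles.collect()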
Building and Testing
- To build, run ./gradlew build
- To test, run ./gradlew test