
Ray

⚠️ See the v0 docs for using Ray with whylogs v0.

Ray can be used in many ways, but the general pattern is the same: create whylogs profiles on each node or process that the Ray scheduler starts, and return them as DatasetProfileView instances that can be merged with other profiles. Return DatasetProfileView rather than DatasetProfile from remote functions, because DatasetProfile can't be serialized and Ray needs to serialize anything that moves between nodes.

Here is a simple example that makes use of Ray's remote decorator to execute a function that profiles a dataframe.

from functools import reduce
from typing import List

import pandas as pd
import ray
from whylogs.core import DatasetProfile, DatasetProfileView

from util import data_files


def merge_profiles(profiles: List[DatasetProfileView]) -> DatasetProfileView:
    # Fold all views into one, starting from the view of an empty profile.
    return reduce(lambda acc, cur: acc.merge(cur), profiles, DatasetProfile().view())


@ray.remote
def log_frame(df: pd.DataFrame) -> DatasetProfileView:
    profile = DatasetProfile()
    profile.track(df)
    # Return the view, which Ray can serialize; DatasetProfile itself can't be.
    return profile.view()


def main() -> DatasetProfileView:
    pipeline = ray.data.read_csv(data_files).window()
    pipelines = pipeline.iter_batches(batch_size=1000, batch_format="pandas")
    results = ray.get([log_frame.remote(batch) for batch in pipelines])

    return merge_profiles(results)


if __name__ == "__main__":
    ray.init()
    main()
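
The merge step in the example above is an ordinary fold over per-batch results. The following is a minimal sketch of that pattern that runs without Ray or whylogs. `CountSummary`, `summarize`, and `merge_summaries` are hypothetical stand-ins invented for illustration (they are not whylogs types); they play the roles of `DatasetProfileView`, `log_frame`, and `merge_profiles` respectively.

```python
from dataclasses import dataclass
from functools import reduce
from typing import List


@dataclass(frozen=True)
class CountSummary:
    """Hypothetical stand-in for a mergeable profile view (not a whylogs type)."""

    rows: int = 0
    total: float = 0.0

    def merge(self, other: "CountSummary") -> "CountSummary":
        # Merging returns a new summary and leaves both inputs unchanged.
        return CountSummary(self.rows + other.rows, self.total + other.total)


def summarize(batch: List[float]) -> CountSummary:
    # Plays the role of log_frame: one summary per batch.
    return CountSummary(rows=len(batch), total=sum(batch))


def merge_summaries(summaries: List[CountSummary]) -> CountSummary:
    # Same fold as merge_profiles: start from an empty summary, merge each result.
    return reduce(lambda acc, cur: acc.merge(cur), summaries, CountSummary())


batches = [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]]
merged = merge_summaries([summarize(batch) for batch in batches])
```

Starting the fold from an empty summary means an empty list of results still yields a valid (empty) summary instead of raising, which is the same reason `merge_profiles` seeds `reduce` with `DatasetProfile().view()`.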

Here is an example that uses Ray's actor and pipeline abstractions to split up a dataset into 8 shards, profile them, and merge the results into a single profile.

import ray
from ray.data.dataset_pipeline import DatasetPipeline
from whylogs.core import DatasetProfile, DatasetProfileView

from util import data_files

# merge_profiles is the helper defined in the previous example.


# Use an actor to encapsulate some state/logic to eventually be executed remotely.
@ray.remote
class RemotePipelineActor:
    def __init__(self, pipeline: DatasetPipeline) -> None:
        self.pipeline = pipeline

    def log_from_pipeline(self) -> DatasetProfileView:
        profile = DatasetProfile()
        # Larger batches profile quicker with whylogs
        for df in self.pipeline.iter_batches(batch_size=10_000, batch_format="pandas"):
            profile.track(pandas=df)
        return profile.view()


def main_pipeline() -> None:
    # Split the windowed dataset into 8 shards, one per actor.
    pipelines = ray.data.read_csv(data_files).window().split(8)
    actors = [RemotePipelineActor.remote(pipeline) for pipeline in pipelines]
    results = ray.get([actor.log_from_pipeline.remote() for actor in actors])

    merged = merge_profiles(results)
    merged.write("actor-pipeline-profile.bin")


if __name__ == "__main__":
    ray.init()
    main_pipeline()

Ray also provides Ray Serve, a higher-level serving library that builds on Ray's core functionality to serve models (or anything else, really). You can adapt the examples above to integrate whylogs into Ray Serve, or you can check out our container-based integration if you would rather keep whylogs separate from your Ray setup. That lets you forward data to a dedicated container endpoint instead of managing whylogs on Ray.

The examples are a bit contrived, but Ray is a very flexible platform. Stop by our community Slack to tell us more about how you're using Ray, and we can try to tailor the advice to your use case.
