Ray

There are many ways to use Ray, but in general you'll create whylogs profiles on each of the nodes or processes started by the Ray scheduler and return them as protobuf-serialized byte strings, which can then be deserialized into profile objects and merged with other profiles. whylogs profiles currently can't be pickled (pickle is the default serialization used by Ray), so it's necessary to serialize them using our protobuf format.

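# Roughly what will happen on each Ray process
from whylogs.app import Session
from whylogs.app.writers import WhyLabsWriter

# Use a WhyLabsWriter if you're sending profiles to WhyLabs.
# If you're only generating local profiles, writers can be an empty list.
writer = WhyLabsWriter()
session = Session(project="default-project", pipeline="default-pipeline", writers=[writer])
# df is the pandas DataFrame handled by this process.
# Serialize in the delimited protobuf format that DatasetProfile.parse_delimited reads.
serialized_profile = session.profile_dataframe(df).serialize_delimited()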

The snippet above will get you a profile that you can return from remote calls or actor executions. Once you have many of these profiles, you can merge them down into a single one.

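from functools import reduce
from typing import List

from whylogs.core.datasetprofile import DatasetProfile

# Serialized profiles returned by some distributed work in Ray (placeholder)
serialized_profiles: List[bytes] = [..]
# Parse the serialized profiles; parse_delimited returns a list of DatasetProfile objects per input
parsed = map(DatasetProfile.parse_delimited, serialized_profiles)
flat_profiles = [item for sublist in parsed for item in sublist]
# Merge them into a single profile, starting from an empty one
profile = reduce(lambda acc, cur: acc.merge(cur), flat_profiles, DatasetProfile(""))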
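
To make the flatten-then-reduce step easier to follow without a whylogs install, here is a minimal sketch of the same pattern using a toy summary type. `ToyProfile` and its fields are hypothetical stand-ins invented for this illustration; they are not part of whylogs. The only property the pattern relies on is a `merge` method that combines two summaries into one.

```python
from dataclasses import dataclass
from functools import reduce
from typing import List


@dataclass(frozen=True)
class ToyProfile:
    """Hypothetical stand-in for a mergeable profile (not a whylogs class)."""

    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    @staticmethod
    def of(values: List[float]) -> "ToyProfile":
        # Summarize one batch of values; an empty batch gives the empty profile.
        if not values:
            return ToyProfile()
        return ToyProfile(len(values), sum(values), min(values), max(values))

    def merge(self, other: "ToyProfile") -> "ToyProfile":
        # Combine two summaries without needing the underlying data.
        return ToyProfile(
            self.count + other.count,
            self.total + other.total,
            min(self.minimum, other.minimum),
            max(self.maximum, other.maximum),
        )


# Each inner list mimics what one parse call might return for one byte string.
parsed = [
    [ToyProfile.of([1.0, 2.0])],
    [ToyProfile.of([3.0]), ToyProfile.of([10.0, -4.0])],
]
flat = [p for sublist in parsed for p in sublist]
# Start from an empty profile so the reduce also works for an empty list.
merged = reduce(lambda acc, cur: acc.merge(cur), flat, ToyProfile())
```

Because the empty profile is the starting value and `merge` only combines summaries, the order in which workers return their results does not change the merged result in this sketch.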

Here is an example that uses Ray remote functions and dataset pipelines. A few CSVs are wrapped in a pipeline, and batches of 1000 rows are converted into whylogs profiles in parallel and returned as serialized profiles that can be merged into a single profile.

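import ray

# data_files is a list of CSV paths; build the pipeline however you like.
pipeline = ray.data.read_csv(data_files).pipeline()
batches = pipeline.iter_batches(batch_size=1000, batch_format="pandas")
# log_frame is a @ray.remote function (not shown here) that profiles one batch
# and returns the serialized profile, as in the first snippet.
serialized_profiles = [log_frame.remote(batch) for batch in batches]
results = ray.get(serialized_profiles)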
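
The shape of this batch-and-fan-out flow can be tried without a Ray cluster. The sketch below swaps Ray for a standard-library thread pool and swaps whylogs profiles for a `Counter`, so it only illustrates the control flow: cut the rows into batches of 1000, summarize each batch independently, then combine the per-batch results. `iter_batches`, `summarize_batch`, and the sample rows are hypothetical names and data made up for this illustration.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Dict, Iterable, Iterator, List

Row = Dict[str, object]


def iter_batches(rows: Iterable[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield consecutive batches of at most batch_size rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def summarize_batch(batch: List[Row]) -> Counter:
    """Hypothetical stand-in for log_frame: count non-null values per column."""
    counts: Counter = Counter()
    for row in batch:
        for column, value in row.items():
            if value is not None:
                counts[column] += 1
    counts["__rows__"] += len(batch)
    return counts


# Made-up data: every fourth row has a missing score.
rows = [{"id": i, "score": None if i % 4 == 0 else i * 0.5} for i in range(2500)]

# The thread pool plays the role of Ray here: submit one task per batch,
# then collect the results (analogous to .remote() followed by ray.get()).
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(summarize_batch, batch) for batch in iter_batches(rows, 1000)]
    results = [future.result() for future in futures]

# Combine the per-batch summaries into one.
total = sum(results, Counter())
```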

Here is another example where pipelines are used to split data into 8 equal parts and then sent off to individual actors for some further processing.

import ray
from ray.data.dataset_pipeline import DatasetPipeline
from whylogs.app import Session

@ray.remote
class RemotePipelineActor:
    def __init__(self, pipeline: DatasetPipeline) -> None:
        self.pipeline = pipeline
        # No writers: the profile is returned to the caller instead of written out.
        self.session = Session(project="ignored",
                               pipeline="ignored",
                               writers=[])
        self.logger = self.session.logger()

    def log_from_pipeline(self) -> bytes:
        # Log every batch into the same profile, then serialize it once.
        for df in self.pipeline.iter_batches(batch_size=1000, batch_format="pandas"):
            self.logger.log_dataframe(df)
        return self.logger.profile.serialize_delimited()

# Split the pipeline into 8 parts and hand each part to its own actor.
pipelines = ray.data.read_csv(data_files).pipeline().split(8)
actors = [RemotePipelineActor.remote(pipeline) for pipeline in pipelines]
results = ray.get([actor.log_from_pipeline.remote() for actor in actors])
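
Each actor returns its profile through `serialize_delimited`, and the earlier merge snippet reads such bytes with `parse_delimited`, which yields a list of profiles per input. The sketch below shows why a delimited byte string can hold more than one message. It assumes whylogs follows the usual protobuf convention of prefixing each message with its length as a base-128 varint; that is an assumption about the format, not something this page confirms. The function names are hypothetical and the payloads are plain bytes rather than real profiles.

```python
from typing import List


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as a protobuf-style base-128 varint."""
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def write_delimited(messages: List[bytes]) -> bytes:
    """Concatenate messages, each prefixed with its varint-encoded length."""
    return b"".join(encode_varint(len(m)) + m for m in messages)


def read_delimited(data: bytes) -> List[bytes]:
    """Split a length-delimited byte string back into its messages."""
    messages = []
    pos = 0
    while pos < len(data):
        # Decode the varint length prefix.
        length = 0
        shift = 0
        while True:
            byte = data[pos]
            pos += 1
            length |= (byte & 0x7F) << shift
            shift += 7
            if not byte & 0x80:
                break
        # Slice out exactly `length` bytes of payload.
        messages.append(data[pos:pos + length])
        pos += length
    return messages


# Two separately framed payloads can simply be concatenated and read back.
framed = write_delimited([b"profile-a"]) + write_delimited([b"profile-b"])
recovered = read_delimited(framed)
```

Under that assumption, a reader of the delimited format naturally returns a list, which is why the merge snippet flattens the parsed results before reducing them.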

Ray also has Ray Serve, a higher-level serving library that uses Ray's core functionality to serve models (or anything else, really). You can use the examples above to integrate whylogs into Ray Serve as well, or you can check out our container-based integration if you would rather keep whylogs separate from your Ray setup. That allows you to forward data to a dedicated container endpoint instead of managing whylogs on Ray.

The examples are admittedly a bit contrived, but Ray is a very flexible platform. Stop by our community Slack to tell us more about how you're using Ray, and we can try to tailor the advice to your use case.
