
Distributed XGBoost on Ray

XGBoost-Ray is a distributed backend for XGBoost, built on top of distributed computing framework Ray.

All releases are tested on large clusters and workloads.

Installation

You can install the latest XGBoost-Ray release from PyPI:

pip install "xgboost_ray"

If you'd like to install the latest master, use this command instead:

pip install "git+https://github.com/ray-project/xgboost_ray.git#egg=xgboost_ray"

Usage

XGBoost-Ray provides a drop-in replacement for XGBoost's train function. To pass data, use xgboost_ray.RayDMatrix instead of xgb.DMatrix. You can also use the scikit-learn interface - see the next section.

Just as with the original xgb.train() function, the training parameters are passed as the params dictionary.

Ray-specific distributed training parameters are configured with an xgboost_ray.RayParams object. For instance, you can set the num_actors property to specify how many distributed actors you would like to use.

Here is a simplified example (which requires sklearn):

Training:

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

evals_result = {}
bst = train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=RayParams(
        num_actors=2,  # Number of remote actors
        cpus_per_actor=1))

bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
    evals_result["train"]["error"][-1]))

Prediction:

from xgboost_ray import RayDMatrix, RayParams, predict
from sklearn.datasets import load_breast_cancer
import xgboost as xgb

data, labels = load_breast_cancer(return_X_y=True)

dpred = RayDMatrix(data, labels)

bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=2))

print(pred_ray)

scikit-learn API

XGBoost-Ray also features a scikit-learn API that fully mirrors the pure XGBoost scikit-learn API, making it a complete drop-in replacement. The following estimators are available:

  • RayXGBClassifier
  • RayXGBRegressor
  • RayXGBRFClassifier
  • RayXGBRFRegressor
  • RayXGBRanker

Example usage of RayXGBClassifier:

from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=0.25, random_state=seed
)

clf = RayXGBClassifier(
    n_jobs=4,  # In XGBoost-Ray, n_jobs sets the number of actors
    random_state=seed
)

# scikit-learn API will automatically convert the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.

clf.fit(X_train, y_train)

pred_ray = clf.predict(X_test)
print(pred_ray)

pred_proba_ray = clf.predict_proba(X_test)
print(pred_proba_ray)

# It is also possible to pass a RayParams object
# to the fit/predict/predict_proba methods - it will
# override the n_jobs set during initialization

clf.fit(X_train, y_train, ray_params=RayParams(num_actors=2))

pred_ray = clf.predict(X_test, ray_params=RayParams(num_actors=2))
print(pred_ray)

Things to keep in mind:

  • The n_jobs parameter controls the number of actors spawned. You can pass a RayParams object to the fit/predict/predict_proba methods as the ray_params argument for greater control over resource allocation. Doing so will override the value of n_jobs with the value of the ray_params.num_actors attribute. For more information, refer to the Resources section below.
  • By default, n_jobs is set to 1, which means training will not be distributed. Make sure to either set n_jobs to a higher value or pass a RayParams object as outlined above in order to take advantage of XGBoost-Ray's distributed functionality.
  • After calling fit, additional evaluation results (e.g. training time, number of rows, callback results) are available under the additional_results_ attribute (see the sketch after this list).
  • XGBoost-Ray's scikit-learn API is based on XGBoost 1.4. While we try to support older XGBoost versions, please note that this library is only fully tested and supported for XGBoost >= 1.4.
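
For instance, these extra results can be inspected directly after fitting. A minimal sketch, reusing the clf from the example above (the exact keys available depend on your XGBoost-Ray version):

# additional_results_ is a plain dict populated by fit()
for key, value in clf.additional_results_.items():
    print(key, value)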

For more information on the scikit-learn API, refer to the XGBoost documentation.

Data loading

Data is passed to XGBoost-Ray via a RayDMatrix object.

The RayDMatrix lazily loads data and stores it sharded in the Ray object store. The Ray XGBoost actors then access these shards to run their training.

A RayDMatrix supports various data and file types, such as Pandas DataFrames, NumPy arrays, CSV files, and Parquet files.
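
For in-memory data, this amounts to wrapping the object directly. A minimal sketch with a toy DataFrame (column names are illustrative):

import pandas as pd
from xgboost_ray import RayDMatrix

df = pd.DataFrame({
    "feature": [0.1, 0.2, 0.3, 0.4],
    "label": [0, 1, 0, 1],
})
dtrain = RayDMatrix(df, label="label")  # the "label" column becomes the target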

Example loading multiple parquet files:

import glob
from xgboost_ray import RayDMatrix, RayFileType

# We can also pass a list of files
path = list(sorted(glob.glob("/data/nyc-taxi/*/*/*.parquet")))

# This argument will be passed to `pd.read_parquet()`
columns = [
    "passenger_count",
    "trip_distance", "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude",
    "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "total_amount"
]

dtrain = RayDMatrix(
    path,
    label="passenger_count",  # Will select this column as the label
    columns=columns,
    # ignore=["total_amount"],  # Optional list of columns to ignore
    filetype=RayFileType.PARQUET)

Hyperparameter Tuning

XGBoost-Ray integrates with Ray Tune to provide distributed hyperparameter tuning for your distributed XGBoost models. You can run multiple XGBoost-Ray training runs in parallel, each with a different hyperparameter configuration, and each training run parallelized by itself. All you have to do is move your training code to a function, and pass the function to tune.run. Internally, train will detect if tune is being used and will automatically report results to tune.

Example using XGBoost-Ray with Ray Tune:

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

num_actors = 4
num_cpus_per_actor = 1

ray_params = RayParams(
    num_actors=num_actors,
    cpus_per_actor=num_cpus_per_actor)

def train_model(config):
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)

    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)
    bst.save_model("model.xgb")

from ray import tune

# Specify the hyperparameter search space.
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "eta": tune.loguniform(1e-4, 1e-1),
    "subsample": tune.uniform(0.5, 1.0),
    "max_depth": tune.randint(1, 9)
}

# Make sure to use the `get_tune_resources` method to set the `resources_per_trial`
analysis = tune.run(
    train_model,
    config=config,
    metric="train-error",
    mode="min",
    num_samples=4,
    resources_per_trial=ray_params.get_tune_resources())
print("Best hyperparameters", analysis.best_config)

Also see examples/simple_tune.py for another example.

Fault tolerance

XGBoost-Ray leverages the stateful Ray actor model to enable fault tolerant training. There are currently two modes implemented.

Non-elastic training (warm restart)

When an actor or node dies, XGBoost-Ray will retain the state of the remaining actors. In non-elastic training, the failed actors will be replaced as soon as resources are available again. Only these actors will reload their parts of the data. Training will resume once all actors are ready for training again.

You can set this mode in the RayParams:

from xgboost_ray import RayParams

ray_params = RayParams(
    elastic_training=False,  # Use non-elastic training
    max_actor_restarts=2,    # How often are actors allowed to fail
)

Elastic training

In elastic training, XGBoost-Ray will continue training with fewer actors (and on less data) when a node or actor dies. The missing actors are staged in the background and are reintegrated into training once they are back and have loaded their data.

This mode will train on less data for a period of time, which can impact accuracy. In practice, we found these effects to be minor, especially for large shuffled datasets. The immediate benefit is that training time is reduced significantly, to almost the same level as if no actors had died. Thus, especially when data loading takes a large part of the total training time, this setting can dramatically speed up training for large distributed jobs.

You can configure this mode in the RayParams:

from xgboost_ray import RayParams

ray_params = RayParams(
    elastic_training=True,  # Use elastic training
    max_failed_actors=3,    # Only allow at most 3 actors to die at the same time
    max_actor_restarts=2,   # How often are actors allowed to fail
)

Resources

By default, XGBoost-Ray tries to determine the number of CPUs available and distributes them evenly across actors.

In the case of very large clusters or clusters with many different machine sizes, it makes sense to limit the number of CPUs per actor by setting the cpus_per_actor argument. Consider always setting this explicitly.

The number of XGBoost actors always has to be set manually with the num_actors argument.
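
For CPU-only training, a minimal sketch (assuming, for illustration, a cluster with 32 CPUs that you want to split across 8 actors):

from xgboost_ray import RayParams

# num_actors always has to be set explicitly; cpus_per_actor caps the
# CPUs each actor may use instead of relying on automatic detection.
ray_params = RayParams(num_actors=8, cpus_per_actor=4)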

Multi GPU training

XGBoost-Ray enables multi-GPU training. The XGBoost core backend will automatically leverage NCCL2 for cross-device communication. All you have to do is start one actor per GPU and set XGBoost's tree_method to a GPU-compatible option, e.g. gpu_hist (see the XGBoost documentation for more details).

For instance, if you have 2 machines with 4 GPUs each, you will want to start 8 remote actors, and set gpus_per_actor=1. There is usually no benefit in allocating less (e.g. 0.5) or more than one GPU per actor.

You should divide the CPUs evenly across actors per machine, so if your machines have 16 CPUs in addition to the 4 GPUs, each actor should have 4 CPUs to use.

from xgboost_ray import RayParams

ray_params = RayParams(
    num_actors=8,
    gpus_per_actor=1,
    cpus_per_actor=4,   # Divide evenly across actors per machine
)

How many remote actors should I use?

This depends on your workload and your cluster setup. Generally, there is no inherent benefit in running more than one remote actor per node for CPU-only training, because XGBoost core can already leverage multiple CPUs via threading.

However, there are some cases when you should consider starting more than one actor per node:

  • For multi GPU training, each GPU should have a separate remote actor. Thus, if your machine has 24 CPUs and 4 GPUs, you will want to start 4 remote actors with 6 CPUs and 1 GPU each.
  • In a heterogeneous cluster, you might want to find the greatest common divisor for the number of CPUs. E.g. for a cluster with three nodes of 4, 8, and 12 CPUs, respectively, you should set the number of actors to 6 and the CPUs per actor to 4, as sketched below.
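
For the heterogeneous example in the last bullet, this translates to the following sketch (4 is the greatest common divisor of 4, 8, and 12, and the cluster has 24 CPUs in total):

from xgboost_ray import RayParams

# 24 total CPUs / 4 CPUs per actor (the GCD) = 6 actors
ray_params = RayParams(num_actors=6, cpus_per_actor=4)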

Distributed data loading

XGBoost-Ray can leverage both centralized and distributed data loading.

In centralized data loading, the data is partitioned by the head node and stored in the object store. Each remote actor then retrieves its partitions by querying the Ray object store. Centralized loading is used when you pass centralized in-memory dataframes, such as Pandas dataframes or Numpy arrays, or when you pass a single source file, such as a single CSV or Parquet file.

from xgboost_ray import RayDMatrix

# This will use centralized data loading, as only one source file is specified
# `label_col` is a column in the CSV, used as the target label
dtrain = RayDMatrix("./source_file.csv", label="label_col")

In distributed data loading, each remote actor loads their data directly from the source (e.g. local hard disk, NFS, HDFS, S3), without a central bottleneck. The data is still stored in the object store, but locally to each actor. This mode is used automatically when loading data from multiple CSV or Parquet files. Please note that we do not check or enforce partition sizes in this case - it is your job to make sure the data is evenly distributed across the source files.

from xgboost_ray import RayDMatrix

# This will use distributed data loading, as four source files are specified
# Please note that you cannot schedule more than four actors in this case.
# `label_col` is a column in the Parquet files, used as the target label
dtrain = RayDMatrix([
    "hdfs:///tmp/part1.parquet",
    "hdfs:///tmp/part2.parquet",
    "hdfs:///tmp/part3.parquet",
    "hdfs:///tmp/part4.parquet",
], label="label_col")

Lastly, XGBoost-Ray supports distributed dataframe representations, such as Ray Datasets, Modin and Dask dataframes (used with Dask on Ray). Here, XGBoost-Ray will check on which nodes the distributed partitions are currently located, and will assign partitions to actors in order to minimize cross-node data transfer. Please note that we also assume here that partition sizes are uniform.

from xgboost_ray import RayDMatrix

# This will try to allocate the existing Modin partitions
# to co-located Ray actors. If this is not possible, data will
# be transferred across nodes
dtrain = RayDMatrix(existing_modin_df)

Data sources

The following data sources can be used with a RayDMatrix object.

Type               Centralized loading   Distributed loading
Numpy array        Yes                   No
Pandas dataframe   Yes                   No
Single CSV         Yes                   No
Multi CSV          Yes                   Yes
Single Parquet     Yes                   No
Multi Parquet      Yes                   Yes
Ray Dataset        Yes                   Yes
Petastorm          Yes                   Yes
Dask dataframe     Yes                   Yes
Modin dataframe    Yes                   Yes

Memory usage

XGBoost uses a compute-optimized data structure, the DMatrix, to hold training data. When converting a dataset to a DMatrix, XGBoost creates intermediate copies and ends up holding a complete copy of the full data. The data will be converted into the local data format (on a 64-bit system, these are 64-bit floats). Depending on the system and the original dataset dtype, this matrix can thus occupy more memory than the original dataset.

The peak memory usage for CPU-based training is at least 3x the dataset size (assuming dtype float32 on a 64-bit system) plus about 400,000 KiB for other resources, like operating system requirements and storage of intermediate results.

Example

  • Machine type: AWS m5.xlarge (4 vCPUs, 16 GiB RAM)
  • Usable RAM: ~15,350,000 KiB
  • Dataset: 1,250,000 rows with 1024 features, dtype float32. Total size: 5,000,000 KiB
  • XGBoost DMatrix size: ~10,000,000 KiB

This dataset will fit exactly on this node for training.

Note that the DMatrix size might be lower on a 32 bit system.
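
The numbers above follow directly from the heuristics in this section. A rough back-of-the-envelope sketch (assuming float32 source data converted into a float64 DMatrix):

rows, features = 1_250_000, 1024

dataset_kib = rows * features * 4 / 1024   # float32 source data: ~5,000,000 KiB
dmatrix_kib = rows * features * 8 / 1024   # float64 DMatrix copy: ~10,000,000 KiB
peak_kib = 3 * dataset_kib + 400_000       # CPU training peak: ~15,400,000 KiB

print(int(dataset_kib), int(dmatrix_kib), int(peak_kib))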

GPUs

Generally, the same memory requirements exist for GPU-based training. Additionally, the GPU must have enough memory to hold the dataset.

In the example above, the GPU must have at least 10,000,000 KiB (about 9.6 GiB) of memory. However, we empirically found that using a DeviceQuantileDMatrix seems to result in slightly higher peak GPU memory usage (about 10%), possibly due to intermediate storage when loading data.

Best practices

In order to reduce peak memory usage, consider the following suggestions:

  • Store data as float32 or less. More precision is often not needed, and keeping data in a smaller format will help reduce peak memory usage for initial data loading.
  • Pass the dtype when loading data from CSV (see the sketch below). Otherwise, floating point values will be loaded as np.float64 by default, increasing peak memory usage by 33%.
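
One way to control the dtype is to load the CSV with pandas yourself and pass the resulting frame to RayDMatrix. A sketch only: the file path and column names are placeholders, and this assumes the data fits in memory on the loading node.

import pandas as pd
from xgboost_ray import RayDMatrix

# Read the feature columns as float32 instead of pandas' default float64.
df = pd.read_csv(
    "./source_file.csv",
    dtype={"feature_a": "float32", "feature_b": "float32"},
)
dtrain = RayDMatrix(df, label="label_col")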

Placement Strategies

XGBoost-Ray leverages Ray's Placement Group API (https://docs.ray.io/en/master/placement-group.html) to implement placement strategies for better fault tolerance.

By default, a SPREAD strategy is used for training, which attempts to spread all of the training workers across the nodes in a cluster on a best-effort basis. This improves fault tolerance, since it minimizes the number of worker failures when a node goes down, but comes at the cost of increased inter-node communication. To disable this strategy, set the RXGB_USE_SPREAD_STRATEGY environment variable to 0. If disabled, no particular placement strategy will be used.

Note that this strategy is used only when elastic_training is not used. If elastic_training is set to True, no placement strategy is used.

When XGBoost-Ray is used with Ray Tune for hyperparameter tuning, a PACK strategy is used. This strategy attempts to place all workers for each trial on the same node on a best-effort basis. This means that if a node goes down, it will be less likely to impact multiple trials.

When placement strategies are used, XGBoost-Ray will wait for 100 seconds for the required resources to become available, and will fail if the required resources cannot be reserved and the cluster cannot autoscale to increase the number of resources. You can change the RXGB_PLACEMENT_GROUP_TIMEOUT_S environment variable to modify how long this timeout should be.
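
Both knobs are plain environment variables, so they can be set before training starts. A minimal sketch (the values shown are only illustrative):

import os

# Disable the SPREAD placement strategy for training workers.
os.environ["RXGB_USE_SPREAD_STRATEGY"] = "0"
# Wait up to 10 minutes (instead of the default 100 seconds) for resources.
os.environ["RXGB_PLACEMENT_GROUP_TIMEOUT_S"] = "600"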

More examples

For complete end-to-end examples, please have a look at the examples folder.
