• Stars
    star
    243
  • Rank 166,489 (Top 4 %)
  • Language
    Rust
  • License
    MIT License
  • Created over 2 years ago
  • Updated about 1 year ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Simple, extensible multithreaded background job and message processing library for Rust

apalis

Simple, extensible multithreaded background job and messages processing library for Rust


Features

  • Simple and predictable job handling model.
  • Jobs handlers with a macro free API.
  • Take full advantage of the tower ecosystem of middleware, services, and utilities.
  • Runtime agnostic - Use tokio, smol etc.
  • Optional Web interface to help you manage your jobs.

apalis job processing is powered by tower::Service which means you have access to the tower middleware.

apalis has support for:

Source Crate Example
Cron Jobs
Redis
Sqlite
Postgres
MySQL
Amqp
From Scratch

Getting Started

To get started, just add to Cargo.toml

[dependencies]
apalis = { version = "0.4", features = ["redis"] }

Usage

use apalis::prelude::*;
use apalis::redis::RedisStorage;
use serde::{Deserialize, Serialize};
use anyhow::Result;

#[derive(Debug, Deserialize, Serialize)]
struct Email {
    to: String,
}

impl Job for Email {
    const NAME: &'static str = "apalis::Email";
}

/// A function that will be converted into a service.
/// The following signatures are accepted
/// ```rust
/// async fn job(email: Email, ctx: JobContext)
/// async fn job(email: Email, ctx: JobContext) -> anyhow::Result<anyhow::Error>
/// async fn job(email: Email, ctx: JobContext) -> Result<(), my::Error>
/// async fn job(email: Email, ctx: JobContext) -> primitive //eg str
/// async fn job(email: Email, ctx: JobContext) -> impl IntoJobResponse
/// ```
async fn email_service(job: Email, ctx: JobContext) {
    info!("Do something");
}

#[tokio::main]
async fn main() -> Result<()> {
    std::env::set_var("RUST_LOG", "debug");
    env_logger::init();
    let redis = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
    let storage = RedisStorage::new(redis).await?;
    Monitor::new()
        .register_with_count(2, move |index| {
            WorkerBuilder::new(format!("email-worker-{index}"))
                .with_storage(storage.clone())
                .build_fn(email_service)
        })
        .run()
        .await
}

Then

//This can be in another part of the program or another application eg a http server
async fn produce_route_jobs(storage: &RedisStorage<Email>) -> Result<()> {
    let mut storage = storage.clone();
    storage
        .push(Email {
            to: "[email protected]".to_string(),
        })
        .await?;
}

Feature flags

  • tracing (enabled by default) β€” Support Tracing πŸ‘€
  • redis β€” Include redis storage
  • postgres β€” Include Postgres storage
  • sqlite β€” Include SQlite storage
  • mysql β€” Include MySql storage
  • cron β€” Include cron job processing
  • sentry β€” Support for Sentry exception and performance monitoring
  • prometheus β€” Support Prometheus metrics
  • retry β€” Support direct retrying jobs
  • timeout β€” Support timeouts on jobs
  • limit β€” πŸ’ͺ Limit the amount of jobs
  • filter β€” Support filtering jobs based on a predicate
  • extensions β€” Add a global extensions to jobs

Storage Comparison

Since we provide a few storage solutions, here is a table comparing them:

Feature Redis Sqlite Postgres Sled Mysql Mongo Cron
Scheduled jobs βœ“ βœ“ βœ“ x βœ“ x βœ“
Retry jobs βœ“ βœ“ βœ“ x βœ“ x βœ“
Persistence βœ“ βœ“ βœ“ x βœ“ x BYO
Rerun Dead jobs βœ“ βœ“ βœ“ x βœ“ x x

How apalis works

sequenceDiagram
    participant App
    participant Worker
    participant Producer

    App->>+Producer: Add job to queue
    Producer-->>+Worker: Job data
    Worker->>+Producer: Update job status to 'running' via Layer
    Producer-->>-Worker: Confirmation
    Worker->>+App: Notify job started via Layer
    loop job execution
        Worker-->>-App: Report job progress via Layer
    end
    Worker->>+Producer: Update job status to 'completed' via Layer

Web UI

If you are running apalis Board, you can easily manage your jobs. See a working rest API example here

Thanks to

  • tower - Tower is a library of modular and reusable components for building robust networking clients and servers.
  • redis-rs - Redis library for rust
  • sqlx - The Rust SQL Toolkit

Roadmap

v 1.0

  • Refactor the crates structure
  • Mocking utilities
  • Support for SurrealDB and Mongo
  • Lock free for Postgres
  • Add more utility layers
  • Use extractors in job fn structure
  • Polish up documentation
  • Improve and standardize apalis Board
  • Benchmarks

v 0.4

  • Move from actor based to layer based processing
  • Graceful Shutdown
  • Allow other types of executors apart from Tokio
  • Mock/Test Worker
  • Improve monitoring
  • Add job progress via layer
  • Add more sources

v 0.3

  • Standardize API (Storage, Worker, Data, Middleware, Context )
  • Introduce SQL
  • Implement layers for Sentry and Tracing.
  • Improve documentation
  • Organized modules and features.
  • Basic Web API Interface
  • Sql Examples
  • Sqlx migrations

v 0.2

  • Redis Example
  • Actix Web Example

Resources

Contributing

Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.

Authors

See also the list of contributors who participated in this project.

License

This project is licensed under the MIT License - see the LICENSE.md file for details