  • Language: Ruby
  • License: MIT License

Allows identical sidekiq jobs to be processed with a single background call

Sidekiq::Grouping

Sponsored by Evil Martians

Combines similar Sidekiq jobs into groups so they can be processed at once.

Useful for:

  • Grouping asynchronous API index calls into bulk requests for bulk updating/indexing.
  • Periodic batch updates of frequently changing database counters.

NOTE: As of 1.0, batch_size was renamed to batch_flush_size.
NOTE: As of 1.0.6, the gem works with Sidekiq 4.
NOTE: As of 1.0.8, locking is atomic (SET NX/EX) and no longer leads to batches that are permanently locked and stuck.
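To illustrate the 1.0.8 locking note, here is a minimal in-memory sketch of Redis SET key value NX EX ttl semantics in plain Ruby. FakeLockStore and set_nx_ex are hypothetical names, not part of sidekiq-grouping or Redis; against a real Redis server the equivalent redis-rb call is redis.set(key, value, nx: true, ex: ttl). Because the lock is created together with its expiry in a single step, a process that crashes while holding it cannot leave it stuck forever: the lock disappears once the TTL (presumably the lock_ttl setting described under Configuration) elapses.

```ruby
# Hypothetical in-memory stand-in for Redis `SET key value NX EX ttl`.
# It only illustrates the semantics; it is not sidekiq-grouping's implementation.
class FakeLockStore
  def initialize(clock: -> { Time.now.to_f })
    @clock = clock
    @entries = {} # key => [value, expires_at]
  end

  # Sets the key only if it is absent or expired (NX), with a TTL in seconds (EX).
  # Returns true if the lock was acquired, false if someone else still holds it.
  def set_nx_ex(key, value, ttl)
    now = @clock.call
    _, expires_at = @entries[key]
    return false if expires_at && expires_at > now

    @entries[key] = [value, now + ttl]
    true
  end
end

# A controllable clock so the example is deterministic.
now = 0.0
store = FakeLockStore.new(clock: -> { now })

first  = store.set_nx_ex("batch:lock", "1", 1) # acquired
second = store.set_nx_ex("batch:lock", "1", 1) # rejected: lock still held
now += 1.5                                     # TTL elapses, even if the holder crashed
third  = store.set_nx_ex("batch:lock", "1", 1) # acquired again
```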

Usage

Create a worker:

class ElasticBulkIndexWorker
  include Sidekiq::Worker

  sidekiq_options(
    queue: :elastic_bulks,
    batch_flush_size: 30,     # Jobs will be combined when queue size exceeds 30
    batch_flush_interval: 60, # Jobs will be combined every 60 seconds
    retry: 5
  )

  def perform(group)
    client = Elasticsearch::Client.new
    client.bulk(body: group.flatten)
  end
end

Enqueue the jobs:

# At least 30 times

ElasticBulkIndexWorker.perform_async({ delete: { _index: 'test', _id: 5, _type: 'user' } })
ElasticBulkIndexWorker.perform_async({ delete: { _index: 'test', _id: 6, _type: 'user' } })
ElasticBulkIndexWorker.perform_async({ delete: { _index: 'test', _id: 7, _type: 'user' } })
...

These jobs will be grouped into a single job with a single argument:

[
  [{ delete: { _index: 'test', _id: 5, _type: 'user' } }],
  [{ delete: { _index: 'test', _id: 6, _type: 'user' } }],
  [{ delete: { _index: 'test', _id: 7, _type: 'user' } }],
  ...
]
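The worker above calls group.flatten because each element of the grouped argument is the full argument list of one perform_async call. The plain-Ruby sketch below, which does not need Sidekiq or Elasticsearch, mimics that shape. It round-trips the arguments through JSON because Sidekiq stores job arguments as JSON, so the symbol keys used at enqueue time arrive in perform as string keys. The variable names are illustrative only.

```ruby
require "json"

# Argument lists of three perform_async calls (symbol keys, as enqueued).
calls = [5, 6, 7].map { |id| [{ delete: { _index: "test", _id: id, _type: "user" } }] }

# Mimic JSON serialization of job arguments: keys come back as strings.
group = JSON.parse(JSON.generate(calls))

# flatten removes the per-call wrapping, leaving a flat list of bulk actions
# (Array#flatten does not descend into hashes).
body = group.flatten
```

The resulting body is a flat array of action hashes, which is the form client.bulk(body: ...) receives in the worker.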

Control grouping

  • If the batch_flush_size option is set, grouping is performed when the batched queue size exceeds this value or Sidekiq::Grouping::Config.max_batch_size (1000 by default).
  • If the batch_flush_interval option is set, grouping is performed once every given interval.
  • If both are set, grouping is performed as soon as the first condition becomes true. For example, if batch_flush_interval is set to 60 seconds and batch_flush_size is set to 5, the group task will be enqueued even if just 3 jobs are in the queue at the end of the minute. On the other hand, if 5 jobs are enqueued within 10 seconds, they will be grouped and enqueued immediately.
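The rules above can be condensed into a single predicate. This is a hypothetical helper (flush_due? and its keyword names are made up for illustration), not the gem's code. It reads "exceeds" as "reaches", which matches the 5-job example, and it skips empty queues so an elapsed interval alone does not enqueue an empty group. In the real gem these checks presumably run only when the poller wakes up, every poll_interval seconds.

```ruby
# Hypothetical helper mirroring the grouping rules; not sidekiq-grouping's code.
# size:    number of jobs currently waiting in the batched queue
# elapsed: seconds since the last flush
def flush_due?(size:, elapsed:, flush_size: nil, flush_interval: nil, max_batch_size: 1000)
  return false if size.zero?

  by_size = size >= max_batch_size || (flush_size && size >= flush_size)
  by_time = flush_interval && elapsed >= flush_interval
  !!(by_size || by_time)
end

# The example from the text: batch_flush_interval 60, batch_flush_size 5.
flush_due?(size: 3, elapsed: 60, flush_size: 5, flush_interval: 60) # => true  (end of the minute)
flush_due?(size: 5, elapsed: 10, flush_size: 5, flush_interval: 60) # => true  (size reached early)
flush_due?(size: 3, elapsed: 10, flush_size: 5, flush_interval: 60) # => false (neither condition yet)
```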

Options

  • batch_unique prevents enqueuing jobs with identical arguments.

    class FooWorker
      include Sidekiq::Worker
    
      sidekiq_options batch_flush_interval: 10, batch_unique: true
    
      def perform(n)
        puts n
      end
    end
    
    FooWorker.perform_async(1)
    FooWorker.perform_async(1)
    FooWorker.perform_async(2)
    FooWorker.perform_async(2)
    
    # => [[1], [2]]
  • batch_size controls the size of a single group.

    class FooWorker
      include Sidekiq::Worker
    
      sidekiq_options batch_flush_size: 5, batch_size: 2
    
      def perform(n)
        puts n
      end
    end
    
    FooWorker.perform_async(1)
    FooWorker.perform_async(2)
    FooWorker.perform_async(3)
    FooWorker.perform_async(4)
    FooWorker.perform_async(5)
    
    # => [[1], [2]]
    # => [[3], [4]]
    # => [[5]]
  • tests_env is used to silence some logging in test environments (see below). Default: true if Rails.env.test?, false otherwise.
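The effect of batch_unique and batch_size on the grouped argument can be approximated in plain Ruby. simulate_groups is a hypothetical helper, not part of the gem: it treats each perform_async call as one argument list, drops duplicates when unique is set (approximating batch_unique within a single batch), and slices the rest into groups of at most batch_size. Each returned group is what one perform call would receive; batch_size: 1000 in the first call is just an arbitrary large value for the example.

```ruby
# Hypothetical simulation of how batch_unique and batch_size shape the
# arguments passed to perform; not sidekiq-grouping's implementation.
# Each element of `calls` is the argument list of one perform_async call.
def simulate_groups(calls, batch_size:, unique: false)
  calls = calls.uniq if unique
  calls.each_slice(batch_size).to_a
end

# batch_unique example: duplicate calls are dropped, one group remains.
simulate_groups([[1], [1], [2], [2]], batch_size: 1000, unique: true)
# => [[[1], [2]]]

# batch_size example: five calls split into groups of at most two.
simulate_groups([[1], [2], [3], [4], [5]], batch_size: 2)
# => [[[1], [2]], [[3], [4]], [[5]]]
```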

Web UI

Add this line to your config/routes.rb to activate the web UI:

require "sidekiq/grouping/web"

Configuration

Specify the grouping configuration in sidekiq.yml:

grouping:
  :poll_interval: 5       # Amount of time between polling batches
  :max_batch_size: 5000   # Maximum batch size allowed
  :lock_ttl: 1            # Batch queue flush lock timeout (seconds)

Or set it in your code:

Sidekiq::Grouping::Config.poll_interval = 5
Sidekiq::Grouping::Config.max_batch_size = 5000
Sidekiq::Grouping::Config.lock_ttl = 1

Note that the poll_interval option only takes effect when set in sidekiq.yml. Setting it in your Ruby code won't change the actual polling frequency.

Testing with Sidekiq::Testing.fake!

Sidekiq::Grouping uses internal queues for grouping tasks. If you need to force-flush the internal queues into the normal Sidekiq queues, use Sidekiq::Grouping.force_flush_for_test!.

For example:

# worker
class GroupedWorker

  include Sidekiq::Worker
  sidekiq_options(
    queue: :custom_queue,
    retry: 5,
    batch_flush_size: 9,
    batch_flush_interval: 10,
    batch_size: 3,
    batch_unique: true
  )

  def perform(grouped_arguments)
    # ... important payload
  end

end

# test itself
RSpec.describe GroupedWorker, type: :worker do

  describe '#perform' do
    it 'calls perform with array of arguments' do
      Sidekiq::Testing.fake! do
        described_class.perform_async(1)
        described_class.perform_async(1)
        described_class.perform_async(2)
        described_class.perform_async(2)

        # All 4 jobs above are pushed to :custom_queue even though :batch_flush_size is set to 9.
        Sidekiq::Grouping.force_flush_for_test!

        last_job = described_class.jobs.last
        expect(last_job['args']).to eq([[[1], [2]]])
        expect(last_job['queue']).to eq('custom_queue')
      end
    end
  end

end

Installation

Add this line to your application's Gemfile:

gem 'sidekiq-grouping'

And then execute:

$ bundle

Or install it yourself as:

$ gem install sidekiq-grouping

Contributing

  1. Fork it ( http://github.com/gzigzigzeo/sidekiq-grouping/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request
