Composable pipelines for Enumerators.

Piperator

Pipelines for streaming large collections. Piperator lets you compose streaming pipelines from lazy enumerables.

The library is heavily inspired by the Elixir pipe operator and Node.js streams.

Installation

Piperator is distributed as a Ruby gem and can be installed with:

$ gem install piperator

Usage

Start by requiring the gem:

require 'piperator'

Pipelines

As an appetizer, here's a pipeline that triples all input values and then sums the values.

Piperator.
  pipe(->(values) { values.lazy.map { |i| i * 3 } }).
  pipe(->(values) { values.sum }).
  call([1, 2, 3])
# => 18

The same could also be achieved using the DSL instead of method chaining:

Piperator.build do
  pipe(->(values) { values.lazy.map { |i| i * 3 } })
  pipe(->(values) { values.sum })
end.call([1, 2, 3])

If desired, the input enumerable can also be given as the first element of the pipeline using Piperator.wrap.

Piperator.
  wrap([1, 2, 3]).
  pipe(->(values) { values.lazy.map { |i| i * 3 } }).
  pipe(->(values) { values.sum }).
  call
# => 18
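All three forms above describe the same idea: each pipe is a callable that receives the previous pipe's output. As a rough mental model only (this is a plain-Ruby sketch, not Piperator's actual implementation, and the names `triple` and `sum` are made up for illustration), the composition can be pictured as a fold over a list of callables:

```ruby
# Each "pipe" is just something that responds to #call.
triple = ->(values) { values.lazy.map { |i| i * 3 } }
sum    = ->(values) { values.sum }

# Feed the input through every pipe in order.
result = [triple, sum].reduce([1, 2, 3]) { |input, pipe| pipe.call(input) }
puts result # prints 18
```

Because `triple` returns a lazy enumerator, no intermediate array is built; values are tripled one at a time as `sum` consumes them.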

Have reasons to defer constructing a pipe? Evaluate it lazily:

summing = ->(values) { values.sum }
Piperator.build do
  pipe(->(values) { values.lazy.map { |i| i * 3 } })
  lazy do
    summing
  end
end.call([1, 2, 3])
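To illustrate what deferring construction can mean, here is a small plain-Ruby sketch. It does not use Piperator; the helper `deferred` is hypothetical, and memoizing the built pipe is an assumption of this sketch rather than documented Piperator behavior. The block that builds the pipe is not run until the pipe first receives input:

```ruby
# Wraps a factory block in a callable; the factory runs on first use only.
def deferred(&factory)
  pipe = nil
  lambda do |input|
    pipe ||= factory.call # build the real pipe the first time input arrives
    pipe.call(input)
  end
end

built = 0
summing = deferred do
  built += 1 # stands in for expensive setup work
  ->(values) { values.sum }
end

puts built                    # prints 0, nothing has been constructed yet
puts summing.call([3, 6, 9])  # prints 18
puts built                    # prints 1
```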

There is, of course, a much more idiomatic alternative in Ruby:

[1, 2, 3].map { |i| i * 3 }.sum

So why bother?

Because pipes can run code before stream processing starts and after it has ended. Let's use the same pattern to calculate the decompressed length of a gzip file fetched over HTTP with streaming.

require 'piperator'
require 'uri'
require 'em-http-request' # provides EventMachine::HttpDecoders::GZip
require 'net/http'

module HTTPFetch
  def self.call(url)
    uri = URI(url)
    Enumerator.new do |yielder|
      Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
        request = Net::HTTP::Get.new(uri.request_uri)
        http.request request do |response|
          response.read_body { |chunk| yielder << chunk }
        end
      end
    end
  end
end

module GZipDecoder
  def self.call(enumerable)
    Enumerator.new do |yielder|
      decoder = EventMachine::HttpDecoders::GZip.new do |chunk|
        yielder << chunk
      end

      enumerable.each { |chunk| decoder << chunk }
      yielder << decoder.finalize.to_s
    end
  end
end

length = proc do |enumerable|
  enumerable.lazy.reduce(0) { |aggregate, chunk| aggregate + chunk.length }
end

Piperator.
  pipe(HTTPFetch).
  pipe(GZipDecoder).
  pipe(length).
  call('http://ftp.gnu.org/gnu/gzip/gzip-1.2.4.tar.gz')

At no point is it necessary to keep the full response or decompressed content in memory. This is a huge win when file sizes grow beyond the 780 kB seen in the example.
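The same pattern can be tried without network access or extra gems. The following sketch rests on several assumptions: an in-memory `StringIO` stands in for the HTTP response, the standard library's `Zlib::Inflate` replaces the EventMachine decoder, and the names `chunked_source`, `gunzip`, and `events` are invented for illustration. It records when the source is opened and closed to show code running before and after the stream is processed:

```ruby
require 'zlib'
require 'stringio'

events = []

# Source pipe: yields 16-byte chunks, with setup and teardown around the stream.
chunked_source = lambda do |io|
  Enumerator.new do |yielder|
    events << :opened
    begin
      while (chunk = io.read(16))
        yielder << chunk
      end
    ensure
      events << :closed
    end
  end
end

# Decoder pipe: inflates gzip data chunk by chunk.
gunzip = lambda do |chunks|
  Enumerator.new do |yielder|
    inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 16) # +16 selects gzip framing
    begin
      chunks.each do |chunk|
        out = inflater.inflate(chunk)
        yielder << out unless out.empty?
      end
      rest = inflater.finish
      yielder << rest unless rest.empty?
    ensure
      inflater.close
    end
  end
end

length = ->(chunks) { chunks.reduce(0) { |total, chunk| total + chunk.bytesize } }

payload = 'piperator ' * 1_000
compressed = StringIO.new(Zlib.gzip(payload))

# Fold the input through the pipes, in the spirit of a Piperator pipeline.
total = [chunked_source, gunzip, length].reduce(compressed) { |input, pipe| pipe.call(input) }
puts total # prints 10000
```

Nothing is read from `compressed` until `length` starts consuming chunks, so only one small chunk is in flight at any time.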

Pipelines themselves respond to #call. This enables using pipelines as pipes in other pipelines.

append_end = proc do |enumerator|
  Enumerator.new do |yielder|
    enumerator.each { |item| yielder << item }
    yielder << 'end'
  end
end

prepend_start = proc do |enumerator|
  Enumerator.new do |yielder|
    yielder << 'start'
    enumerator.each { |item| yielder << item }
  end
end

double = ->(enumerator) { enumerator.lazy.map { |i| i * 2 } }

prepend_append = Piperator.pipe(prepend_start).pipe(append_end)
Piperator.pipe(double).pipe(prepend_append).call([1, 2, 3]).to_a
# => ['start', 2, 4, 6, 'end']
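Nesting works because a pipe only needs to respond to `#call`. The toy class below makes that explicit; `MiniPipeline` is a hypothetical stand-in written for this example and is not how Piperator itself is implemented:

```ruby
# A toy pipeline: immutable list of callables, itself callable.
class MiniPipeline
  def initialize(pipes = [])
    @pipes = pipes
  end

  # Returns a new pipeline with the callable appended.
  def pipe(callable)
    MiniPipeline.new(@pipes + [callable])
  end

  # Runs the input through every callable in order.
  def call(input)
    @pipes.reduce(input) { |value, callable| callable.call(value) }
  end
end

double  = ->(items) { items.lazy.map { |i| i * 2 } }
add_one = ->(items) { items.lazy.map { |i| i + 1 } }

inner = MiniPipeline.new.pipe(double).pipe(add_one)
outer = MiniPipeline.new.pipe(inner).pipe(->(items) { items.first(2) })

p outer.call([1, 2, 3]) # prints [3, 5]
```

Since `inner` has a `#call` method, `outer` treats it exactly like a lambda or a module with `self.call`.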

Enumerators as IO objects

Piperator also provides a helper class that allows Enumerators to be used as IO objects. This is useful to provide integration with libraries that work only with IO objects such as Nokogiri or Oj.

An example pipe that yields every XML node in a document read as a stream:

require 'nokogiri'
streaming_xml = lambda do |enumerable|
  Enumerator.new do |yielder|
    io = Piperator::IO.new(enumerable.each)
    reader = Nokogiri::XML::Reader(io)
    reader.each { |node| yielder << node }
  end
end

In real-world scenarios, the pipe would need to filter the nodes. Passing every single XML node forward is not that useful.
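To give a feel for what adapting an Enumerator to an IO-style interface involves, here is a minimal sketch. `ChunkReader` is a hypothetical class that implements only `read(length)`; it is not Piperator::IO, which may behave differently and expose more methods. It pulls chunks from the enumerator on demand and buffers whatever has not been read yet:

```ruby
# Serves IO#read-style requests from an enumerator of string chunks.
class ChunkReader
  def initialize(enumerator)
    @enumerator = enumerator
    @buffer = String.new
    @exhausted = false
  end

  # Returns up to `length` bytes, or nil once the input is used up.
  def read(length)
    fill(length)
    return nil if @buffer.empty?

    head = @buffer.byteslice(0, length)
    @buffer = @buffer.byteslice(length..-1) || String.new
    head
  end

  private

  # Pulls chunks until the buffer holds `length` bytes or the input ends.
  def fill(length)
    until @exhausted || @buffer.bytesize >= length
      begin
        @buffer << @enumerator.next
      rescue StopIteration
        @exhausted = true
      end
    end
  end
end

reader = ChunkReader.new(%w[ab cde f].each)
first  = reader.read(4)
second = reader.read(4)
third  = reader.read(4)
p [first, second, third] # prints ["abcd", "ef", nil]
```

Only as many chunks as a `read` call needs are pulled from the enumerator, so the upstream pipes stay lazy.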

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/lautis/piperator.

License

The gem is available as open source under the terms of the MIT License.
