• Stars
    star
    867
  • Rank 52,618 (Top 2 %)
  • Language
    Clojure
  • Created about 10 years ago
  • Updated almost 2 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Clojure reducers, but for parallel execution: locally and on distributed systems.
API docs Clojars Purpose
tesser.core tesser.core The core library and essential folds
tesser.math tesser.math Statistical folds: means, correlations, covariance matrices, quantiles, etc.

Tesser

"Now we will tesser, we will wrinkle again. Do you understand?" "No," Meg said flatly. Mrs. Whatsit sighed. "Explanations are not easy when they are about things for which your civilization still has no words. Calvin talked about traveling at the speed of light. You understand that, little Meg?" "Yes," Meg nodded. "That, of course, is the impractical, long way around. We have learned to take short cuts wherever possible." "Sort of like in math?" Meg asked. "Like in math."

β€” Madeline L'Engle, A Wrinkle In Time.

A Motivating Example

You've got a big pile of data--say, JSON in files on disk, or TSVs in Hadoop--and you'd like to reduce over that data: computing some statistics, searching for special values, etc. You might want to find the median housing price in each city given a collection of all sales, or find the total mass of all main-sequence stars in a region of sky, or search for an anticorrelation between vaccine use and the prevalence of a disease. These are all folds: collapsing a collection of data into a smaller value.

In Clojure, we're used to writing programs like

(->> stars
     (filter main-sequence?)
     (map :mass)
     (reduce +))

But this reduction is singlethreaded, and can only run on a single machine. You've got 48 cores in your desktop computer. Why aren't they all helping?

(require '[tesser.core :as t])
(->> (t/filter main-sequence)
     (t/map :mass)
     (t/fold +)
     (t/tesser (t/chunk 1024 stars)))

Tesser goes much deeper, but this is the essence: writing understandable, composable, fast programs for exploring datasets.

A Clojure Library for Concurrent & Commutative Folds

Tesser gives us a library for building up folds, and applying those folds to a collection of inputs, divided into chunks. Chunks are reduced with maximal parallelism, and the results of those reductions are reduced together. We call the concurrent reduction :reducer, and the serial reduction :combiner.

Reduce/combine diagram

In order to reduce over a chunk, we need an initial value. A :reducer-identity function generates this initial value. Once the reduction is complete, we may want to transform it using a :post-reducer function--for instance, converting from a transient to a persistent data structure, or discarding internal reducer state.

Diagram of reducer identities and post-reduce

Likewise, we need a :combiner-identity function to generate an initial value for the combine reduction, and a final :post-combine function to transform the combiner's output.

Diagram of combiner identity and post-combine

Tesser folds are associative and commutative monoids: they can be applied concurrently and do not preserve order. We make this tradeoff to reduce the need for coordination between reducers. Not all concurrent folds require/provide commutativity: Algebird, for example, provides ordered distributed folds, and clojure.core.reducers preserves order as well. Right now Tesser doesn't, but we might change that someday.

Unlike CRDTs, Tesser's folds do not require idempotence. In Tesser, 1 + 1 is 2, not 1. Each input factors in to the reduced value exactly once--though the order is up to Tesser.

Representing a Fold

If all you need is a quick-n-dirty replacement for an existing reduce or fold, just drop tesser.simple into your program.

(require '[tesser.simple :as s])
user=> (s/reduce + 0 (range 10000000))
  49999995000000

But Tesser is capable of composing folds together--combining simple functions to express complex ideas. In Tesser, we represent a compiled fold as map of six functions:

{:reducer-identity  (fn [] ...)
 :reducer           (fn [accumulator input] ...)
 :post-reducer      (fn [accumulator] ...)
 :combiner-identity (fn [] ...)
 :combiner          (fn [accumulator post-reducer-result])
 :post-combiner     (fn [accumulator] ...)}

For instance, here's a fold to find the sum of all inputs. While the reducer and combiner often have the same accumulator type and identities, this is not always the case.

(require '[tesser.core :as t])
(t/tesser [[1] [2 3]]
          (t/fold {:reducer-identity  (constantly 0)
                   :reducer           +
                   :post-reducer      identity
                   :combiner-identity (constantly 0)
                   :combiner          +
                   :post-combiner     identity}))
; => 6

Since (+) returns 0 (the additive identity), we can leave off the identities; t/fold will use the reducer as the identity function. The same goes for the post-reducer and post-combiner: (+ x) returns x, so we can leave them off too:

(t/tesser [[1] [2 3]]
          (t/fold {:reducer  +
                   :combiner +}))
; => 6

Or simply pass a function to t/fold, which will be used for all 6 functions.

(t/tesser [[1] [2 3]] (t/fold +))
; => 6

But that's not all; we can transform the summing fold into one that operates on strings like "1" with the map function:

(->> (t/map read-string)
     (t/fold +)
     (t/tesser [["1" "2" "3"]]))
; => 6

Tesser provides a rich library of fold transforms, allowing you to build up complex folds out of simple, modular parts.

Core

Tesser.core looks a lot like the Clojure seq API, and many of its functions have similar names. Their semantics differ, however: Tesser folds do not preserve the order of inputs, and when executed, they run in parallel.

Applying a fold using tesser.core/tesser uses multiple threads proportional to processor cores. Unlike reducers, we don't use the Java forkjoin pool, just plain old threads; it avoids contention issues and improves performance on most JDKs.

(require '[tesser.core :as t])

(t/tesser [[1 2 3] [4 5 6]] (t/into [] (t/map str)))
=> ["4" "5" "6" "1" "2" "3"]

(->> (t/map inc)      ; Increment each number
     (t/filter odd?)  ; Take only odd numbers
     (t/take 5)       ; *which* five odd numbers are selected is arbitrary
     (t/mapcat range) ; Explode each odd number n into the numbers from 0 to n
     (t/frequencies)  ; Compute the frequency of appearances
     (t/tesser (partition 3 (range 100))))
=> {0 5, 7 2, 20 1, 27 1, 1 4, 24 1, 4 3, 15 2, 21 1, 13 2, 22 1, 6 2, 28 1, 25 1, 17 1, 3 3, 12 2, 2 4, 23 1, 19 1, 11 2, 9 2, 5 2, 14 2, 26 1, 16 2, 10 2, 18 1, 8 2}

Fold combinators like facet and fuse allow multiple reductions to be done in a single pass, possibly sharing expensive operations like deserialization. This is a particularly effective way of working with a set of data files on disk or in Hadoop.

Given JSON records about a codebase like

{"year":         2004,
 "lines-of-code" {"ruby": 100,
                  "c":    1693}}

We can find the range of years and the total lines of code in each language in a single pass.

(->> (t/map #(json/parse-string % true))
     (t/fuse {:year-range (t/range (t/map :year))
              :total-code (->> (t/map :lines-of-code)
                               (t/facet)
                               (t/reduce + 0)})
     (t/tesser records))

=> {:year-range [1986 2014]
    :total-code {:c    153423,
                 :ruby 4578,
                 :tcl  3453
                 :bf   1}}

Ready to get started? Start with the tesser.core docs

Math

Tesser.math provides statistical folds. Some, like sum, mean, and correlation, are exact. Others, like quantiles, are estimates.

For instance, to find a Pearson correlation matrix between ln(age), height, and weight; and in the same pass, to find the total number of samples:

(require '[tesser.math :as m])
(->> (t/fuse {:count (t/count)
              :corrs (m/correlation-matrix {:log-age #(Math/ln (:age %))
                                            :height  :height
                                            :weight  :weight})})
     (t/tesser records))

=> {:count 7123525
    :corrs {[:log-age :weight] 0.74
            [:log-age :height] 0.86
            [:weight  :height] 0.91
            ... and the same keys in symmetric order ...}}

Ready? To the tesser.math API!

Hadoop

Update, 2021-03-01: Parkour, the library tesser.hadoop used to talk to Hadoop, hasn't been maintained in some time: its dependencies are now unresolvable. Tesser is likely still a good fit for Hadoop, but I need someone who actually uses Hadoop to take this over. PRs, please!

The tesser.hadoop API takes Tesser folds and distributes them using the Parkour Hadoop library. You can test your folds locally, then run them on a cluster to reduce over huge datasets.

(require '[tesser [core :as t]
                  [math :as m]
                  [hadoop :as h]])

(defn analyze
  "A fold that analyzes measurements of trees from a certain location."
  [location]
  (->> (t/map parse-record)
       (t/filter #(= location (:location %)))
       (t/fuse {:count (t/count)
                :oldest (->> (t/map :age)
                             (t/max))
                :corrs (m/correlation-matrix
                         {:age          :age
                          :log-mass     #(Math/log (:mass %))
                          :growth-rings :growth-rings
                          :humidity     :humdity})})))

(h/fold conf
        (text/dseq "hdfs:/some/file/part-*")
        "hdfs:/tmp/tesser"
        #'analyze "Redwood National Park")

See the Hadoop demo project for an example of how to run a fold in Hadoop.

An integrative example

Let's say we're public health researchers, and we're trying to understand what factors influence the prevalence of a communicable disease. There are three medications, x, y, and z, which show promise for preventing its spread, but not everyone takes medications regularly. We have anonymized case reports for each infection, like:

{:year        2011                ; The year the infection was reported
 :age         22                  ; The patient's age
 :zip         94110               ; The patient's zip code
 :primary-facility "City Clinic"  ; Where does this person go for care?
 :medications [:x, :y]            ; The medications this person has been using
 :compliance  {:x 0.9             ; How often do they adhere to the
               :y 0.6}            ; prescribed dosage?
(require '[tesser.core :as t]
          [tesser.math :as m]
          [tesser.quantiles :as q])

(defn fold
  "Computes aggregate statistics over infection cases"
  []
  (t/fuse
    {; Total number of cases
     :total       (t/count)

     ; How has the number of cases changed over time?
     :trend       (->> (t/group-by :year)
                       (t/count))

     ; Number of cases for the year 2018, broken down by zip code
     :recent-cases (->> (t/filter #(= 2018 (:year %)))
                        (t/group-by :zip)
                        (t/count))

     ; A sorted map of ages to the number of cases with that age
     :age-range   (->> (t/map :age)
                       (t/frequencies)
                       (t/post-combine (partial into (sorted-map))))

     ; How many cases were on no medications at all?
     :no-meds     (->> (t/filter (comp empty? :medications))
                       (t/count))

     ; A histogram of medication compliance, broken down by medication type
     :med-compliance (->> (t/map :compliance)
                          (t/facet)
                          (m/digest (partial q/dual q/hdr-histogram))
                          (t/post-combine q/distribution))

     ; Correlation and coocurrence count between age and number of medications
     :age-meds  (m/correlation+count :age (comp count :medications))

     ; The covariance matrix between medication compliance--are any pair of
     medications linearly covariant?
     :med-cov        (->> (t/map :compliance)
                          (m/covariance-matrix {:x :x
                                                :y :y
                                                :z :z}))}))

Because these folds are collection-indepedent, and defined in small chunks, we can break them up into functions, write small tests to verify each folds behavior indepedently, then compose them into larger programs. We're free to name transformations at any level by binding them to let variables or defns, or to build complex folds in a single pass.

Invariants

In order for Tesser to execute a fold concurrently, a fold must obey some simple invariants.

  • All functions should be deterministic functions purely of their inputs. Constructing two copies of the same fold on, say, different nodes in a cluster should result in equivalent behavior. You cannot squirrel away state in a lexical closure, for instance.
  • Accumulators may be mutable, and folds never modify the same accumulator concurrently. You may mutate the accumulator in a reducer, combiner, or post-fn without synchronization.
  • Reducers and combiners must be associative: (f a (f b c)) = (f (f a b) c).
  • Reducers and combiners must be commutative: (f a b) = (f b a).
  • Reducers and combiners may emit reduced values, which force immediate completion of that particular reduce or combine. If a reducer emits a reduced value, it has no impact on the execution of other reducers, or the combiner.

Performance

When the computation you're performing on each input dominates, Tesser should be significantly faster than clojure.core/reduce and somewhat faster than clojure.core.reducers/fold. Tesser performs worst when the reducing operations are cheap compared to the cost of traversing the collection. Even in these cases, Tesser's performance on laptop and server-class x64 hardware ain't too shabby.

On a 48-way (including HT) E5-2697, summing 10 million random longs:

Collection Clojure reduce Reducers fold Tesser
Array 460 MHz 420 MHz 2900 MHz
Vector 490 MHz 4300 MHz 4700 MHz

And the equivalent of (->> (map inc) (filter even?) (reduce +)) over those 10 million longs:

Collection Clojure reduce Reducers fold Tesser
Array 43 MHz 270 MHz 2400 MHz
Vector 120 MHz 3400 MHz 3200 MHz

Run lein test :bench to reproduce results on your hardware.

In general, Tesser...

  • Sees the same benefits of stream fusion as reducers: fewer allocations for intermediate seqs,
  • Can parallelize over primitive arrays (unlike reducers), and
  • Reduces thread contention versus Java forkjoin (used by reducers/fold).

However, Tesser cannot automatically partition and traverse vectors as efficiently as core.reducers can, which makes it slightly slower when you have a single vector and ask Tesser to partition it for you. Passing a series of vectors to tesser.core/tesser makes Tesser's traversal costs identical to core.reducers.

There's also some low-hanging fruit in fuse, take, etc. that can be optimized later; we allocate fresh vectors to box reduction state on every new input, instead of clobbering a variable in place. Haven't gotten around to tuning that yet.

In real-world applications, Tesser has significantly improved single-node performance relative to reducers/fold, but YMMV.

Vs Reducers and Transducers

Clojure's reducers and transducers embody sequential folds: they move from left to right over a sequence. Reducers also includes a less-well-known hierarchical fold which parallelizes a reduction via Java's fork-join pool, but this reduction is still fundamentally ordered and local to a single machine.

Tesser explores a different niche. It offers:

  • Commutativity. Tesser folds must not depend on the order of inputs.
  • Concurrency. Reductions over independent chunks require no coordination, making them good candidates for distributed contexts like Hadoop.
  • Stream fusion. Like Reducers and Transducers, map, filter, etc. are all folded into a single reduction function. Intermediate values are stack-allocated, reducing GC load.
  • Collection independence. Like Transducers, Tesser folds are abstract transformations and can be re-used against varying types of collections.

Building

  • Update project versions in core/project.clj, math/project.clj, hadoop/project.clj, all/project.clj

Test and install

cd core/ && lein do test, install && cd ../
cd math/ && lein do test, install && cd ../
cd hadoop/ && lein do test, install && cd ../

Commit

VERSION="x.x.x"
git commit -a -m "Version $VERSION"
git tag "v$VERSION"
git push
git push --tags

Deploy

cd core/ && lein deploy clojars && cd ../
cd math/ && lein deploy clojars && cd ../
cd hadoop/ && lein deploy clojars && cd ../

Rebuild documentation

cd all/ && lein codox

Docs commit on gh-pages branch (assumes you've got all/doc set up as gh-pages)

cd all/doc && git commit -am "Docs for version x.x.x"
git push

Contributors

License

Eclipse Public License v1.0, same as Clojure.

More Repositories

1

distsys-class

Class materials for a distributed systems lecture series
8,983
star
2

meangirls

Convergent Replicated Data Types
Ruby
650
star
3

tund

SSH reverse tunnel daemon
Ruby
418
star
4

tea-time

Lightweight Clojure task scheduler
Clojure
240
star
5

salticid

A deployment system, with design goals 1: Magic and 2: More Magic
Ruby
222
star
6

dom-top

Unorthodox control flow, for Clojurists with masochistic sensibilities.
Clojure
204
star
7

timelike

A library for simulating parallel systems, in Clojure
Clojure
184
star
8

partitions-post

A blog post on network partitions in practice
182
star
9

clj-antlr

Clojure bindings for the ANTLR 4 parser
Clojure
167
star
10

less-awful-ssl

Sssh no tears, only TLS now. For Clojure.
Clojure
154
star
11

interval-metrics

Clojure data structures for performance metrics over discrete time intervals.
Clojure
118
star
12

dist-sagas

A paper on sagas in distributed systems
TeX
91
star
13

gretchen

Offline serializability verification, in Clojure
Clojure
68
star
14

prism

Automatically re-run clojure tests
Clojure
59
star
15

verschlimmbesserung

An etcd client with modern Clojure sensibilities
Clojure
56
star
16

merkle

Clojure Merkle Trees
Clojure
51
star
17

gnuplot

Clojure gnuplot bindings
Clojure
47
star
18

risky

A lightweight Ruby ORM for Riak
Ruby
40
star
19

schadenfreude

Clojure benchmarking tools
Clojure
35
star
20

jepsen-talks

Slides and resources for talks on partition tolerance
Clojure
33
star
21

meitner

Explodes Clojure functions and macros into dependency graphs
Clojure
30
star
22

aesahaettr

Sharding, partitioning, and consistent hashing for Clojure. May release spectres.
Clojure
26
star
23

bitcask-ruby

An (incomplete) interface to the Bitcask storage system
Ruby
19
star
24

ustate

micro state daemon
Ruby
18
star
25

salesfear

A Clojure salesforce client.
Clojure
17
star
26

construct

Extensible, persistent, structured configuration for Ruby
Ruby
16
star
27

skewbinheap

A Skew Binomial Heap for Erlang.
Erlang
15
star
28

yamr

A Linux Yammer client.
Ruby
12
star
29

bifurcan-clj

Clojure wrapper for the Bifurcan family of data structures
Clojure
12
star
30

tumblr-archiver

Hacky Clojure program to download media from tumblr liked posts
Clojure
12
star
31

cyclic.js

Cyclic time series data structures for javascript
JavaScript
10
star
32

riemann-bench

An example for using the Reimann Clojure client
Clojure
10
star
33

london-gen

Silly London Landmarks
Clojure
8
star
34

gifdex

A gif tagging server for local use
Clojure
8
star
35

mtrc

Ruby metrics
Ruby
8
star
36

adaptive-executor

Adaptive threadpool executor experiment
Clojure
6
star
37

lights

Change your Hue lights to randomly generated colors, continuously
Clojure
5
star
38

thought-leaders

The definitive list of thought leaders
5
star
39

mastodon-utils

Utilities for working with Mastodon's API
Clojure
5
star
40

prometheus-mastodon-exporter

Exports Mastodon statistics for polling by Prometheus
Clojure
5
star
41

cortex-reaver

A dangerous Ruby blog engine, with a photographic memory.
Ruby
5
star
42

hangman

A ridiculously overpowered hangman AI in Clojure
Clojure
4
star
43

qsd-phase-space-reconstruction

Some ruby scripts for exploring datasets in an attempt to reconstruct phase space dynamics (and to identify lyapunov exponents) of a QSD-simulated Duffing oscillator
Ruby
4
star
44

exocora

A lightweight CGI script framework
Ruby
3
star
45

joedahato

Predict's Joe Damato's hat choices
Clojure
3
star
46

tattoo

Overkill
Clojure
2
star
47

producer_consumer

Ruby queue-backed producer consumer gem
Ruby
2
star
48

ruby-vodpod

Ruby bindings for the Vodpod API.
Ruby
2
star
49

frisk-management

NLP + erotic fiction -> PowerPoint slides
Clojure
2
star
50

heliotrope

A client for the distributed processing system Fabric
Ruby
2
star
51

qsd-tangent

Fork of QSD library with tangent space evolution
C++
2
star
52

caremad

Consistent Commutative Replicated DataTypes
2
star
53

euler-clj

Project Euler solutions in Clojure
Clojure
1
star
54

req-replay

silly experiment
Clojure
1
star
55

autotags

Jquery javascript tag editor with autocomplete
JavaScript
1
star
56

riakeys

Riak key cache (apparently impossible)
Clojure
1
star
57

clojure-perf

Mucking around with clojure performance testing
Clojure
1
star
58

mecha-query

Clojure
1
star