Datasplash

Clojure API for a more dynamic Google Cloud Dataflow and, although not really battle-tested there, any other Apache Beam backend.

Usage

API docs

You can also see ports of the official Dataflow examples in the datasplash.examples namespace.

Here is the classic word count example.

ℹ️ You will need to run (compile 'datasplash.examples) every time you make a change.

(ns datasplash.examples
  (:require [clojure.string :as str]
            [datasplash.api :as ds]
            [datasplash.options :refer [defoptions]])
  (:gen-class))

(defn tokenize
  [^String l]
  (remove empty? (.split (str/trim l) "[^a-zA-Z']+")))

(defn count-words
  [p]
  (ds/->> :count-words p
          (ds/mapcat tokenize {:name :tokenize})
          (ds/frequencies)))

(defn format-count
  [[k v]]
  (format "%s: %d" k v))

(defoptions WordCountOptions
  {:input {:default "gs://dataflow-samples/shakespeare/kinglear.txt"
           :type String}
   :output {:default "kinglear-freqs.txt" :type String}
   :numShards {:default 0 :type Long}})

(defn -main
  [& str-args]
  (let [p (ds/make-pipeline WordCountOptions str-args)
        {:keys [input output numShards]} (ds/get-pipeline-options p)]
    (->> p
         (ds/read-text-file input {:name "King-Lear"})
         (count-words)
         (ds/map format-count {:name :format-count})
         (ds/write-text-file output {:num-shards numShards})
         (ds/run-pipeline))))
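
Because tokenize and format-count are plain functions, they can be checked at the REPL without building a pipeline. The sketch below repeats both definitions so that it is self-contained, and uses core mapcat and frequencies as stand-ins for ds/mapcat and ds/frequencies; the sample sentence and the sample-counts name are made up for illustration.

```clojure
(require '[clojure.string :as str])

;; Same definitions as in the word count example.
(defn tokenize
  [^String l]
  (remove empty? (.split (str/trim l) "[^a-zA-Z']+")))

(defn format-count
  [[k v]]
  (format "%s: %d" k v))

;; Plain-Clojure equivalent of the pipeline steps, applied to one line of text.
;; The result is sorted only to make the order deterministic.
(def sample-counts
  (->> ["the king, the fool"]
       (mapcat tokenize)
       frequencies
       (map format-count)
       sort))

sample-counts
;; => ("fool: 1" "king: 1" "the: 2")
```

A line that starts with punctuation makes .split return a leading empty string, which is why tokenize wraps the call in (remove empty? ...).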

Run it from the REPL

Locally on your machine, using the DirectRunner:

(in-ns 'datasplash.examples)
(clojure.core/compile 'datasplash.examples)
(-main "--input=sometext.txt" "--output=out-freq.txt" "--numShards=1")

Remotely on Google Cloud, using the DataflowRunner:

This requires a properly configured Google Cloud account and Dataflow access from your machine.

(in-ns 'datasplash.examples)
(clojure.core/compile 'datasplash.examples)
(-main "--project=my-project"
       "--runner=DataflowRunner"
       "--gcpTempLocation=gs://bucket/tmp"
       "--input=gs://apache-beam-samples/shakespeare/kinglear.txt"
       "--output=gs://bucket/outputs/kinglear-freq.txt"
       "--numShards=1")

Run it as a standalone program

Datasplash needs to be AOT-compiled, so build an uberjar and run it through your main entry point:

java -jar my-dataflow-job-uber.jar [beam-args]
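
As a rough sketch of the build side, assuming a Leiningen project: the project.clj fragment below AOT-compiles the main namespace and names the uberjar to match the command above. The project name, the my-dataflow-job.core namespace, and the version strings are placeholders, not values taken from Datasplash; look up the current Datasplash version on Clojars.

```clojure
;; project.clj (hypothetical project; adjust names and versions)
(defproject my-dataflow-job "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.11.1"]
                 ;; placeholder version: use the current release from Clojars
                 [datasplash "x.y.z"]]
  ;; namespace that defines -main and declares (:gen-class)
  :main my-dataflow-job.core
  ;; AOT-compile it so the generated classes end up in the jar
  :aot [my-dataflow-job.core]
  :uberjar-name "my-dataflow-job-uber.jar")
```

With such a file in place, lein uberjar should produce the jar under target/.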

Caveats

  • Due to the way the code is loaded when running in distributed mode, you may get some exceptions about unbound vars, especially on instances with a high number of CPUs. They will not, however, cause the job to fail and are of no consequence. They are caused by the need to prepare the Clojure runtime when loading the class files on remote instances, and by some tricky business with locks and require.
  • If you have to write your own low-level ParDo objects (you shouldn't), wrap all your code in the safe-exec macro to avoid issues with unbound vars. Any good ideas for a better way to do this would be greatly appreciated!
  • If some UserCodeException stack traces shown in the cloud UI are mangled and missing the relevant part of the Clojure source code, this is due to a bug in the way the SDK mangles stack traces for Clojure code. In that case, look for ClojureRuntimeException in the logs to find the original, unaltered stack trace.
  • Beware when using Clojure 1.9: proxy results are no longer Serializable, so Clojure code that uses proxy cannot be used anywhere in your pipeline. Use a Java shim for these objects instead.
  • If you see something like java.lang.ClassNotFoundException: Options, you probably forgot to compile your namespace.
  • Whenever you need to check a spec in user code, you first have to require the namespace that defines it, because it may not be loaded in the Clojure runtime of the worker. Do not call (require) directly, though, because it is not thread-safe. See [this issue] for a workaround.
  • If you see a java.io.IOException: No such file or directory when invoking compile, make sure there is a directory in your project root that matches the value of *compile-path* (default: classes).
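
For the last caveat, one possible way to avoid the IOException is to create the directory from the REPL before calling compile. This is a minimal sketch; ensure-compile-path! is a hypothetical helper and not part of Datasplash.

```clojure
(require '[clojure.java.io :as io])

(defn ensure-compile-path!
  "Creates the directory named by *compile-path* (falling back to \"classes\"
  when the var is nil) and returns true if it exists afterwards.
  Hypothetical helper, not part of Datasplash."
  []
  (let [dir (io/file (or *compile-path* "classes"))]
    (.mkdirs dir)          ; does nothing when the directory is already there
    (.isDirectory dir)))

(ensure-compile-path!)
;; then compile as in the REPL sessions above:
;; (clojure.core/compile 'datasplash.examples)
```

A relative *compile-path* is resolved against the working directory of the JVM, so this only helps when the REPL was started from the project root.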

License

Copyright © 2015-2023 Oscaro.com

Distributed under the Eclipse Public License, either version 1.0 or (at your option) any later version.
