• Stars
    star
    138
  • Rank 264,508 (Top 6 %)
  • Language
    JavaScript
  • License
    Apache License 2.0
  • Created over 9 years ago
  • Updated almost 4 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

When you need to push data around, you push it. A node.js ETL tool.

Empujar. Empujarlo Bueno.

When you need to push data around, you push it. Push it real good.
An ETL and Operations tool.

Build Status

https://raw.githubusercontent.com/taskrabbit/empujar/master/empujar.png

What

Empujar is a tool which moves stuff around. It's built in node.js so you can do lots of stuff async-ly. You can move data around (a ETL tool), files (a backup tool), and more!

Empujar's top level object is a "book", which contains "chapters" and then "pages". Chapters are excecuted 1-by-1 in order, and then each page in a chapter can be run in parallel (up to a threading limit you specify).

See an example project here.

For Example, an example chapter to extract all data from a mySQL database would be:

var dateformat = require('dateformat');

exports.chapterLoader = function(book){

  // define
  var chapter = book.addChapter(1, 'EXTRACT & LOAD', {threads: 5});

  // helpers
  var source       = book.connections.source.connection;
  var destination  = book.connections.destination.connection;
  var queryLimit   = 1000;
  var tableMaxes   = {};

  var extractTable = function(table, callback){
    destination.getMax(table, 'updatedAt', function(error, max){
      if(error){ return callback(error); }

      var query = 'SELECT * FROM `' + table + '` ';
      if(max){
        query += ' WHERE `updatedAt` >= "' + dateformat(max, 'yyyy-mm-dd HH:MM:ss') + '"';
      }

      source.getAll(query, queryLimit, function(error, rows, done){
        destination.insertData(table, rows, function(error){
          if(error){ return next(error); }
          done();
        });
      }, callback);
    });
  };

  chapter.addLoader('determine extract queries', function(done){
    source.tables.forEach(function(table){
      chapter.addPage('extract table: ' + table, function(next){
        extractTable(table, next);
      });
    });
    done();
  });

};

Empujar runs operations in series or parallel. These are defined by books and chapters and pages.

#!/usr/bin/env node

process.chdir(__dirname);

var Empujar    = require('empujar');
var optimist   = require('optimist');
var options    = optimist.argv; // get command line opts, like `--logLevel debug` or `--chapters 100`

var book = new Empujar.book(options);

// you can define custom error behavior when a page callback retruns an error
var errorHandler = function(error, context){
  console.log("OH NO! (but I handled the error) | " + error);
  setTimeout(process.exit, 5000);
};

book.on('error', errorHandler);

book.connect(function(){

  // the logger will output to the console and a log file
  book.logger.log('I am a debug message', 'debug'); // log levels can be set on log lines, and toggled with the `--logLevel` flag

  // define `book.data.stuff` to make it availalbe to all phases of the book
  book.data.stuff = 'something cool';

  var chapter1 = book.addChapter(1, 'Do the first thing in parallel', {threads: 10});
  var chapter2 = book.addChapter(2, 'Do that next thing in serial', {threads: 1});

  // chapter 1
  var i = 0;
  while(i < 100){
    chapter1.addPage('sleepy thing: ' + i, function(next){
      setTimeout(next, 100);
    });
    i++;
  }

  // chapter 2

  // chapters can also have pre-loaders which run before all pages
  chapter2.addLoader('do something before', function(next){
    book.logger.log('I am the preloader');
    next();
  });

  chapter2.addPage('the final step', function(next){
    next();
    // next(new Error('on no!')); // if you end a page with an error, the errorHandler will be invoked, and the book stopped
  });

  // chapters can also be loaded from /chapters/name/chapter.js in the project
  // book.loadChapters();

  // you can also configure an optional logger (perhaps to a DB) for empujar's internal status
  // book.on('state', function(data){
  //   databse.insertData('empujar', [data]);
  // });

  book.run(function(){
    setTimeout(process.exit, 5000);
  });
});

There is also a more formal example you can explore within this project. Check out /books/etl to learn more.

Empujar will connect to connections you define in book/config/connections/NAME.js, and there should be a matching transport in /lib/connections/TYPE.js.

When book.run() is complete, you probably want to process.exit(), or more gracefully shutdown.

You can subscribe to book.on('error') and book.on('state') events. A cool thing to do would be to actually record these state events into your datawarehouse, if you are using empujar as an ETL tool:

book.on('state', function(data){  datawarehouse.insertData('empujar', [data]);  });

Project Layout

Create your project so that it looks like this:

| -\books
| ---\myBook
| -----\book.js
| -----\pids\
| -----\logs\
| -----\config\
| -----\config\connections\
| -----\config\connections\myDatabase.js
| -----\chapters\
| -----\chapters\chapte1.js
| -----\chapters\chapte2.js

Launch Flags

The defaults for all launch flags are:

{
  chapterFiles: path.normalize( process.cwd() + '/chapters/**/*.js' ),
  configPath:   path.normalize( process.cwd() + '/config' ),
  logPath:      path.normalize( process.cwd() + '/log' ),
  pidsPath:     path.normalize( process.cwd() + '/pids' ),
  logFile:      'empujar.log',
  tmpPath:      path.normalize( process.cwd() + '/tmp' ),
  logStdout:    true,
  logLevel:     'info',
  chapters:     [],
  getAllLimit:  Infinity,
}

Examples:

  1. Run your book: node yourBook.js
  2. Run your book in verbose mode: node yourBook.js --logLevel debug
  3. Run only certain chapters in your book: node yourBook.js --chapters 1,4 or a range: node yourBook.js --chapters 100-300
  4. Extract only a small subset of yoru data (great in testing) node yourBook.js --getAllLimit 1000
  • This would make all invocations of connection.getAll() exit sucessfully after retrieving 1000 rows.

Connections

While you can create your own connections, Empujar ships with the tools to work with a number of the most common ones:

MySQL

var connection = book.connections.mysql.connection;

connection.connect = function(callback)
// Connection method; handled by book.connect();
// callback is passed (error)

connection.showTables = function(callback)
// list tables
// callback is returned error, array of table names

connection.showColumns = function(table, callback)
// list the columns + metadata for each column
// callback is returned error, hash of columns + metadata

connection.query = function(query, data, callback)
// query the table
// data can be optional; used to fill in missing attributes/interpolate (?)
// callback is returned error, rows (array of hashes col-value)

connection.getAll = function(queryBase, chunkSize, dataCallback, doneCallback)
// fetch data from the cluster; normalized as an array of hashes.  Data is already typecast.
// queryBase -> the base mySQL query (Limit and offset will be appended automatically)
// chunkSize -> number of results to return (IE: limit)
// dataCallback -> callback called with each collection of data
//   -> (error, data, next)
//   -> data is normalized
//   -> next() must be called to continue
// doneCallback is passed (error, rowsFound)

connection.getMax = function(table, column, callback)
// list the maximum value for a column in a table
// callback is returned error, maximum value from the table or null

connection.queryStream = function(query, callback)
// get a stream that returns results of a query
// events listed here: https://github.com/felixge/node-mysql#streaming-query-rows
// callback is returned error, stream

connection.insertData = function(table, data, callback, mergeOnDuplicates)
// add data to an table; create the index if needed.  Data should be normalized (IE results from #getAll)
// callback is passed (error)

connection.addColumn = function(table, column, rowData, callback)
// add a column to a table.
// RowData is an array of data to insert into the column which can be used to determine the column data type
// callback is returned error

connection.alterColumn = function(table, column, definition, callback)
// change the datatype of a column
// definition is a mySQL statment
// callback is returned error

connection.mergeTables = function(sourceTable, destinationTable, callback)
// merge the data from sourceTable into destinationTable
// destinationTable will be created if if doesn't exist
// destinationTable will be erased and recreated from sourceTable if there is no primary key present
// callback is returned error

connection.copyTableSchema = function(sourceTable, destinationTable, callback)
// create a new table (destinationTable) with the same schema as (sourceTable)
// callback is returned error

connection.dump = function(file, options, callback)
// mysqlDump the DB to file
// options:
/*
  if(!options.binary){   options.binary = 'mysqldump';             }
  if(!options.database){ options.database = self.options.database; }
  if(!options.password){ options.password = self.options.password; }
  if(!options.host){     options.host = self.options.host;         }
  if(!options.port){     options.port = self.options.port;         }
  if(!options.user){     options.user = self.options.user;         }
  if(!options.tables){   options.tables = [];                      }
  if(!options.gzip){     options.gzip = false;                     }
*/
// callback is returned error

Elasticsearch

var connection = book.connections.elasticsearch.connection;

connection.connect = function(callback)
// Connection method; handled by book.connect();
// callback is passed (error)

connection.showIndices = function(callback)
// list the indices in the cluster
// callback is passed (error, indicies)
//  -> `indicies` is a hash with index names and metadata

connection.insertData = function(index, data, callback)
// add data to an index; create the index if needed.  Data should be normalized (IE results from #getAll)
// callback is passed (error)

connection.getAll = function(index, query, fields, chunkSize, dataCallback, doneCallback)
// fetch data from the cluster; normalized as an array of hashes.  Data is already typecast.
// index -> string name of index
// query -> the elasticsearch query (as a hash)
// fields -> array of fields you want returned; '*' can be passed as an argument to request all fields
// chunkSize -> number of results to return (from each server)
// dataCallback -> callback called with each collection of data
//   -> (error, data, next)
//   -> data is normalized
//   -> next() must be called to continue
// doneCallback is passed (error, rowsFound)

S3

var connection = book.connections.s3.connection;

connection.connect = function(callback)
// Connection method; handled by book.connect();
// callback is passed (error)

connection.listFolders = function(prefix, callback)
// list all folders in this S3 bucket (starting with `prefix`)
// prefix can be `*`of `''` to get all folders in the bucket
// callback is passed (error, arrayOfFolderNames)

connection.listObjects = function(prefix, callback)
// list all objects in this S3 bucket (starting with `prefix`)
// prefix can be `*`of `''` to get all folders in the bucket
// callback is passed (error, arrayOfObjectNames)

connection.deleteFolder = function(prefix, callback)
// delete the folder starging with `prefix`, and all objects contatined within
// like `rm -rf prefix`
// prefix can be `*`of `''` to delete all folders and files in the bucket
// callback is passed (error)

connection.objectExists = function(filename, callback)
// check if a file exists in this bucket
// callback is passed (error, exists) where exists is a boolean

connection.delete = function(filename, callback)
// delete a file from this bucket
// callback is passed (error)

connection.streamingUpload = function(inputStream, filename, callback)
// upload a file* to S3 with the filename `filename`
// the file you are uploading should be a readableStream created with fs.createReadStream
// callback is passed (error)

FTP

var connection = book.connections.ftp.connection;

connection.connect = function(callback)
// Connection method; handled by book.connect();
// callback is passed (error)

connection.get = function(file, callback)
// donwload a file from the FTP server
// callback is passed (error, stream)
//  -> `stream` which you can pipe to a file on disk or S3, etc

connection.listFiles = function(dir, callback)
// list files from a remote directory
// callback is passed (error, files)
//  -> `files` is an array of remote file names

Amazon Redshift

var connection = book.connections.redshift.connection;

connection.connect = function(callback)
// Connection method; handled by book.connect();
// callback is passed (error)

connection.showTables = function(callback)
// list tables
// callback is returned error, array of table names

connection.showColumns = function(table, callback)
// list the columns + metadata for each column
// callback is returned error, hash of columns + metadata

connection.query = function(query, callback)
// query the table
// callback is returned error, rows (array of hashes col-value)

connection.getAll = function(queryBase, chunkSize, dataCallback, doneCallback)
// fetch data from the cluster; normalized as an array of hashes.  Data is already typecast.
// queryBase -> the base mySQL query (Limit and offset will be appended automatically)
// chunkSize -> number of results to return (IE: limit)
// dataCallback -> callback called with each collection of data
//   -> (error, data, next)
//   -> data is normalized
//   -> next() must be called to continue
// doneCallback is passed (error, rowsFound)

connection.insertData = function(table, data, callback)
// add data to an table; create the index if needed.  Data should be normalized (IE results from #getAll)
// callback is passed (error)

connection.mergeTables = function(sourceTable, destinationTable, callback)
// merge the data from sourceTable into destinationTable
// destinationTable will be created if if doesn't exist
// destinationTable will be erased and recreated from sourceTable if there is no primary key present
// callback is returned error

connection.addColumn = function(table, column, rowData, callback)
// add a column to a table.
// RowData is an array of data to insert into the column which can be used to determine the column data type
// callback is returned error

connection.alterColumn = function(table, column, definition, callback)
// change the datatype of a column
// definition is a mySQL statment
// callback is returned error

connection.copyTableSchema = function(sourceTable, destinationTable, callback)
// create a new table (destinationTable) with the same schema as (sourceTable)
// callback is returned error

connection.getMax = function(table, column, callback)
// list the maximum value for a column in a table
// callback is returned error, maximum value from the table or null

Creating your own connections.

It's easy to add your own connections to empujar. All you need is a /connections folder in your project, and to follow some conventions. The basic building block of a connection looks like this:

var connection = function(name, type, options, book){
  this.name       = name;
  this.type       = type;
  this.options    = options;
  this.book       = book;
  this.connection = null;
};

connection.prototype.connect = function(callback){
  var self = this;
  // connection logic
  callback();
};

/// Your Methods...

exports.connection = connection;

... and then extend your connection model with more prototypes.

For example, here'e a connection, delighted.js which TaskRabbit uses to import NPS survey data from our partner Delighted. We extend their library to match the getAll method of the built-in connections above.

var dateformat = require('dateformat');
var Delighted  = require('delighted');

var connection = function(name, type, options, book){
  this.name       = name;
  this.type       = type;
  this.options    = options;
  this.book       = book;
  this.connection = null;
};

connection.prototype.connect = function(callback){
  var self = this;
  self.connection = Delighted(self.options.apiKey);
  callback();
};

connection.prototype.getAll = function(since, dataCallback, doneCallback, page, rowsFound){
  var self = this;
  var data = [];
  if(page === undefined || page === null){ page = 1; }
  if(!rowsFound){ rowsFound = 0; }

  var options = {
    per_page : 100,
    since    : since, // in unix timestamps (not JS timestamps)
    page     : page,
    expand   : 'person',
  };

  self.connection.surveyResponse.all(options).then(function(responses) {

    if(responses.length === 0){
      doneCallback(null, rowsFound);
    }else{
      rowsFound = rowsFound + responses.length;

      responses.forEach(function(resp){
        data.push({
          id:            parseInt(resp.id),
          person:        parseInt(resp.person.id),
          score:         parseInt(resp.score),
          comment:       resp.comment,
          permalink:     resp.permalink,
          created_at:    dateformat(resp.created_at * 1000, 'yyyy-mm-dd HH:MM:ss'),
          updated_at:    dateformat(resp.updated_at * 1000, 'yyyy-mm-dd HH:MM:ss'),
          customer_type: resp.customer_type,
          email:         resp.person.email,
          name:          resp.person.name,
        });
      });

      dataCallback(null, data, function(){
        if(self.book.options.getAllLimit > rowsFound){
          self.getAll(since, dataCallback, doneCallback, (page + 1), rowsFound);
        }else{
          doneCallback(null, rowsFound);
        }
      });

    }
  });
};

exports.connection = connection;

More Repositories

1

react-native-parsed-text

Parse text and make them into multiple React Native Text elements
JavaScript
1,193
star
2

ReactNativeSampleApp

Example app in React Native: sort of like twitter/tumblr
JavaScript
929
star
3

rails_engines_example

Ruby
399
star
4

forklift

Forklift: Moving big databases around. A ruby ETL tool.
Ruby
136
star
5

react-native-zendesk-chat

React Native Wrapper around Zendesk Chat v2
Java
130
star
6

resque-bus

Use resque as a message bus!
127
star
7

demoji

Replace emojis as to not blow up utf8 MySQL.
Ruby
41
star
8

offshore

Remote Factories for your tests
Ruby
29
star
9

waistband

Ruby tools for Elastic Search
Ruby
29
star
10

storehouse

Distributed Rack Caching
Ruby
25
star
11

statusbot

Monitor your uptime automagically with statuspage.io
JavaScript
18
star
12

playbook

Api Groundwork: Versioning, Routing, Adapter Pattern, and API Standards
Ruby
12
star
13

sudojs

JavaScript
9
star
14

react-component-extension

Namespaced Higher Order Components
JavaScript
9
star
15

tests_doc

Rspec request specs recorded as Markdown files
Ruby
8
star
16

pie

apple, for now.
JavaScript
8
star
17

screenpress

Visual regression prevention for Capybara
Ruby
6
star
18

dashing-screenshots

Record your dashboard
Ruby
6
star
19

geojson

public geojson shapes and such
6
star
20

node-resquebus

It's ResqueBus... in Node!
JavaScript
5
star
21

smyte

For fraud checking
Ruby
4
star
22

esid

Simple command-line tool to dump document _source from ElasticSearch Lucene indexes.
Java
4
star
23

shrinker

Shrink urls inside a text and store those urls inside a backend.
Ruby
4
star
24

icloud-photo

Use a mac and this gem to upload photos it iCloud photo stream
Ruby
3
star
25

route_counter

Count on your Rails routes.
Ruby
2
star
26

dv8

Hit Rails.cache instead of your DB
Ruby
2
star
27

taskrabbit.github.com

CSS
2
star
28

sailthru-proxy

hacking Sailthru link tracking
JavaScript
2
star
29

tresque

Patterns for Resque usage at TaskRabbit
Ruby
1
star
30

state_event

State machine with logging
Ruby
1
star
31

doc_tests

Ruby
1
star
32

interview_public

Ruby
1
star