
REST job server for Spark. Note that this is *not* the mainline open source version. For that, go to https://github.com/spark-jobserver/spark-jobserver. This fork now serves as a semi-private repo for Ooyala.

**This is not the official open source version. For that, please go to spark-jobserver/spark-jobserver. All issues and pull requests should start there.**

spark-jobserver provides a RESTful interface for submitting and managing Apache Spark jobs, jars, and job contexts. This repo contains the complete Spark job server project, including unit tests and deploy scripts.

Features

  • "Spark as a Service": Simple REST interface for all aspects of job, context management
  • Supports sub-second low-latency jobs via long-running job contexts
  • Start and stop job contexts for RDD sharing and low-latency jobs; change resources on restart
  • Kill running jobs via stop context
  • Separate jar uploading step for faster job startup
  • Asynchronous and synchronous job API. Synchronous API is great for low latency jobs!
  • Works with Standalone Spark as well as Mesos
  • Job and jar info is persisted via a pluggable DAO interface
  • Named RDDs to cache and retrieve RDDs by name, improving RDD sharing and reuse among jobs.

Version Information

Version   Spark Version
0.3.1     0.9.1
0.4.0     1.0.2

Quick start / development mode

You need to have SBT installed.

From the SBT shell, simply type "re-start". This uses a default configuration file. An optional argument is a path to an alternative config file. You can also specify JVM parameters after "---". Including all the options, it looks like this:

re-start /path/to/my.conf --- -Xmx8g

Note that re-start (SBT Revolver) forks the job server in a separate process. If you make a code change, simply type re-start again at the SBT shell prompt; it will compile your changes and restart the job server, enabling very fast turnaround cycles.

For example jobs, see the job-server-tests/ project/folder.

When you use re-start, the log file goes to job-server/job-server-local.log. There is also an environment variable EXTRA_JAR for adding a jar to the classpath.

WordCountExample walk-through

First, package the test jar containing WordCountExample by running sbt job-server-tests/package. Then start the job server using the instructions above.

Let's upload the jar:

curl --data-binary @job-server-tests/target/job-server-tests-0.4.0.jar localhost:8090/jars/test
OK⏎

The above jar is uploaded as app test. Next, let's start an ad-hoc word count job, meaning that the job server will create its own SparkContext, and return a job ID for subsequent querying:

curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'
{
  "status": "STARTED",
  "result": {
    "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4",
    "context": "b7ea0eb5-spark.jobserver.WordCountExample"
  }
}⏎

NOTE: If you want to feed in a config from a text file and POST it using curl, use the --data-binary option; otherwise curl will munge your line separator characters. For example:

curl --data-binary @my-job-config.json 'localhost:8090/jobs?appName=...'

From this point, you could asynchronously query the status and results:

curl localhost:8090/jobs/5453779a-f004-45fc-a11d-a39dae0f9bf4
{
  "status": "OK",
  "result": {
    "a": 2,
    "b": 2,
    "c": 1,
    "see": 1
  }
}⏎

Note that you could append &sync=true when you POST to /jobs to get the results back in one request, but for real clusters and most jobs this may be too slow.

Another way of running this job is in a pre-created context. Start a new context:

curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&mem-per-node=512m'
OK⏎

You can verify that the context has been created:

curl localhost:8090/contexts
["test-context"]⏎

Now let's run the job in the context and get the results back right away:

curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true'
{
  "status": "OK",
  "result": {
    "a": 2,
    "b": 2,
    "c": 1,
    "see": 1
  }
}⏎

Note the addition of context= and sync=true.

Create a Job Server Project

In your build.sbt, add this to use the job server jar:

resolvers += "Job Server Bintray" at "http://dl.bintray.com/spark-jobserver/maven"

libraryDependencies += "spark.jobserver" % "job-server-api" % "0.4.0" % "provided"

For most use cases it's better to mark the dependency as "provided", because you don't want SBT assembly to include the whole job server jar in your application's fat jar; a minimal build.sbt sketch is shown below.
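
For illustration only, a complete build.sbt for a job project might look like the following sketch; the sbt-assembly plugin and the spark-core dependency line are assumptions about a typical setup, not requirements of the job server itself.

// build.sbt sketch, assuming a fat jar is built with the sbt-assembly plugin;
// the Spark version matches the 0.4.0 row in the version table above.
resolvers += "Job Server Bintray" at "http://dl.bintray.com/spark-jobserver/maven"

libraryDependencies ++= Seq(
  "spark.jobserver"   % "job-server-api" % "0.4.0" % "provided",  // supplied by the job server at runtime
  "org.apache.spark" %% "spark-core"     % "1.0.2" % "provided"   // supplied by the Spark cluster
)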

To create a job that can be submitted through the job server, the job must implement the SparkJob trait. Your job will look like:

object SampleJob extends SparkJob {
    override def runJob(sc: SparkContext, jobConfig: Config): Any = ???
    override def validate(sc: SparkContext, config: Config): SparkJobValidation = ???
}
  • runJob contains the implementation of the job. The SparkContext is managed by the job server and will be provided to the job through this method. This relieves the developer of the boilerplate configuration management that comes with creating a Spark job and allows the job server to manage and re-use contexts.
  • validate allows for an initial validation of the context and any provided configuration. If the context and configuration are OK to run the job, returning spark.jobserver.SparkJobValid will let the job execute; otherwise, returning spark.jobserver.SparkJobInvalid(reason) prevents the job from running and provides a means to convey the reason for failure. In this case, the call immediately returns an HTTP/1.1 400 Bad Request status code.
    validate helps you prevent running jobs that would eventually fail due to missing or wrong configuration, saving both time and resources. A minimal sketch of a complete job follows this list.
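
To make this concrete, here is a minimal sketch of a word-count style job with a validate step. It is not the actual WordCountExample source, just an illustration that uses only the SparkJob API shown above plus the Typesafe Config library.

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{SparkJob, SparkJobValid, SparkJobInvalid, SparkJobValidation}

import scala.util.Try

object WordCountSketch extends SparkJob {
  // Reject the job up front (HTTP 400) if the required config key is missing.
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    Try(config.getString("input.string"))
      .map(_ => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))

  // Count word occurrences; the returned Map is serialized to JSON by the job server.
  override def runJob(sc: SparkContext, config: Config): Any = {
    val words = config.getString("input.string").split(" ").toSeq
    sc.parallelize(words).countByValue()
  }
}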

Let's try running our sample job with an invalid configuration:

curl -i -d "bad.input=abc" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'

HTTP/1.1 400 Bad Request
Server: spray-can/1.2.0
Date: Tue, 10 Jun 2014 22:07:18 GMT
Content-Type: application/json; charset=UTF-8
Content-Length: 929

{
  "status": "VALIDATION FAILED",
  "result": {
    "message": "No input.string config param",
    "errorClass": "java.lang.Throwable",
    "stack": ["spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:212)", 
    "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", 
    "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", 
    "akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)",
    "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)", 
    "scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)", 
    "scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)", 
    "scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)", 
    "scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)"]
  }
}

Using Named RDDs

Named RDDs are a way to easily share RDDs among jobs. Using this facility, computed RDDs can be cached with a given name and retrieved later. To use this feature, the SparkJob needs to mix in NamedRddSupport:

object SampleNamedRDDJob extends SparkJob with NamedRddSupport {
    override def runJob(sc: SparkContext, jobConfig: Config): Any = ???
    override def validate(sc: SparkContext, config: Config): SparkJobValidation = ???
}

Then in the implementation of the job, RDDs can be stored with a given name:

this.namedRdds.update("french_dictionary", frenchDictionaryRDD)

Other jobs running in the same context can retrieve and use this RDD later on:

val rdd = this.namedRdds.get[(String, String)]("french_dictionary").get 

(Note the explicit type parameter provided to get; it casts the retrieved RDD, which otherwise has type RDD[_].)

For jobs that depend on a named RDD, it's good practice to check for the existence of the named RDD in the validate method, as explained earlier:

def validate(sc: SparkContext, config: Config): SparkJobValidation = {
  ...
  val rdd = this.namedRdds.get[(Long, scala.Seq[String])]("dictionary")
  if (rdd.isDefined) SparkJobValid else SparkJobInvalid(s"Missing named RDD [dictionary]")
}
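
Putting the pieces together, a minimal sketch of a consumer job might look like the following; the RDD name "dictionary" and its element type are illustrative assumptions carried over from the snippet above, not fixed by the API.

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{NamedRddSupport, SparkJob, SparkJobValid, SparkJobInvalid, SparkJobValidation}

object DictionaryLookupSketch extends SparkJob with NamedRddSupport {
  // Refuse to run unless an earlier job has cached the named RDD in this context.
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    if (this.namedRdds.get[(Long, Seq[String])]("dictionary").isDefined) SparkJobValid
    else SparkJobInvalid("Missing named RDD [dictionary]")

  // The explicit type parameter recovers the element type from RDD[_].
  override def runJob(sc: SparkContext, config: Config): Any = {
    val dictionary = this.namedRdds.get[(Long, Seq[String])]("dictionary").get
    dictionary.count()
  }
}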

Deployment

  1. Copy config/local.sh.template to <environment>.sh and edit as appropriate.
  2. bin/server_deploy.sh <environment> -- this packages the job server along with config files and pushes it to the remotes you have configured in <environment>.sh
  3. On the remote server, start it in the deployed directory with server_start.sh and stop it with server_stop.sh

Note: to test out the deploy to a local staging dir, or package the job server for Mesos, use bin/server_package.sh <environment>.

Architecture

The job server is intended to run as one or more independent processes, separate from the Spark cluster (though it may very well be colocated with, say, the Master).

At first glance, it seems many of these functions (e.g. job management) could be integrated into the Spark standalone master. While this is true, we believe there are many significant reasons to keep it separate:

  • We want the job server to work for Mesos and YARN as well
  • Spark and Mesos masters are organized around "applications" or contexts, but the job server supports running many discrete "jobs" inside a single context
  • We want it to support Shark functionality in the future
  • Loose coupling allows for flexible HA arrangements (multiple job servers targeting same standalone master, or possibly multiple Spark clusters per job server)

Flow diagrams are checked into the doc/ subdirectory. The .diagram files are for websequencediagrams.com; check them out, they really will help you understand the flow of messages between actors.

API

Jars

GET /jars            - lists all the jars and the last upload timestamp
POST /jars/<appName> - uploads a new jar under <appName>

Contexts

GET /contexts         - lists all current contexts
POST /contexts/<name> - creates a new context
DELETE /contexts/<name> - stops a context and all jobs running in it

Jobs

Jobs submitted to the job server must implement the SparkJob trait. It has a main runJob method which is passed a SparkContext and a Typesafe Config object. Results returned by the method are made available through the REST API.

GET /jobs                - Lists the last N jobs
POST /jobs               - Starts a new job, use ?sync=true to wait for results
GET /jobs/<jobId>        - Gets the result or status of a specific job
GET /jobs/<jobId>/config - Gets the job configuration

Context configuration

A number of context-specific settings can be controlled when creating a context (POST /contexts) or running an ad-hoc job (which creates a context on the spot).

When creating a context via POST /contexts, the query params are used to override the default configuration in spark.context-settings. For example,

POST /contexts/my-new-context?num-cpu-cores=10

would override the default spark.context-settings.num-cpu-cores setting.

When starting a job, and the context= query param is not specified, then an ad-hoc context is created. Any settings specified in spark.context-settings will override the defaults in the job server config when it is started up.

Any Spark configuration param can be overridden either in POST /contexts query params, or through spark.context-settings in the job configuration. In addition, num-cpu-cores maps to spark.cores.max, and mem-per-node maps to spark.executor.memory. Therefore the following are all equivalent:

POST /contexts/my-new-context?num-cpu-cores=10

POST /contexts/my-new-context?spark.cores.max=10

or in the job config when using POST /jobs,

spark.context-settings {
    spark.cores.max = 10
}

For the exact context configuration parameters, see JobManagerActor docs as well as application.conf.

Job Result Serialization

The result returned by the SparkJob runJob method is serialized by the job server into JSON for routes that return the result (POST /jobs with sync=true, GET /jobs/<jobId>). Currently the following types can be serialized properly:

  • String, Int, Long, Double, Float, Boolean
  • Scala Maps with string keys (non-string keys may be converted to strings)
  • Scala Seqs
  • Arrays
  • Anything that implements Product (Option, case classes) -- they will be serialized as lists
  • Maps and Seqs may contain nested values of any of the above

If we encounter a data type that is not supported, then the entire result will be serialized to a string.
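
As a purely illustrative example, a runJob return value built only from the types listed above would serialize roughly as shown in the comment:

// Hypothetical return value for runJob, composed only of supported types.
val result: Any = Map(
  "counts"  -> Map("a" -> 2, "b" -> 2),  // Map with string keys
  "words"   -> Seq("a", "b", "see"),     // Seq
  "total"   -> 6,                        // Int
  "success" -> true                      // Boolean
)
// Serialized by the job server roughly as:
// {"counts": {"a": 2, "b": 2}, "words": ["a", "b", "see"], "total": 6, "success": true}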

Contribution and Development

Contributions via GitHub pull requests are welcome. See the TODO for some ideas.

  • From the "master" project, please run "test" to ensure nothing is broken.
    • You may need to set SPARK_LOCAL_IP to localhost to ensure the Akka port can bind successfully
  • Logging for tests goes to "job-server-test.log"
  • Run scoverage:test to check the code coverage and improve it
  • Please run scalastyle to ensure your code changes don't break the style guide
  • Do "re-start" from SBT for quick restarts of the job server process
  • Please update the g8 template if you change the SparkJob API

Publishing packages

  • Be sure you are in the master project
  • Run test to ensure all tests pass
  • Now just run publish and the package will be published to Bintray

Contact

For user/dev questions, we use a Google Group for discussions: https://groups.google.com/forum/#!forum/spark-jobserver

Please report bugs/problems to: https://github.com/ooyala/spark-jobserver/issues

License

Apache 2.0, see LICENSE.md

Copyright(c) 2014, Ooyala, Inc.

TODO

  • Add Swagger support. See the spray-swagger project.

  • Implement an interactive SQL window. See: spark-admin

  • Use SparkContext.setJobGroup with the job ID

  • Support job cancellation via cancelJobGroup

  • Stream the current job progress via a Listener

  • Add routes to return stage info for a job. Persist it via DAO so that we can always retrieve stage / performance info even for historical jobs. This would be pretty kickass.
