• Stars
    star
    758
  • Rank 59,918 (Top 2 %)
  • Language
    Ruby
  • License
    Apache License 2.0
  • Created over 11 years ago
  • Updated about 1 month ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

A framework to implement robust multiprocess servers like Unicorn

ServerEngine

ServerEngine is a framework to implement robust multiprocess servers like Unicorn.

Main features:

                  Heartbeat via pipe
                      & auto-restart
                 /                \               ---+
+------------+  /   +----------+   \  +--------+     |
| Supervisor |------|  Server  |------| Worker |     |
+------------+      +----------+\     +--------+     | Multi-process
                        /         \                  | or multi-thread
                       /            \ +--------+     |
      Dynamic reconfiguration         | Worker |     |
     and live restart support         +--------+     |
                                                  ---+

ServerEngine also provides useful options and utilities such as logging, signal handlers, changing process names shown by ps command, chuser, stacktrace and heap dump on signal.

Examples

Simplest server

What you need to implement at least is a worker module which has run and stop methods.

require 'serverengine'

module MyWorker
  def run
    until @stop
      logger.info "Awesome work!"
      sleep 1
    end
  end

  def stop
    @stop = true
  end
end

se = ServerEngine.create(nil, MyWorker, {
  daemonize: true,
  log: 'myserver.log',
  pid_path: 'myserver.pid',
})
se.run

Send TERM signal (or KILL on Windows) to kill the daemon. See also Signals section bellow for details.

Multiprocess server

Simply set worker_type: "process" or worker_type: "thread" parameter, and set number of workers to workers parameter.

se = ServerEngine.create(nil, MyWorker, {
  daemonize: true,
  log: 'myserver.log',
  pid_path: 'myserver.pid',
  worker_type: 'process',
  workers: 4,
})
se.run

See also Worker types section bellow.

Multiprocess TCP server

One of the typical implementation styles of TCP servers is that a parent process listens socket and child processes accept connections from clients.

ServerEngine allows you to optionally implement a server module to control the parent process:

# Server module controls the parent process
module MyServer
  def before_run
    @sock = TCPServer.new(config[:bind], config[:port])
  end

  attr_reader :sock
end

# Worker module controls child processes
module MyWorker
  def run
    until @stop
      # you should use Cool.io or EventMachine actually
      c = server.sock.accept
      c.write "Awesome work!"
      c.close
    end
  end

  def stop
    @stop = true
  end
end

se = ServerEngine.create(MyServer, MyWorker, {
  daemonize: true,
  log: 'myserver.log',
  pid_path: 'myserver.pid',
  worker_type: 'process',
  workers: 4,
  bind: '0.0.0.0',
  port: 9071,
})
se.run

Multiprocess server on Windows and JRuby platforms

Above worker_type: "process" depends on fork system call, which doesn't work on Windows or JRuby platform. ServerEngine provides worker_type: "spawn" for those platforms (This is still EXPERIMENTAL). However, unfortunately, you need to implement different worker module because worker_type: "spawn" is not compatible with worker_type: "process" in terms of API.

What you need to implement at least to use worker_type: "spawn" is def spawn(process_manager) method. You will call process_manager.spawn at the method, where spawn is same with Process.spawn excepting return value.

In addition, Windows does not support signals. ServerEngine provides command_sender: "pipe" for Windows (and for other platforms, if you want to use it). When using command_sender: "pipe", the child process have to handle commands sent from parent process via STDIN. On Windows, command_sender: "pipe" is default.

You can call Server#stop(stop_graceful) and Server#restart(stop_graceful) instead of sending signals.

module MyWorker
  def spawn(process_manager)
    env = {
      'SERVER_ENGINE_CONFIG' => config.to_json
    }
    script = %[
      require 'serverengine'
      require 'json'

      conf = JSON.parse(ENV['SERVER_ENGINE_CONFIG'], symbolize_names: true)
      logger = ServerEngine::DaemonLogger.new(conf[:log] || STDOUT, conf)

      @stop = false
      command_pipe = STDIN.dup
      STDIN.reopen(File::NULL)
      Thread.new do
        until @stop
          case command_pipe.gets.chomp
          when "GRACEFUL_STOP"
            @stop = true
          when "IMMEDIATE_STOP"
            @stop = true
          when "GRACEFUL_RESTART", "IMMEDIATE_RESTART"
            # do something...
          end
        end
      end

      until @stop
        logger.info 'Awesome work!'
        sleep 1
      end
    ]
    process_manager.spawn(env, "ruby", "-e", script)
  end
end

se = ServerEngine.create(nil, MyWorker, {
  worker_type: 'spawn',
  command_sender: 'pipe',
  log: 'myserver.log',
})
se.run

Logging

ServerEngine logger rotates logs by 1MB and keeps 5 generations by default.

se = ServerEngine.create(MyServer, MyWorker, {
  log: 'myserver.log',
  log_level: 'debug',
  log_rotate_age: 5,
  log_rotate_size: 1*1024*1024,
})
se.run

ServerEngine's default logger extends from Ruby's standard Logger library to:

  • support multiprocess aware log rotation
  • support reopening of log file
  • support 'trace' level, which is lower level than 'debug'

See also Configuration section bellow.

Supervisor auto restart

Server programs running 24x7 hours need to survive even if a process stalled because of unexpected memory swapping or network errors.

Supervisor process runs as the parent process of the server process and monitor it to restart automatically. You can enable supervisor process by setting supervisor: true parameter:

se = ServerEngine.create(nil, MyWorker, {
  daemonize: true,
  pid_path: 'myserver.pid',
  supervisor: true,  # enables supervisor process
})
se.run

This auto restart reature will be suppressed for workers which exits with exit code specified by unrecoverable_exit_codes. At this case, whole process will exit without error (exit code 0).

Live restart

You can restart a server process without waiting for completion of all workers using INT signal (supervisor: true and enable_detach: true parameters must be enabled). This feature allows you to minimize downtime where workers take long time to complete a task.

# 1. starts server
+------------+    +----------+    +-----------+
| Supervisor |----|  Server  |----| Worker(s) |
+------------+    +----------+    +-----------+

# 2. receives SIGINT and waits for shutdown of the server for server_detach_wait
+------------+    +----------+    +-----------+
| Supervisor |    |  Server  |----| Worker(s) |
+------------+    +----------+    +-----------+

# 3. starts new server if the server doesn't exit in server_detach_wait time
+------------+    +----------+    +-----------+
| Supervisor |\   |  Server  |----| Worker(s) |
+------------+ |  +----------+    +-----------+
               |
               |  +----------+    +-----------+
               \--|  Server  |----| Worker(s) |
                  +----------+    +-----------+

# 4. old server exits eventually
+------------+
| Supervisor |\
+------------+ |
               |
               |  +----------+    +-----------+
               \--|  Server  |----| Worker(s) |
                  +----------+    +-----------+

Note that network servers (which listen sockets) shouldn't use live restart because it causes "Address already in use" error at the server process. Instead, simply use worker_type: "process" configuration and send USR1 to restart workers instead of the server. It restarts a worker without waiting for shutdown of the other workers. This way doesn't cause downtime because server process doesn't close listening sockets and keeps accepting new clients (See also restart_server_process parameter if necessary).

Dynamic config reloading

Robust servers should not restart only to update configuration parameters.

module MyWorker
  def initialize
    reload
  end

  def reload
    @message = config[:message] || "Awesome work!"
    @sleep = config[:sleep] || 1
  end

  def run
    until @stop
      logger.info @message
      sleep @sleep
    end
  end

  def stop
    @stop = true
  end
end

se = ServerEngine.create(nil, MyWorker) do
  YAML.load_file("config.yml").merge({
    daemonize: true,
    worker_type: 'process',
  })
end
se.run

Send USR2 signal to reload configuration file.

Signals

  • TERM: graceful shutdown
  • QUIT: immediate shutdown (available only when worker_type is "process")
  • USR1: graceful restart
  • HUP: immediate restart (available only when worker_type is "process")
  • USR2: reload config file and reopen log file
  • INT: detach process for live restarting (available only when supervisor and enable_detach parameters are true. otherwise graceful shutdown)
  • CONT: dump stacktrace and memory information to /tmp/sigdump-.log file. This can be disabled by including disable_sigdump: true in the configuration.

Immediate shutdown and restart send SIGQUIT signal to worker processes which kills the processes. Graceful shutdown and restart call Worker#stop method and wait for completion of Worker#run method.

Note that signals are not supported on Windows. You have to use piped command instead of signals on Windows. See also Multiprocess server on Windows and JRuby platforms section.

Utilities

BlockingFlag

ServerEngine::BlockingFlag is recommended to stop workers because stop method is called by a different thread from the run thread.

module MyWorker
  def initialize
    @stop_flag = ServerEngine::BlockingFlag.new
  end

  def run
    until @stop_flag.wait_for_set(1.0)  # or @stop_flag.set?
      logger.info @message
    end
  end

  def stop
    @stop_flag.set!
  end
end

se = ServerEngine.create(nil, MyWorker) do
  YAML.load_file(config).merge({
    daemonize: true,
    worker_type: 'process'
  })
end
se.run

SocketManager

ServerEngine::SocketManager is a powerful library to listen on the same port across multiple worker processes dynamically.

module MyServer
  def before_run
    @socket_manager_server = ServerEngine::SocketManager::Server.open
    @socket_manager_path = @socket_manager_server.path
  end

  def after_run
    @socket_manager_server.close
  end

  attr_reader :socket_manager_path
end

module MyWorker
  def initialize
    @stop_flag = ServerEngine::BlockingFlag.new
    @socket_manager = ServerEngine::SocketManager::Client.new(server.socket_manager_path)
  end

  def run
    lsock = @socket_manager.listen_tcp('0.0.0.0', 12345)
    until @stop
      c = lsock.accept
      c.write "Awesome work!"
      c.close
    end
  end

  def stop
    @stop = true
  end
end

se = ServerEngine.create(MyServer, MyWorker, {
  daemonize: true,
  log: 'myserver.log',
  pid_path: 'myserver.pid',
  worker_type: 'process',
  workers: 4,
  bind: '0.0.0.0',
  port: 9071,
})
se.run

See also examples.

Module API

Available methods are different depending on worker_type. ServerEngine supports 3 worker types:

  • embedded: uses a thread to run worker module (default). This type doesn't support immediate shutdown or immediate restart.
  • thread: uses threads to run worker modules. This type doesn't support immediate shutdown or immediate restart.
  • process: uses processes to run worker modules. This type doesn't work on Windows or JRuby platform.
  • spawn: uses processes to run worker modules. This type works on Windows and JRuby platform but available interface of worker module is limited (See also Worker module section).

Worker module

  • interface
    • initialize is called in the parent process (or thread) in contrast to the other methods
    • before_fork is called before fork for each worker process [worker_type = "thread", "process"]
    • run is the required method for worker_type = "embedded", "thread", "process"
    • spawn(process_manager) is the required method for worker_type = "spawn". Should call process_manager.spawn([env,] command... [,options]).
    • stop is called when TERM signal is received [worker_type = "embedded", "thread", "process"]
    • reload is called when USR2 signal is received [worker_type = "embedded", "thread", "process"]
    • after_start is called after starting the worker process in the parent process (or thread) [worker_type = "thread", "process", "spawn"]
  • api
    • server server instance
    • config configuration
    • logger logger
    • worker_id serial id of workers beginning from 0

Server module

  • interface
    • initialize is called in the parent process in contrast to the other methods
    • before_run is called before starting workers
    • after_run is called before shutting down
    • after_start is called after starting the server process in the parent process (available if supervisor parameter is true)
  • hook points (call super in these methods)
    • reload_config
    • stop(stop_graceful)
    • restart(stop_graceful)
  • api
    • config configuration
    • logger logger

List of all configurations

  • Daemon
    • daemonize enables daemonize (default: false)
    • pid_path sets the path to pid file (default: don't create pid file)
    • supervisor enables supervisor if it's true (default: false)
    • disable_sigdump disables the handling of the SIGCONT signal and dumping of the thread (default: false)
    • daemon_process_name changes process name ($0) of server or supervisor process
    • chuser changes execution user
    • chgroup changes execution group
    • chumask changes umask
    • daemonize_error_exit_code exit code when daemonize, changing user or changing group fails (default: 1)
  • Supervisor: available only when supervisor parameters is true
    • server_process_name changes process name ($0) of server process
    • restart_server_process restarts server process when it receives USR1 or HUP signal. (default: false)
    • enable_detach enables INT signal (default: true)
    • exit_on_detach exits supervisor after detaching server process instead of restarting it (default: false)
    • disable_reload disables USR2 signal (default: false)
    • server_restart_wait sets wait time before restarting server after last restarting (default: 1.0) [dynamic reloadable]
    • server_detach_wait sets wait time before starting live restart (default: 10.0) [dynamic reloadable]
  • Multithread server and multiprocess server: available only when worker_type is "thread" or "process" or "spawn"
    • workers sets number of workers (default: 1) [dynamic reloadable]
    • start_worker_delay sets the delay between each worker-start when starting/restarting multiple workers at once (default: 0) [dynamic reloadable]
    • start_worker_delay_rand randomizes start_worker_delay at this ratio (default: 0.2) [dynamic reloadable]
    • restart_worker_interval sets wait time before restarting a stopped worker (default: 0) [dynamic reloadable]
  • Multiprocess server: available only when worker_type is "process"
    • worker_process_name changes process name ($0) of workers [dynamic reloadable]
    • worker_heartbeat_interval sets interval of heartbeats in seconds (default: 1.0) [dynamic reloadable]
    • worker_heartbeat_timeout sets timeout of heartbeat in seconds (default: 180) [dynamic reloadable]
    • worker_graceful_kill_interval sets the first interval of TERM signals in seconds (default: 15) [dynamic reloadable]
    • worker_graceful_kill_interval_increment sets increment of TERM signal interval in seconds (default: 10) [dynamic reloadable]
    • worker_graceful_kill_timeout sets promotion timeout from TERM to QUIT signal in seconds. -1 means no timeout (default: 600) [dynamic reloadable]
    • worker_immediate_kill_interval sets the first interval of QUIT signals in seconds (default: 10) [dynamic reloadable]
    • worker_immediate_kill_interval_increment sets increment of QUIT signal interval in seconds (default: 10) [dynamic reloadable]
    • worker_immediate_kill_timeout sets promotion timeout from QUIT to KILL signal in seconds. -1 means no timeout (default: 600) [dynamic reloadable]
    • unrecoverable_exit_codes handles worker processes, which exits with code included in this value, as unrecoverable (not to restart) (default: [])
    • stop_immediately_at_unrecoverable_exit stops server immediately if a worker exits with unrecoverable exit status (default: false)
  • Multiprocess spawn server: available only when worker_type is "spawn"
    • all parameters of multiprocess server excepting worker_process_name
    • worker_reload_signal sets the signal to notice configuration reload to a spawned process. Set nil to disable (default: nil)
  • Logger
    • log sets path to log file. Set "-" for STDOUT (default: STDERR) [dynamic reloadable]
    • log_level log level: trace, debug, info, warn, error or fatal. (default: debug) [dynamic reloadable]
    • log_rotate_age generations to keep rotated log files (default: 5), 0 will disable log rotate.
    • log_rotate_size sets the size to rotate log files (default: 1048576), when log_rotate_age is 0, the log_rotate_size value will be ignored.
    • log_stdout hooks STDOUT to log file (default: true)
    • log_stderr hooks STDERR to log file (default: true)
    • logger_class class of the logger instance (default: ServerEngine::DaemonLogger)

Author:    Sadayuki Furuhashi
Copyright: Copyright (c) 2012-2013 Sadayuki Furuhashi
License:   Apache License, Version 2.0

More Repositories

1

digdag

Workload Automation System
Java
1,305
star
2

prestogres

PostgreSQL protocol gateway for Presto distributed SQL query engine
C
292
star
3

chef-td-agent

Chef Cookbook for td-agent (Treasure Agent or Fluentd)
Ruby
127
star
4

perfectqueue

Highly available distributed queue built on RDBMS
Ruby
124
star
5

td-agent

This repository is OBSOLETE, check gh/treasure-data/omnibus-td-agent
Shell
109
star
6

treasure-boxes

Treasure Boxes - pre-built pieces of code for developing, optimizing, and analyzing your data.
Python
109
star
7

perfectsched

Highly available distributed cron built on RDBMS
Ruby
97
star
8

omnibus-td-agent

td-agent (Fluentd) Packaging Scripts
Shell
82
star
9

trino-client-ruby

Trino/Presto client library for Ruby
Ruby
70
star
10

td-js-sdk

JavaScript SDK for Treasure Data
JavaScript
70
star
11

digdag-docs

Documents for Digdag Workflow Engine
HTML
50
star
12

td

CUI Interface
Ruby
49
star
13

elastic-beanstalk-td-agent

Example of installing td-agent on AWS Elastic Beanstalk (see .ebextentions directory)
Ruby
49
star
14

td-client-python

Treasure Data API library for Python
Python
47
star
15

pandas-td

Interactive data analysis with Pandas and Treasure Data.
Python
38
star
16

angular-treasure-overlay-spinner

Add a spinner to an element when binding is truthy.
JavaScript
36
star
17

kafka-fluentd-consumer

Kafka Consumer for Fluentd
Java
32
star
18

td-logger-ruby

Treasure Data logging library for Ruby / Rails
Ruby
27
star
19

luigi-td-example

Example Repository for Building Complex Data Pipeline with Luigi +TD
Python
24
star
20

td-ios-sdk

iOS SDK for Treasure Data
Objective-C
23
star
21

td-client-ruby

Ruby Client Library for Treasure Data
Ruby
23
star
22

td-android-sdk

Android SDK for Treasure Data
Java
22
star
23

heroku-td-agent

Treasure Agent on Heroku platform (accept HTTP logging)
Ruby
20
star
24

pytd

Treasure Data Driver for Python
Jupyter Notebook
18
star
25

luigi-td

Luigi Workflow Engine integration for Treasure Data
Python
16
star
26

td-logger-java

Treasure Data Logging Library for Java
Java
12
star
27

fluent-plugin-metricsense

MetricSense - application metrics collection plugin
Ruby
12
star
28

td-client-java

Java Client Library for Treasure Data
Java
12
star
29

td-client-node

Node.js Client Library for Treasure Data
JavaScript
12
star
30

metricsense

MetricSense for Ruby - application metrics collection API
Ruby
11
star
31

td-jdbc

JDBC Driver for Treasure Data
Java
11
star
32

embulk-input-google_analytics

Embulk Input Plugin for Google Analytics
Ruby
11
star
33

td-client-go

Go Client Library for Treasure Data
Go
11
star
34

sqsrun

Generic Amazon SQS Worker Executor Service
Ruby
10
star
35

Lead-List-from-CrunchBase-

Python
10
star
36

embulk-output-td

Embulk output plugin for Treasure Data
Java
10
star
37

td-ue4-sdk

Treasure Data Unreal Engine 4 SDK
C++
10
star
38

fluent-plugin-td-monitoring

Fluentd Plugin for Treasure Agent Monitoring Service
Ruby
10
star
39

stdout-hook

Import your event logs from STDOUT to TD or Fluentd
Ruby
9
star
40

ipython-notebook-examples

iPython notebook examples for Treasure Data
9
star
41

treasuredata_fdw

PostgreSQL Foreign Data Wrapper for Treasure Data
C
8
star
42

embulk-input-zendesk

Embulk Input Plugin for Zendesk
Java
8
star
43

embulk-input-td

Treasure Data Input Plugin for Embulk
Java
8
star
44

embulk-input-mixpanel

Embulk Input Plugin for Mixpanel
Ruby
8
star
45

td-notebooks

Jupyter notebook examples for Treasure Data
Jupyter Notebook
8
star
46

lambda-local-proxy

Local API proxy that calls an AWS Lambda function
Go
7
star
47

embulk-input-marketo

Embulk Input Plugin for Marketo
Java
7
star
48

treasure-academy-cdp

Python
6
star
49

fluent-plugin-td

Fluentd plugin for Treasure Data Service
Ruby
6
star
50

embulk-output-mailchimp

Embulk output plugin for Mailchimp
Java
6
star
51

lda-board

Auto segmentation UI using LDA
Ruby
5
star
52

fluent-plugin-librato-metrics

Librato Metrics output plugin for Fluentd event collector
Ruby
5
star
53

td-logger-python

Python logging module for td-agent
Python
4
star
54

embulk-filter-add_time

Java
4
star
55

embulk-input-jira

Embulk Input Plugin for JIRA
Java
4
star
56

RTD

Simple R client for Treasure Data
HTML
4
star
57

td-unity-sdk-package

Unity SDK for TreasureData
C#
3
star
58

eslint-plugin-td

Stores td-console config so that it can be reused
JavaScript
3
star
59

embulk-parser-query_string

Embulk parser plugin for URL-encoded key value pairs
Ruby
3
star
60

react-treasure-preview-table

console preview table for user data, react component
JavaScript
3
star
61

facebook-open-academy-fluentd-2015

This is the "course page" for Facebook Open Academy 2015 (Winter) for Fluentd
3
star
62

td-cordova-sdk

Treasure Data SDK Cordova Plugin
JavaScript
3
star
63

rsched

Generic Reliable Scheduler
Ruby
3
star
64

js-examples

HTML
3
star
65

hive-udf-neologd

Hive Japanese NLP UDFs with NEologd
Java
2
star
66

angular-treasure-focus-class

Adds a class to an element on focus and removes it when focus is lost.
JavaScript
2
star
67

prestogres-odbc

Fork of PostgreSQL ODBC driver for Prestogres
C
2
star
68

dockerfiles

The collection of Dockerfile
Shell
2
star
69

td-js-consent

This repo is for Treasure Data JavaScript Consent Management UIs
JavaScript
2
star
70

td2slack

Treasure Data to Slack app
Ruby
2
star
71

td-import-java

Treasure Data Import Tool by Java
Java
2
star
72

underwrap

A very thin wrapper of Undertow and Resteasy
Java
2
star
73

TD-API-Documentation-postman-collections

1
star
74

embulk-reporter-fluentd

Java
1
star
75

juju-layer-td-agent

Shell
1
star
76

subtree-deploy

Ruby
1
star
77

heroku-td

Heroku CLI plugin for Treasure Data
Ruby
1
star
78

td-react-native-sdk

Treasure Data React Native SDK
JavaScript
1
star
79

td2email

td2email
Ruby
1
star
80

PodSpecs

Ruby
1
star
81

treasure-academy-sql

1
star
82

td-libyaml

Binary Packaging Scripts for td-libyaml (dependency of td-agent package)
1
star
83

pytd-legacy

[DEPRECATED] This repo is being deprecated. Please check out
Python
1
star