RabbitMQ River Plugin for elasticsearch (STOPPED)

Important: This project has been stopped since elasticsearch 2.0.


RabbitMQ River Plugin for Elasticsearch

The RabbitMQ River plugin indexes bulk-format messages into Elasticsearch. It allows a RabbitMQ queue to be indexed automatically. The format of the messages follows the bulk API format:

{ "index" : { "_index" : "twitter", "_type" : "tweet", "_id" : "1" } }
{ "tweet" : { "text" : "this is a tweet" } }
{ "delete" : { "_index" : "twitter", "_type" : "tweet", "_id" : "2" } }
{ "create" : { "_index" : "twitter", "_type" : "tweet", "_id" : "1" } }
{ "tweet" : { "text" : "another tweet" } }
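
As an illustration of the format above, here is a minimal Python sketch that serializes a list of operations into a bulk-format message body. The helper name build_bulk_body and the (action, metadata, source) tuple layout are assumptions made for this example and are not part of the plugin. The trailing newline follows the Elasticsearch bulk API convention that every line, including the last, ends with a newline.

```python
import json

def build_bulk_body(operations):
    """Serialize (action, metadata, source) triples into bulk-format text.

    `source` is None for delete actions, which carry no document line.
    This helper is hypothetical; it only mirrors the message layout above.
    """
    lines = []
    for action, metadata, source in operations:
        lines.append(json.dumps({action: metadata}))
        if source is not None:
            lines.append(json.dumps(source))
    # Newline-delimited JSON, terminated by a final newline.
    return "\n".join(lines) + "\n"

body = build_bulk_body([
    ("index", {"_index": "twitter", "_type": "tweet", "_id": "1"},
     {"tweet": {"text": "this is a tweet"}}),
    ("delete", {"_index": "twitter", "_type": "tweet", "_id": "2"}, None),
    ("create", {"_index": "twitter", "_type": "tweet", "_id": "1"},
     {"tweet": {"text": "another tweet"}}),
])
print(body, end="")
```

The resulting string is what a producer would publish to the queue as a single message body.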

Rivers are deprecated and will be removed in the future. Consider using the Logstash RabbitMQ input instead.

In order to install the plugin, run:

bin/plugin install elasticsearch/elasticsearch-river-rabbitmq/2.6.0

You need to install a version matching your Elasticsearch version:

Elasticsearch    RabbitMQ River       Docs
master           Build from source    See below
es-1.x           Build from source    2.7.0-SNAPSHOT
es-1.6           2.6.0                2.6.0
es-1.5           2.5.0                2.5.0
es-1.4           2.4.1                2.4.1
es-1.3           2.3.0                2.3.0
es-1.2           2.2.0                2.2.0
es-1.1           2.0.0                2.0.0
es-1.0           2.0.0                2.0.0
es-0.90          1.6.0                1.6.0

To build a SNAPSHOT version, you need to build it with Maven:

mvn clean install
plugin --install river-rabbitmq \
       --url file:target/releases/elasticsearch-river-rabbitmq-X.X.X-SNAPSHOT.zip

Create river

Creating the rabbitmq river is as simple as (all configuration parameters are provided, with default values):

curl -XPUT 'localhost:9200/_river/my_river/_meta' -d '{
    "type" : "rabbitmq",
    "rabbitmq" : {
        "host" : "localhost",
        "port" : 5672,
        "user" : "guest",
        "pass" : "guest",
        "vhost" : "/",
        "queue" : "elasticsearch",
        "exchange" : "elasticsearch",
        "routing_key" : "elasticsearch",
        "exchange_declare" : true,
        "exchange_type" : "direct",
        "exchange_durable" : true,
        "queue_declare" : true,
        "queue_bind" : true,
        "queue_durable" : true,
        "queue_auto_delete" : false,
        "heartbeat" : "30m",
        "qos_prefetch_size" : 0,
        "qos_prefetch_count" : 10,
        "nack_errors" : true
    },
    "index" : {
        "bulk_size" : 100,
        "bulk_timeout" : "10ms",
        "ordered" : false,
        "replication" : "default"
    }
}'
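
The same request can be assembled programmatically. The following is a sketch using only the Python standard library; the function name build_river_request and its base_url parameter are assumptions for illustration. It builds the PUT request without sending it, so it can be inspected without a running cluster.

```python
import json
import urllib.request

def build_river_request(name, rabbitmq, index, base_url="http://localhost:9200"):
    """Build (but do not send) the PUT request that registers a river.

    Hypothetical helper: it only reproduces the curl call shown above.
    """
    meta = {"type": "rabbitmq", "rabbitmq": rabbitmq, "index": index}
    return urllib.request.Request(
        url=f"{base_url}/_river/{name}/_meta",
        data=json.dumps(meta).encode("utf-8"),
        method="PUT",
    )

request = build_river_request(
    "my_river",
    rabbitmq={"host": "localhost", "port": 5672, "queue": "elasticsearch"},
    index={"bulk_size": 100, "ordered": False},
)
print(request.get_method(), request.full_url)
# Sending it would be: urllib.request.urlopen(request)
```

Settings left out of the rabbitmq and index dictionaries are simply omitted from the request body.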

You can disable exchange or queue declaration by setting exchange_declare or queue_declare to false (true by default). You can disable queue binding by setting queue_bind to false (true by default).

You can also provide a list of addresses (host-port pairs). This is useful for taking advantage of RabbitMQ HA (active/active) without a RabbitMQ load balancer (http://www.rabbitmq.com/ha.html).

    ...
    "rabbitmq" : {
        "addresses" : [
            {
                "host" : "rabbitmq-host1",
                "port" : 5672
            },
            {
                "host" : "rabbitmq-host2",
                "port" : 5672
            }
        ],
        "user" : "guest",
        "pass" : "guest",
        "vhost" : "/",
        ...
    }
    ...
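
If the broker list is kept as plain "host:port" strings, a small helper can turn it into the addresses structure shown above. This is only a sketch: parse_addresses is a hypothetical name, and falling back to port 5672 when no port is given is an assumption based on the default port in the river configuration.

```python
def parse_addresses(pairs, default_port=5672):
    """Turn "host" or "host:port" strings into the river's addresses list.

    Hypothetical helper; the 5672 fallback is an assumption.
    """
    addresses = []
    for pair in pairs:
        host, _, port = pair.partition(":")
        addresses.append({"host": host, "port": int(port) if port else default_port})
    return addresses

rabbitmq_settings = {
    "addresses": parse_addresses(["rabbitmq-host1:5672", "rabbitmq-host2"]),
    "user": "guest",
    "pass": "guest",
    "vhost": "/",
}
print(rabbitmq_settings["addresses"])
```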

The river automatically batches queue messages into bulk requests when the queue is backed up, which lets it catch up faster with the messages streamed into the queue. The ordered flag ensures that messages are indexed in the same order as they arrive in the queue, by blocking on each bulk request before picking up the next data to be indexed. It can also be used as a simple way to throttle indexing.

You can set the heartbeat option to define the heartbeat interval for the river's RabbitMQ connection, which applies even when no more messages are expected to be consumed (defaults to 30m).

By default, the replication mode is the node's default value. You can override it by setting replication to async or sync.

By default, when an exception occurs while executing a bulk request, the failing messages are marked as rejected. To ignore errors and ack messages in every case, set nack_errors to false.

Setting qos_prefetch_size defines the maximum amount of content (measured in octets) that the server will deliver (0 means unlimited, which is the default).

Setting qos_prefetch_count defines the maximum number of messages that the server will deliver (0 means unlimited). It defaults to bulk_size*2.

Scripting

RabbitMQ river can call scripts to modify or filter messages.

Full bulk scripting

To enable bulk scripting use the following configuration options:

curl -XPUT 'localhost:9200/_river/my_river/_meta' -d '{
    "type" : "rabbitmq",
    "rabbitmq" : {
        ...
    },
    "index" : {
        ...
    },
    "bulk_script_filter" : {
        "script" : "myscript",
        "script_lang" : "native",
        "script_params" : {
            "param1" : "val1",
            "param2" : "val2"
            ...
        }
    }
}'
  • script is optional and is the name of the script registered in elasticsearch.yml. To register it, add the property script.native.myscript.type: sample.MyNativeScriptFactory and make this class available to the Elasticsearch classloader.
  • script_lang defaults to native.
  • script_params are optional configuration arguments for the script.

The script will receive a variable called body which contains a String representation of RabbitMQ's message body. That body can be modified by the script, and it must return the new body as a String as well. If the returned body is null, that message will be skipped from the indexing flow.
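
The contract described above (a string body in, a string or null out) can be sketched in Python. This is not how a native script is written: a real one is a Java class registered in elasticsearch.yml. The function drop_deletes and its filtering rule are hypothetical and only illustrate the body-in, body-out behaviour, with None standing in for null.

```python
import json

def drop_deletes(body, params=None):
    """Return the body without delete actions, or None to skip the message.

    Hypothetical filter mirroring the script contract: it receives the
    message body as a string and returns the new body as a string.
    """
    lines = [line for line in body.splitlines() if line.strip()]
    kept = []
    i = 0
    while i < len(lines):
        action = next(iter(json.loads(lines[i])))
        if action == "delete":
            i += 1  # a delete action has no document line after it
        else:
            kept.extend(lines[i:i + 2])  # action line plus its document line
            i += 2
    return "\n".join(kept) + "\n" if kept else None

sample = (
    '{"index": {"_index": "twitter", "_type": "tweet", "_id": "1"}}\n'
    '{"tweet": {"text": "this is a tweet"}}\n'
    '{"delete": {"_index": "twitter", "_type": "tweet", "_id": "2"}}\n'
)
filtered = drop_deletes(sample)
print(filtered, end="")
```

A message that contains only delete actions yields None, which corresponds to the message being skipped from the indexing flow.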

For more information see Scripting module

Doc per doc scripting

You may also want to apply scripts document by document. This only works for index or create operations.

To enable scripting use the following configuration options:

curl -XPUT 'localhost:9200/_river/my_river/_meta' -d '{
    "type" : "rabbitmq",
    "rabbitmq" : {
        ...
    },
    "index" : {
        ...
    },
    "script_filter" : {
        "script" : "ctx.type1.field1 += param1",
        "script_lang" : "mvel",
        "script_params" : {
          "param1" : 1
        }
    }
}'
  • script is the script source itself, for example MVEL code when script_lang is mvel.
  • script_lang defaults to mvel.
  • script_params are optional configuration arguments for the script.

The script will receive a variable called ctx which contains a String representation of the current document meant to be indexed or created.
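
To make the effect of the example script concrete, here is a Python equivalent of "ctx.type1.field1 += param1". It assumes that ctx can be treated as a nested map, as the field access in the example script suggests; the function name apply_script and the sample document are made up for illustration.

```python
def apply_script(ctx, params):
    """Python equivalent of the example script "ctx.type1.field1 += param1".

    Assumption: ctx behaves like a nested map of the document's fields.
    """
    ctx["type1"]["field1"] += params["param1"]
    return ctx

doc = {"type1": {"field1": 41}}
result = apply_script(doc, {"param1": 1})
print(result)  # {'type1': {'field1': 42}}
```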

For more information see Scripting module

Tests

Integration tests in this plugin require a working RabbitMQ service and are therefore disabled by default. You need to start rabbitmq-server locally before running the integration tests.

To run the tests:

mvn clean test -Dtests.rabbitmq=true 

License

This software is licensed under the Apache 2 license, quoted below.

Copyright 2009-2014 Elasticsearch <http://www.elasticsearch.org>

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
