Rabbit Hole, a RabbitMQ HTTP API Client for Go
This library is a RabbitMQ HTTP API client for the Go language.
Supported Go Versions
Rabbit Hole targets two latests stable Go versions available at the time of the library release. Older versions may work but this is not guaranteed.
Supported RabbitMQ Versions
- All currently supported RabbitMQ versions
- RabbitMQ
3.9.x
will be the minimum supported release series starting with Rabbit Hole 3.0 - Almost all API operations work against RabbitMQ
3.8.x
nodes. Some metrics and stats may be missing. - RabbitMQ
3.8.x
and older versions have reached end of general support
All versions require RabbitMQ Management UI plugin to be installed and enabled.
Build Status
Project Maturity
Rabbit Hole is a mature library (first released in late 2013) designed after a couple of other RabbitMQ HTTP API clients with stable APIs. Breaking API changes are not out of the question but not without a reasonable version bump.
It is largely feature complete and decently documented.
Change Log
If upgrading from an earlier release, please consult with the change log.
Installation
go get github.com/michaelklishin/rabbit-hole/v2
# or, for v1.x:
# go get github.com/michaelklishin/rabbit-hole
Documentation
API Reference
API reference is available on godoc.org.
Continue reading for a list of example snippets.
Overview
To import the package:
import (
"github.com/michaelklishin/rabbit-hole/v2"
)
All HTTP API operations are accessible via rabbithole.Client
, which
should be instantiated with rabbithole.NewClient
:
// URI, username, password
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
TLS (HTTPS) can be enabled by adding an HTTP transport to the parameters
of rabbithole.NewTLSClient
:
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, _ := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
RabbitMQ HTTP API has to be configured to use TLS.
Getting Overview
resp, err := rmqc.Overview()
Node and Cluster Status
xs, err := rmqc.ListNodes()
// => []NodeInfo, err
node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err
Operations on Connections
xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err
// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err
Operations on Channels
xs, err := rmqc.ListChannels()
// => []ChannelInfo, err
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err
Operations on Vhosts
xs, err := rmqc.ListVhosts()
// => []VhostInfo, err
// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err
// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err
// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err
Managing Users
xs, err := rmqc.ListUsers()
// => []UserInfo, err
// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err
// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management,policymaker"})
// => *http.Response, err
// creates or updates individual user with no password
resp, err := rmqc.PutUserWithoutPassword("my.user", UserSettings{Tags: "management,policymaker"})
// => *http.Response, err
// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err
// creates or updates individual user with a Base64-encoded SHA256 password hash
hash := Base64EncodedSaltedPasswordHashSHA256("password-s3krE7")
resp, err := rmqc.PutUser("my.user", UserSettings{
PasswordHash: hash,
HashingAlgorithm: HashingAlgorithmSHA256,
Tags: "management,policymaker"})
// => *http.Response, err
Managing Permissions
xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err
// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err
// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err
// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err
// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err
Operations on Exchanges
xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err
// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err
// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err
// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err
// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err
Operations on Queues
qs, err := rmqc.ListQueues()
// => []QueueInfo, err
// list queues in a vhost
qs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err
// information about individual queue
q, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err
// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err
// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err
// purges all messages in queue
resp, err := rmqc.PurgeQueue("/", "a.queue")
// => *http.Response, err
// synchronises all messages in queue with the rest of mirrors in the cluster
resp, err := rmqc.SyncQueue("/", "a.queue")
// => *http.Response, err
// cancels queue synchronisation process
resp, err := rmqc.CancelSyncQueue("/", "a.queue")
// => *http.Response, err
Operations on Bindings
bs, err := rmqc.ListBindings()
// => []BindingInfo, err
// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err
// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err
// list all bindings having the exchange as source
bs1, err := rmqc.ListExchangeBindingsWithSource("/", "an.exchange")
// => []BindingInfo, err
// list all bindings having the exchange as destinattion
bs2, err := rmqc.ListExchangeBindingsWithDestination("/", "an.exchange")
// => []BindingInfo, err
// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
})
// => *http.Response, err
// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
PropertiesKey: "%23",
})
// => *http.Response, err
Operations on Feature Flags
xs, err := rmqc.ListFeatureFlags()
// => []FeatureFlag, err
// enable a feature flag
_, err := rmqc.EnableFeatureFlag("drop_unroutable_metric")
// => *http.Response, err
Operations on Shovels
qs, err := rmqc.ListShovels()
// => []ShovelInfo, err
// list shovels in a vhost
qs, err := rmqc.ListShovelsIn("/")
// => []ShovelInfo, err
// information about an individual shovel
q, err := rmqc.GetShovel("/", "a.shovel")
// => ShovelInfo, err
// declares a shovel
shovelDetails := rabbithole.ShovelDefinition{
SourceURI: URISet{"amqp://sourceURI"},
SourceProtocol: "amqp091",
SourceQueue: "mySourceQueue",
DestinationURI: "amqp://destinationURI",
DestinationProtocol: "amqp10",
DestinationAddress: "myDestQueue",
DestinationAddForwardHeaders: true,
AckMode: "on-confirm",
SrcDeleteAfter: "never",
}
resp, err := rmqc.DeclareShovel("/", "a.shovel", shovelDetails)
// => *http.Response, err
// deletes an individual shovel
resp, err := rmqc.DeleteShovel("/", "a.shovel")
// => *http.Response, err
Operations on Runtime (vhost-scoped) Parameters
// list all runtime parameters
params, err := rmqc.ListRuntimeParameters()
// => []RuntimeParameter, error
// list all runtime parameters for a component
params, err := rmqc.ListRuntimeParametersFor("federation-upstream")
// => []RuntimeParameter, error
// list runtime parameters in a vhost
params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/")
// => []RuntimeParameter, error
// information about a runtime parameter
p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name")
// => *RuntimeParameter, error
// declare or update a runtime parameter
resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{
Uri: URISet{"amqp://server-name"},
})
// => *http.Response, error
// remove a runtime parameter
resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name")
// => *http.Response, error
Operations on Federation Upstreams
// list all federation upstreams
ups, err := rmqc.ListFederationUpstreams()
// => []FederationUpstream, error
// list federation upstreams in a vhost
ups, err := rmqc.ListFederationUpstreamsIn("/")
// => []FederationUpstream, error
// information about a federated upstream
up, err := rmqc.GetFederationUpstream("/", "name")
// => *FederationUpstream, error
// declare or update a federation upstream
resp, err := rmqc.PutFederationUpstream("/", "name", FederationDefinition{
Uri: URISet{"amqp://server-name"},
})
// => *http.Response, error
// delete an upstream
resp, err := rmqc.DeleteFederationUpstream("/", "name")
// => *http.Response, error
Managing Global Parameters
// list all global parameters
params, err := rmqc.ListGlobalParameters()
// => []GlobalRuntimeParameter, error
// get a global parameter
p, err := rmqc.GetGlobalParameter("name")
// => *GlobalRuntimeParameter, error
// declare or update a global parameter
resp, err := rmqc.PutGlobalParameter("name", map[string]interface{
endpoints: "amqp://server-name",
})
// => *http.Response, error
// delete a global parameter
resp, err := rmqc.DeleteGlobalParameter("name")
// => *http.Response, error
Managing Policies
mypolicy := Policy{
Vhost: "/",
Pattern: "^.*$",
ApplyTo: "queues",
Name: "mypolicy",
Priority: 0,
Definition: PolicyDefinition{
// map[string] interface{}
"max-length-bytes": 1048576,
},
}
resp, err := rmqc.PutPolicy("/", "mypolicy", mypolicy)
// => *http.Response, error
xs, err := rmqc.ListPolicies()
// => []Policy, error
x, err := rmqc.GetPolicy("/", "mypolicy")
// => *Policy, error
resp, err := rmqc.DeletePolicy("/", "mypolicy")
// => *http.Response, error
Operations on cluster name
// Get cluster name
cn, err := rmqc.GetClusterName()
// => ClusterName, err
// Rename cluster
resp, err := rmqc.SetClusterName(ClusterName{Name: "rabbitmq@rabbit-hole"})
// => *http.Response, err
HTTPS Connections
var tlsConfig *tls.Config
...
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, err := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
Changing Transport Layer
var transport http.RoundTripper
...
rmqc.SetTransport(transport)
Contributing
See CONTRIBUTING.md
License & Copyright
2-clause BSD license.
(c) Michael S. Klishin and contributors, 2013-2023.