• Stars
    star
    25
  • Rank 957,573 (Top 19 %)
  • Language
    Go
  • License
    Other
  • Created over 8 years ago
  • Updated about 1 year ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Publish and subscribe functionality within a single process in Go.

pubsub

import "github.com/juju/pubsub/v2"

Package pubsub provides publish and subscribe functionality within a single process.

A message as far as a hub is concerned is defined by a topic, and a data blob. All subscribers that match the published topic are notified, and have their callback function called with both the topic and the data blob.

All subscribers get their own goroutine. This way slow consumers do not slow down the act of publishing, and slow consumers do not inferfere with other consumers. Subscribers are guaranteed to get the messages that match their topic matcher in the order that the messages were published to the hub.

This package defines two types of hubs.

  • Simple hubs
  • Structured hubs

Simple hubs just pass the datablob to the subscribers untouched. Structuctured hubs will serialize the datablob into a map[string]interface{} using the marshaller that was defined to create it. The subscription handler functions for structured hubs allow the handlers to define a structure for the datablob to be marshalled into.

Hander functions for a structured hub can get all the published data available by defining a callback with the signature:

func (Topic, map[string]interface{})

Or alternatively, define a struct type, and use that type as the second argument.

func (Topic, SomeStruct, error)

The structured hub will try to serialize the published information into the struct specified. If there is an error marshalling, that error is passed to the callback as the error parameter.

Variables

var JSONMarshaller = &jsonMarshaller{}

JSONMarshaller simply wraps the json.Marshal and json.Unmarshal calls for the Marshaller interface.

type Marshaller

type Marshaller interface {
    // Marshal converts the argument into a byte streem that it can then Unmarshal.
    Marshal(interface{}) ([]byte, error)

    // Unmarshal attempts to convert the byte stream into type passed in as the
    // second arg.
    Unmarshal([]byte, interface{}) error
}

Marshaller defines the Marshal and Unmarshal methods used to serialize and deserialize the structures used in Publish and Subscription handlers of the structured hub.

type Multiplexer

type Multiplexer interface {
    TopicMatcher
    Add(matcher TopicMatcher, handler interface{}) error
}

Multiplexer allows multiple subscriptions to be made sharing a single message queue from the hub. This means that all the messages for the various subscriptions are called back in the order that the messages were published. If more than one handler is added to the Multiplexer that matches any given topic, the handlers are called back one after the other in the order that they were added.

type RegexpMatcher

type RegexpMatcher regexp.Regexp

RegexpMatcher allows standard regular expressions to be used as TopicMatcher values. RegexpMatches can be created using the short-hand function MatchRegexp function that wraps regexp.MustCompile.

func (*RegexpMatcher) Match

func (m *RegexpMatcher) Match(topic Topic) bool

Match implements TopicMatcher.

The topic matches if the regular expression matches the topic.

type SimpleHub

type SimpleHub struct {
    // contains filtered or unexported fields
}

SimpleHub provides the base functionality of dealing with subscribers, and the notification of subscribers of events.

func NewSimpleHub

func NewSimpleHub(config *SimpleHubConfig) *SimpleHub

NewSimpleHub returns a new SimpleHub instance.

A simple hub does not touch the data that is passed through to Publish. This data is passed through to each Subscriber. Note that all subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.

func (*SimpleHub) Publish

func (h *SimpleHub) Publish(topic Topic, data interface{}) <-chan struct{}

Publish will notifiy all the subscribers that are interested by calling their handler function.

The data is passed through to each Subscriber untouched. Note that all subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.

The channel return value is closed when all the subscribers have been notified of the event.

func (*SimpleHub) Subscribe

func (h *SimpleHub) Subscribe(matcher TopicMatcher, handler func(Topic, interface{})) Unsubscriber

Subscribe takes a topic matcher, and a handler function. If the matcher matches the published topic, the handler function is called with the published Topic and the associated data.

The handler function will be called with all maching published events until the Unsubscribe method on the Unsubscriber is called.

type SimpleHubConfig

type SimpleHubConfig struct {
    // LogModule allows for overriding the default logging module.
    // The default value is "pubsub.simple".
    LogModule string
}

SimpleHubConfig is the argument struct for NewSimpleHub.

type StructuredHub

type StructuredHub struct {
    // contains filtered or unexported fields
}

StructuredHub allows the hander functions to accept either structures or map[string]interface{}. The published structure does not need to match the structures of the subscribers. The structures are marshalled using the Marshaller defined in the StructuredHubConfig. If one is not specified, the marshalling is handled by the standard json library.

func NewStructuredHub

func NewStructuredHub(config *StructuredHubConfig) *StructuredHub

NewStructuredHub returns a new StructuredHub instance.

func (*StructuredHub) NewMultiplexer

func (h *StructuredHub) NewMultiplexer() (Unsubscriber, Multiplexer, error)

NewMultiplexer creates a new multiplexer for the hub and subscribes it. Unsubscribing the multiplexer stops calls for all handlers added. Only structured hubs support multiplexer.

func (*StructuredHub) Publish

func (h *StructuredHub) Publish(topic Topic, data interface{}) (<-chan struct{}, error)

Publish will notifiy all the subscribers that are interested by calling their handler function.

The data is serialized out using the marshaller and then back into a map[string]interface{}. If there is an error marshalling the data, Publish fails with an error. The resulting map is then updated with any annotations provided. The annotated values are only set if the specified field is missing or empty. After the annotations are set, the PostProcess function is called if one was specified. The resulting map is then passed to each of the subscribers.

Subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.

The channel return value is closed when all the subscribers have been notified of the event.

func (*StructuredHub) Subscribe

func (h *StructuredHub) Subscribe(matcher TopicMatcher, handler interface{}) (Unsubscriber, error)

Subscribe takes a topic matcher, and a handler function. If the matcher matches the published topic, the handler function is called with the published Topic and the associated data.

The handler function will be called with all maching published events until the Unsubscribe method on the Unsubscriber is called.

The hander function must have the signature:

`func(Topic, map[string]interface{})`

or

`func(Topic, SomeStruct, error)`

where SomeStruct is any structure. The map[string]interface{} from the Publish call is unmarshalled into the SomeStruct structure. If there is an error unmarshalling the handler is called with a zerod structure and an error with the marshalling error.

type StructuredHubConfig

type StructuredHubConfig struct {
    // LogModule allows for overriding the default logging module.
    // The default value is "pubsub.structured".
    LogModule string

    // Marshaller defines how the structured hub will convert from structures to
    // a map[string]interface{} and back. If this is not specified, the
    // `JSONMarshaller` is used.
    Marshaller Marshaller

    // Annotations are added to each message that is published if and only if
    // the values are not already set.
    Annotations map[string]interface{}

    // PostProcess allows the caller to modify the resulting
    // map[string]interface{}. This is useful when a dynamic value, such as a
    // timestamp is added to the map, or when other type conversions are
    // necessary across all the values in the map.
    PostProcess func(map[string]interface{}) (map[string]interface{}, error)
}

StructuredHubConfig is the argument struct for NewStructuredHub.

type Topic

type Topic string

Topic represents a message that can be subscribed to.

func (Topic) Match

func (t Topic) Match(topic Topic) bool

Match implements TopicMatcher. One topic matches another if they are equal.

type TopicMatcher

type TopicMatcher interface {
    Match(Topic) bool
}

TopicMatcher defines the Match method that is used to determine if the subscriber should be notified about a particular message.

var MatchAll TopicMatcher = (*allMatcher)(nil)

MatchAll is a topic matcher that matches all topics.

func MatchRegexp

func MatchRegexp(expression string) TopicMatcher

MatchRegexp expects a valid regular expression. If the expression passed in is not valid, the function panics. The expected use of this is to be able to do something like:

hub.Subscribe(pubsub.MatchRegex("prefix.*suffix"), handler)

type Unsubscriber

type Unsubscriber interface {
    Unsubscribe()
}

Unsubscriber provides a way to stop receiving handler callbacks. Unsubscribing from a hub will also mark any pending notifications as done, and the handler will not be called for them.


Generated by godoc2md

More Repositories

1

ratelimit

Efficient token-bucket-based rate limiter package.
Go
2,679
star
2

juju

Orchestration engine that enables the deployment, integration and lifecycle management of applications at any scale, on any infrastructure (Kubernetes or otherwise).
Go
2,288
star
3

errors

Common juju errors and functions to annotate errors. Based on juju/errgo
Go
1,382
star
4

errgo

Error tracing and annotation.
Go
228
star
5

utils

General utility functions
Go
213
star
6

juju-gui

Juju-GUI is a web-based GUI for Juju <https://jujucharms.com/>.
JavaScript
182
star
7

loggo

A logging library for Go. Doesn't use the built in go log standard library, but instead offers a replacement.
Go
134
star
8

fslock

Go
129
star
9

persistent-cookiejar

cookiejar is a fork of net/http/cookiejar that allows serialisation of the stored cookies
Go
113
star
10

python-libjuju

Python library for the Juju API
Python
59
star
11

cheatsheet

A Juju Quicksheet with some common usage examples
58
star
12

charm-championship

Submissions for the Juju Charm Championship
43
star
13

charm-tools

Tools for charm authors and maintainers
Python
42
star
14

httprequest

JSON-oriented HTTP server and client helpers
Go
38
star
15

mutex

Provides a named machine level mutex shareable between processes.
Go
28
star
16

cmd

A command line implementation framework
Go
27
star
17

plugins

Basic collection of the first few plugins for Juju
Python
27
star
18

gomaasapi

Go bindings for talking to MAAS
Go
27
star
19

gnuflag

GNU-compatible flag handling with a stdlib-like API for Go
Go
25
star
20

docs

Juju documentation, edited on https://discourse.charmhub.io/, and published on https://juju.is/docs
Python
23
star
21

ansiterm

Colored writers and tabwriters.
Go
22
star
22

terraform-provider-juju

A Terraform provider for Juju
Go
21
star
23

gocharm

Write your charms in Go!
Go
21
star
24

layer-index

Index of layers for building charms
Python
21
star
25

testing

Testing gocheck suites and checkers used across juju projects
Go
19
star
26

retry

The retry package encapsulates the mechanism around retrying commands.
Go
19
star
27

charm-helpers

Python
18
star
28

amulet

Testing harness and tools for Juju Charms
Python
17
star
29

zaputil

Utility functions related to the zap logging package
Go
16
star
30

charmstore

The charm store server.
Go
15
star
31

clock

Clock definition and a testing clock.
Go
13
star
32

charm

Parsing and testing Juju charms
Go
13
star
33

xml

A fork of the Go xml package with fixed marshaling
Go
11
star
34

worker

Utilities for handling long lived Go workers
Go
10
star
35

juju-academy

Learn to use Juju
JavaScript
10
star
36

mgosession

Session pooling for the mgo package
Go
10
star
37

juju-crashdump

Script to assist in gathering logs and other debugging info from a Juju model
Python
10
star
38

charmstore-client

Client for charmstore.
Go
9
star
39

juju-talks

Presentations about Juju, pull requests welcome!
HTML
9
star
40

js-libjuju

JavaScript API client for Juju
TypeScript
9
star
41

replicaset

Create and manage mongodb replicasets.
Go
8
star
42

firestealer

A command line tool for parsing Prometheus metrics
Python
8
star
43

zip

Fork of Go's zip package with append feature.
Go
8
star
44

httpgovernor

HTTP request concurrency limiter
Go
7
star
45

packaging

An abstraction of different linux packaging systems.
Go
6
star
46

chaos-monkey

A tool to instrument chaos into a Juju environment.
Python
6
star
47

names

A package to deal with juju names (services, units, machines, etc)
Go
6
star
48

schema

coerce dynamically typed data structures into known forms.
Go
6
star
49

hello-juju-charm

The charm for the hello-juju application.
Python
5
star
50

jujusvg

Generate svgs from Juju bundles and environment.
Go
5
star
51

theblues

Python library for the juju charmstore (v4)
Python
5
star
52

1.25-upgrade

Tools to upgrade and move a 1.25 environment to a 2.2.4+ controller
Go
4
star
53

jenkins-github-lander

Web service to aid in landing approved branches automatically with a final test run through jenkins.
Python
4
star
54

juju-tosca

Juju Tosca Translator
Python
4
star
55

go-oracle-cloud

Go client interfacing with the oracle IAAS cloud API.
Go
4
star
56

mgo

The MongoDB driver for Go
Go
3
star
57

aclstore

A simple persistent store for ACLs, with HTTP API
Go
3
star
58

txjuju

A Twisted-based Juju client
Python
3
star
59

bakeryjs

Javascript implementation of the Macaroon Bakery
TypeScript
3
star
60

qthttptest

Check that JSON HTTP endpoints respond appropriately; compatible with quicktest.
Go
3
star
61

juju-restore

Restore script for Juju controllers
Go
3
star
62

version

Go
3
star
63

collections

Deque and set implementations
Go
3
star
64

bundlechanges

A Go library to generate the list of changes required to deploy a bundle
Go
3
star
65

postgrestest

Go support for testing against a live Postgres database
Go
2
star
66

concurrency-limiter

Limit the number of asynchronous concurrent tasks running
JavaScript
2
star
67

juju-gui-charm

Charm for Juju GUI.
Python
2
star
68

autopilot-log-collector

Python
2
star
69

mgopurge

A tool to repair broken mgo/txn transaction references in a Juju MongoDB instance.
Go
2
star
70

httpprof

httpprof is a fork of net/http/pprof which works correctly when not at the server's root
Go
2
star
71

hello-juju

A simple application used to demonstrate juju relations.
HTML
2
star
72

blobstore

This package provides a Mongo GridFS-backed blob storage engine.
Go
2
star
73

mgoutil

A Go package holding utilities related to the mgo package
Go
2
star
74

lru

A Go implementation of a least-recently-used cache
Go
2
star
75

description

Describes the Juju 2.x and 3.x serialization format of a model
Go
2
star
76

charmrepo

Charm repositories and charmstore client packages
Go
2
star
77

webbrowser

Go helpers for interacting with Web browsers.
Go
2
star
78

fake-juju

A juju binary using the dummy provider for integration test purposes.
Go
2
star
79

juju-qa-jenkins

Jenkins configuration for Juju CI
Python
2
star
80

termserver

LXD image builder for the jujushell service
Makefile
1
star
81

rfc

Go implementations of various standards, particularly IETF RFCs.
Go
1
star
82

juju-bundlelib

A Python library for working with Juju bundles.
Python
1
star
83

idmclient

client for USSO to macaroons bridge server
Go
1
star
84

mgomonitor

prometheus stats for gopkg.in/mgo.v2
Go
1
star
85

jknife

jknife are juju db surgery tools - this should only be used with direction of a Juju engineer
Go
1
star
86

jasp

CSS
1
star
87

jujuapidoc

Generate information on the Juju API
Go
1
star
88

juju-controller

A Juju controller charm
Python
1
star
89

jaaslibjs

JavaScript library for interacting with the JAAS services
JavaScript
1
star
90

http

Juju wrapper for the standard go HTTP library.
Go
1
star
91

naturalsort

Sort strings according to natural sort order.
Go
1
star
92

charm-developer-docs

Documenting how to write a Juju charm
Shell
1
star
93

lxc

Fork of lxd/lxc to add Juju specific tweaks
Go
1
star
94

romulus

Go
1
star
95

usso

Go
1
star
96

juju-process-docker

a plugin to allow juju to interface with docker
Go
1
star
97

charm-base-images

Shell
1
star
98

proxy

A golang type for grouping information about proxy variables.
Go
1
star
99

jaas-monitor

Monitor all your jaas models (prototype)
Shell
1
star
100

os

Host OS and series abstractions for Go.
Go
1
star