  • Language
    Java
  • License
    Apache License 2.0
  • Created over 4 years ago
  • Updated 2 months ago


Repository Details

MQTT on Pulsar implemented using Pulsar Protocol Handler

MQTT on Pulsar (MoP)

MQTT-on-Pulsar (aka MoP) supports the MQTT protocol natively on Apache Pulsar.

Get started

Download or build MoP protocol handler

  1. Clone the MoP project from GitHub to your local machine.

    git clone https://github.com/streamnative/mop.git
    cd mop
  2. Build the project.

    mvn clean install -DskipTests
  3. The NAR file can be found at this location.

    ./mqtt-impl/target/pulsar-protocol-handler-mqtt-${version}.nar

Install MoP protocol handler

Configure the Pulsar broker to run the MoP protocol handler as a plugin by adding configurations to the Pulsar configuration file, such as broker.conf or standalone.conf.

  1. Set the configuration of the MoP protocol handler.

    Add the following properties to the Pulsar configuration file (conf/broker.conf or conf/standalone.conf) and set their values.

    Property                 | Suggested value          | Default value
    messagingProtocols       | mqtt                     | null
    protocolHandlerDirectory | Location of MoP NAR file | ./protocols

    Example

    messagingProtocols=mqtt
    protocolHandlerDirectory=./protocols
    
  2. Set the MQTT server listeners.

    Example

    mqttListeners=mqtt://127.0.0.1:1883
    advertisedAddress=127.0.0.1
    

    Note

    The default value of advertisedAddress is InetAddress.getLocalHost().getHostName(). If you configure it, keep it the same as the Pulsar broker's advertisedAddress.

Load MoP protocol handler

After you install the MoP protocol handler on the Pulsar brokers, restart the brokers to load it.

How to use Proxy

To use the proxy, complete the following steps. For detailed steps, refer to Deploy a cluster on bare metal.

  1. Prepare a ZooKeeper cluster.

  2. Initialize the cluster metadata.

  3. Prepare a BookKeeper cluster.

  4. Copy the pulsar-protocol-handler-mqtt-${version}.nar to the $PULSAR_HOME/protocols directory.

  5. Start the Pulsar broker.

    Here is an example of the Pulsar broker configuration.

    messagingProtocols=mqtt
    protocolHandlerDirectory=./protocols
    brokerServicePort=6651
    mqttListeners=mqtt://127.0.0.1:1883
    advertisedAddress=127.0.0.1
    
    mqttProxyEnabled=true
    mqttProxyPort=5682

Verify MoP protocol handler

Many MQTT clients can be used to verify the MoP protocol handler, such as MQTTBox and MQTT Toolbox. You can choose either a CLI tool or a GUI tool.

The following example shows how to verify the MoP protocol handler with FuseSource MqttClient.

  1. Add the dependency.

    <dependency>
        <groupId>org.fusesource.mqtt-client</groupId>
        <artifactId>mqtt-client</artifactId>
        <version>1.16</version>
    </dependency>
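  2. Publish and consume messages.

    import org.fusesource.mqtt.client.BlockingConnection;
    import org.fusesource.mqtt.client.MQTT;
    import org.fusesource.mqtt.client.Message;
    import org.fusesource.mqtt.client.QoS;
    import org.fusesource.mqtt.client.Topic;
    
    // connect to the MoP listener configured in mqttListeners
    MQTT mqtt = new MQTT();
    mqtt.setHost("127.0.0.1", 1883);
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    
    // subscribe before publishing so that this client receives the message
    Topic[] topics = { new Topic("persistent://public/default/my-topic", QoS.AT_LEAST_ONCE) };
    connection.subscribe(topics);
    
    // publish message
    connection.publish("persistent://public/default/my-topic", "Hello MOP!".getBytes(), QoS.AT_LEAST_ONCE, false);
    
    // receive message and acknowledge it
    Message received = connection.receive();
    received.ack();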

Security

Enabling Authentication

MoP currently supports the basic and token authentication methods. The token authentication method works with any of the token-based Pulsar authentication providers, such as the built-in JWT provider and external token authentication providers like biscuit-pulsar.

To use authentication for MQTT connections, your Pulsar cluster must already have authentication enabled, with your chosen authentication provider(s) configured.

You can then enable MQTT authentication with the following configuration properties:

mqttAuthenticationEnabled=true
mqttAuthenticationMethods=token

mqttAuthenticationMethods can be set to a comma-delimited list if you wish to enable multiple authentication providers. MoP attempts each in order when authenticating client connections.

With authentication enabled, MoP currently does not allow anonymous connections.

Authenticating client connections

Basic Authentication

Set the MQTT username and password client settings.

Token Authentication

Set the MQTT password to the token body. The username is currently disregarded but MUST be set to some value, as this is required by the MQTT specification.

Enabling Authorization

MoP currently supports authorization. When authorization is enabled, MoP checks whether the authenticated role is allowed to publish to or subscribe to a topic: sending messages requires the produce permission on the topic, and subscribing requires the consume permission on the topic. You can refer here to grant permissions.

You can then enable MQTT authorization with the following configuration properties:

mqttAuthorizationEnabled=true

If the MoP proxy is enabled, also set the following properties. The credentials in brokerClientAuthenticationParameters must have at least the lookup permission:

brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationBasic
brokerClientAuthenticationParameters={"userId":"superUser","password":"superPass"}

Enabling TLS

MoP currently supports TLS transport encryption.

Generate the certificate and key files:

openssl genrsa 2048 > server.key
chmod 400 server.key
openssl req -new -x509 -nodes -sha256 -days 365 -key server.key -out server.crt

TLS with broker

  1. Configure the MQTT broker to load the TLS configuration.

    mqttListeners=mqtt+ssl://127.0.0.1:8883
    mqttTlsCertificateFilePath=/xxx/server.crt
    mqttTlsKeyFilePath=/xxx/server.key
    
  2. Configure the client to load the TLS configuration.

    MQTT mqtt = new MQTT();
    // default tls port
    mqtt.setHost(URI.create("ssl://127.0.0.1:8883")); 
    File crtFile = new File("server.crt");
    Certificate certificate = CertificateFactory.getInstance("X.509").generateCertificate(new FileInputStream(crtFile));
    KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
    keyStore.load(null, null);
    keyStore.setCertificateEntry("server", certificate);
    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    trustManagerFactory.init(keyStore);
    SSLContext sslContext = SSLContext.getInstance("TLS");
    sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
    mqtt.setSslContext(sslContext);
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();

TLS with proxy

  1. Configure the MQTT broker to enable the proxy and load the TLS configuration.

    mqttProxyEnable=true
    mqttProxyTlsEnabled=true
    mqttTlsCertificateFilePath=/xxx/server.crt
    mqttTlsKeyFilePath=/xxx/server.key
    
  2. Configure the client to load the TLS configuration.

    MQTT mqtt = new MQTT();
    // default proxy tls port
    mqtt.setHost(URI.create("ssl://127.0.0.1:5683")); 
    File crtFile = new File("server.crt");
    Certificate certificate = CertificateFactory.getInstance("X.509").generateCertificate(new FileInputStream(crtFile));
    KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
    keyStore.load(null, null);
    keyStore.setCertificateEntry("server", certificate);
    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    trustManagerFactory.init(keyStore);
    SSLContext sslContext = SSLContext.getInstance("TLS");
    sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
    mqtt.setSslContext(sslContext);
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();

TLS PSK with broker

Please refer here to learn more about TLS-PSK.

  1. Configure the MQTT broker to load the TLS-PSK configuration.

    mqttTlsPskEnabled=true
    mqttListeners=mqtt+ssl+psk://127.0.0.1:8884
    # any string can be specified
    mqttTlsPskIdentityHint=alpha
    # identities are a semicolon-separated list of strings in identity:secret format
    mqttTlsPskIdentity=mqtt:mqtt123
    

    Optional configs

    Config key             | Comment
    mqttTlsPskIdentityFile | Set this when you want to keep many identity pairs in a single file. Identities are loaded from both mqttTlsPskIdentity and mqttTlsPskIdentityFile.
    mqttTlsProtocols       | TLS PSK protocols. The defaults are [ TLSv1, TLSv1.1, TLSv1.2 ].
    mqttTlsCiphers         | TLS PSK ciphers. The defaults are [ TLS_ECDHE_PSK_WITH_CHACHA20_POLY1305_SHA256, TLS_ECDHE_PSK_WITH_AES_128_CBC_SHA, TLS_ECDHE_PSK_WITH_AES_256_CBC_SHA, TLS_PSK_WITH_AES_128_CBC_SHA, TLS_PSK_WITH_AES_256_CBC_SHA ].
  2. Because no currently known MQTT Java client supports TLS-PSK, verify the setup with the mosquitto CLI.

    # Default with tlsv1.2
    mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 8884 -t "/a/b/c" -m "hello mqtt"
    
    # Test with tlsv1.1
    mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 8884 -t "/a/b/c" -m "hello mqtt" --tls-version tlsv1.1
    
    # Test with tlsv1
    mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 8884 -t "/a/b/c" -m "hello mqtt" --tls-version tlsv1
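
The --psk value in the commands above is the hexadecimal form of the secret mqtt123 from mqttTlsPskIdentity. As a minimal sketch, assuming the secret is interpreted as its UTF-8 bytes (an inference from the example values, not something the configuration states), the following Java helper derives the mosquitto arguments from an identity list. The class PskIdentities and its methods are hypothetical and not part of MoP.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of MoP.
class PskIdentities {

    // Parses "id1:secret1;id2:secret2" into an ordered identity -> secret map.
    static Map<String, String> parse(String identities) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : identities.split(";")) {
            int colon = pair.indexOf(':');
            if (colon <= 0) {
                continue; // skip empty or malformed entries
            }
            result.put(pair.substring(0, colon), pair.substring(colon + 1));
        }
        return result;
    }

    // Hex-encodes the UTF-8 bytes of a secret (assumed encoding),
    // which is the form that mosquitto's --psk option expects.
    static String toHex(String secret) {
        StringBuilder hex = new StringBuilder();
        for (byte b : secret.getBytes(StandardCharsets.UTF_8)) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> e : parse("mqtt:mqtt123").entrySet()) {
            // prints: --psk-identity mqtt --psk 6d717474313233
            System.out.println("--psk-identity " + e.getKey() + " --psk " + toHex(e.getValue()));
        }
    }
}
```

The same helper can be used to prepare the arguments for any additional identity that you configure.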
    

TLS PSK with proxy

  1. Configure the MQTT proxy to load the TLS-PSK configuration.

    mqttProxyEnable=true
    mqttProxyTlsPskEnabled=true
    # default tls psk port
    mqttProxyTlsPskPort=5684
    # any string can be specified
    mqttTlsPskIdentityHint=alpha
    # identities are a semicolon-separated list of strings in identity:secret format
    mqttTlsPskIdentity=mqtt:mqtt123
    
  2. Test with the mosquitto CLI.

    mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 5684 -t "/a/b/c" -m "hello mqtt"
    
  3. Add PSK identities dynamically.

    In proxy mode, you can add PSK identities dynamically through the REST API.

    curl -X POST http://pulsar-broker-webservice-address:port/mop/add_psk_identity -d "identity=mqtt2:mqtt222;mqtt3:mqtt333"
    

Topic Names & Filters

In Apache Pulsar, a topic name consists of 4 parts:

<domain>://<tenant>/<namespace>/<local-name>

The / character is not allowed in the local topic name, whereas an MQTT topic name can have multiple levels, such as:

/a/b/c/d/e/f

MoP maps the MQTT topic name to the Pulsar topic name as follows:

  1. If the MQTT topic name does not start with the topic domain, MoP uses the URL-encoded MQTT topic name as the Pulsar local topic name, under the default tenant and the default namespace.
  2. If the MQTT topic name starts with the topic domain, MoP treats the first topic level as the tenant and the second topic level as the namespace, and converts the remaining topic levels into the local topic name by URL-encoding them.

Examples:

MQTT topic name                        | Apache Pulsar topic name
/a/b/c                                 | persistent://public/default/%2Fa%2Fb%2Fc
a                                      | persistent://public/default/a
persistent://my-tenant/my-ns/a/b/c     | persistent://my-tenant/my-ns/a%2Fb%2Fc
persistent://my-tenant/my-ns/a         | persistent://my-tenant/my-ns/a
non-persistent://my-tenant/my-ns/a     | non-persistent://my-tenant/my-ns/a
non-persistent://my-tenant/my-ns/a/b/c | non-persistent://my-tenant/my-ns/a%2Fb%2Fc

So, to consume messages from the MQTT topic /a/b/c with a Pulsar client, use persistent://public/default/%2Fa%2Fb%2Fc as the topic name for the Pulsar consumer. To consume messages from a Pulsar topic with an MQTT client, use the Pulsar topic name directly as the MQTT topic name.
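
The mapping rules and the examples above can be sketched in Java as follows. This is an illustration under stated assumptions, not MoP's actual implementation: the class TopicNameMapper and its methods are hypothetical, the default tenant and namespace are hard-coded to public and default, and java.net.URLEncoder stands in for whatever encoder MoP uses (it performs form-style encoding, so a space, for example, becomes +).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical illustration of the mapping rules; not MoP's implementation.
class TopicNameMapper {

    static final String DEFAULT_TENANT = "public";     // assumed default
    static final String DEFAULT_NAMESPACE = "default"; // assumed default

    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    static String toPulsarTopic(String mqttTopic) {
        String domain;
        if (mqttTopic.startsWith("persistent://")) {
            domain = "persistent";
        } else if (mqttTopic.startsWith("non-persistent://")) {
            domain = "non-persistent";
        } else {
            // Rule 1: no topic domain, so the whole name becomes the local name.
            return "persistent://" + DEFAULT_TENANT + "/" + DEFAULT_NAMESPACE + "/" + encode(mqttTopic);
        }
        // Rule 2: first level is the tenant, second is the namespace,
        // and everything that remains is the local name.
        String rest = mqttTopic.substring(domain.length() + "://".length());
        String[] parts = rest.split("/", 3);
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected <domain>://<tenant>/<namespace>/<local-name>: " + mqttTopic);
        }
        return domain + "://" + parts[0] + "/" + parts[1] + "/" + encode(parts[2]);
    }

    public static void main(String[] args) {
        // prints: persistent://public/default/%2Fa%2Fb%2Fc
        System.out.println(toPulsarTopic("/a/b/c"));
        // prints: persistent://my-tenant/my-ns/a%2Fb%2Fc
        System.out.println(toPulsarTopic("persistent://my-tenant/my-ns/a/b/c"));
    }
}
```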

MoP supports the single-level wildcard + and the multi-level wildcard # in topic filters. Topic filters follow the same topic name mapping rules as above.

  1. If the topic filter starts with the topic domain, MoP only matches topics under the namespace given in the topic filter.
  2. If the topic filter does not start with the topic domain, MoP only matches topics under the default namespace.

Examples:

MQTT topic name                   | Topic filter                               | Is match
/a/b/c                            | /a/+/c                                     | Yes
/a/b/c                            | /a/#                                       | Yes
/a/b/c                            | a/#                                        | No
/a/b/c                            | persistent://my-tenant/my-namespace//a/#   | No
/a/b/c                            | persistent://public/default//a/#           | Yes
persistent://public/default/a/b/c | persistent://public/default/a/#            | Yes
persistent://public/default/a/b/c | persistent://public/default/a/+/c          | Yes
persistent://public/default/a/b/c | persistent://public/default//a/+/c         | No
persistent://public/default/a/b/c | persistent://my-tenant/my-namespace/a/+/c  | No
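
The filter rules and the examples above can be reproduced with a short Java sketch. It is an illustration only, under these assumptions: the class TopicFilterMatcher is hypothetical and not MoP's implementation, the default tenant and namespace are public and default, a name without a topic domain is treated as persistent, and the domain, tenant, and namespace of the topic and the filter must be equal before the local names are compared level by level.

```java
// Hypothetical illustration of the filter rules; not MoP's implementation.
class TopicFilterMatcher {

    // Splits a topic name or filter into {domain, tenant, namespace, localName},
    // applying the assumed defaults when the name has no topic domain.
    static String[] parse(String name) {
        boolean hasDomain = name.startsWith("persistent://") || name.startsWith("non-persistent://");
        if (!hasDomain) {
            return new String[] {"persistent", "public", "default", name};
        }
        int sep = name.indexOf("://");
        String[] parts = name.substring(sep + 3).split("/", 3);
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected <domain>://<tenant>/<namespace>/<local-name>: " + name);
        }
        return new String[] {name.substring(0, sep), parts[0], parts[1], parts[2]};
    }

    // Compares topic levels against filter levels with MQTT wildcard semantics.
    static boolean matchLevels(String[] topic, String[] filter) {
        for (int i = 0; i < filter.length; i++) {
            if (filter[i].equals("#")) {
                return true; // multi-level wildcard matches all remaining levels
            }
            if (i >= topic.length) {
                return false; // filter has more levels than the topic
            }
            if (!filter[i].equals("+") && !filter[i].equals(topic[i])) {
                return false; // literal level differs
            }
        }
        return topic.length == filter.length;
    }

    static boolean matches(String topic, String filter) {
        String[] t = parse(topic);
        String[] f = parse(filter);
        for (int i = 0; i < 3; i++) {
            if (!t[i].equals(f[i])) {
                return false; // different domain, tenant, or namespace
            }
        }
        // A limit of -1 keeps empty levels, so a leading "/" stays significant.
        return matchLevels(t[3].split("/", -1), f[3].split("/", -1));
    }

    public static void main(String[] args) {
        System.out.println(matches("/a/b/c", "/a/+/c")); // prints: true
        System.out.println(matches("/a/b/c", "a/#"));    // prints: false
    }
}
```

Keeping the empty level produced by a leading / is what makes /a/b/c and a/b/c distinct topics in this sketch, which is consistent with the No rows in the examples.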

Notice:

The default tenant and the default namespace for MoP are configurable. By default, the default tenant is public and the default namespace is default.

Metrics

MoP exports the following metrics to Prometheus.

Name                     | Type    | Description
mop_active_client_count  | Gauge   | The active client count
mop_total_client_count   | Counter | The total client count
mop_maximum_client_count | Counter | The maximum client count
mop_sub_count            | Gauge   | The subscription count
mop_send_count           | Counter | The total send msg count
mop_send_bytes           | Counter | The total send msg in bytes
mop_received_count       | Counter | The total received msg count
mop_received_bytes       | Counter | The total received msg in bytes

MoP can also expose metrics through the HTTP interface. Add the following configurations and then restart the Pulsar broker.

additionalServlets=mqtt-servlet
additionalServletDirectory=[protocolHandlerDir]

Then you can obtain MoP information in JSON format through /mop/stats:

curl http://pulsar-broker-webservice-address:port/mop/stats
{"cluster":"test","subscriptions":{"subs":["/a/b/c"],"count":1},"clients":{"total":1,"maximum":1,"active":0,"active_clients":[]},"namespace":"default","messages":{"received_bytes":57351,"received_count":10,"send_count":20,"send_bytes":60235},"version":"2.9.0-SNAPSHOT","tenant":"public","uptime":"46 seconds"}

MoP available configurations

Please refer here
