• Stars
    star
    114
  • Rank 308,031 (Top 7 %)
  • Language
    Java
  • License
    Apache License 2.0
  • Created over 4 years ago
  • Updated about 2 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

AMQP on Pulsar protocol handler

LICENSE

AMQP on Pulsar (AoP)

AoP stands for AMQP on Pulsar. AoP broker supports AMQP0-9-1 protocol, and is backed by Pulsar.

AoP is implemented as a Pulsar ProtocolHandler with protocol name "amqp". ProtocolHandler is built as a nar file, and is loaded when Pulsar Broker starts.

Limitations

AoP is implemented based on Pulsar features. However, the methods of using Pulsar and using AMQP are different. The following are some limitations of AoP.

  • Currently, the AoP protocol handler supports AMQP0-9-1 protocol and only supports durable exchange and durable queue.
  • A Vhost is backed by a namespace which can only have one bundle. You need to create a namespace in advance for the Vhost.
  • AoP is supported on Pulsar 2.6.1 or later releases.

Get started

In this guide, you will learn how to use the Pulsar broker to serve requests from AMQP client.

Download Pulsar

Download Pulsar 2.6.1 binary package apache-pulsar-2.6.1-bin.tar.gz. and unzip it.

Download and Build AoP Plugin

You can download aop nar file from the AoP releases.

To build from code, complete the following steps:

  1. Clone the project from GitHub to your local.
git clone https://github.com/streamnative/aop.git
cd aop
  1. Build the project.
mvn clean install -DskipTests

You can find the nar file in the following directory.

./amqp-impl/target/pulsar-protocol-handler-amqp-${version}.nar

Configuration

Name Description Default
amqpTenant AMQP on Pulsar broker tenant public
amqpListeners AMQP service port amqp://127.0.0.1:5672
amqpMaxNoOfChannels The maximum number of channels which can exist concurrently on a connection 64
amqpMaxFrameSize The maximum frame size on a connection 4194304 (4MB)
amqpHeartBeat The default heartbeat timeout of AoP connection 60 (s)
amqpProxyPort The AMQP proxy service port 5682
amqpProxyEnable Whether to start proxy service false

Configure Pulsar broker to run AoP protocol handler as Plugin

As mentioned above, AoP module is loaded with Pulsar broker. You need to add configs in Pulsar's config file, such as broker.conf or standalone.conf.

  1. Protocol handler configuration

You need to add messagingProtocols(the default value is null) and protocolHandlerDirectory (the default value is "./protocols"), in Pulsar configuration files, such as broker.conf or standalone.conf. For AoP, the value for messagingProtocols is amqp; the value for protocolHandlerDirectory is the directory of AoP nar file.

The following is an example.

messagingProtocols=amqp
protocolHandlerDirectory=./protocols
  1. Set AMQP service listeners

Set AMQP service listeners. Note that the hostname value in listeners is the same as Pulsar broker's advertisedAddress.

The following is an example.

amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=127.0.0.1

Run Pulsar broker

With the above configuration, you can start your Pulsar broker. For details, refer to Pulsar Get started guides.

cd apache-pulsar-2.6.1
bin/pulsar standalone

Run AMQP Client to verify

Log level configuration

In Pulsar log4j2.yaml config file, you can set AoP log level.

The following is an example.

    Logger:
      - name: io.streamnative.pulsar.handlers.amqp
        level: debug
        additivity: false
        AppenderRef:
          - ref: Console

AoP configuration

There is also other configs that can be changed and placed into Pulsar broker config file. <!what's the "other configs"?>

Contribute

Prerequisite

If you want to make contributions to AMQP on Pulsar, follow the following steps.

  1. Install system dependency.

From version 2.11.0, the AoP need JDK 17.

Dependency Installation guide
Java 17 https://openjdk.java.net/install/
Maven https://maven.apache.org/
  1. Clone code to your machine.

    [email protected]:streamnative/aop.git
  2. Build the project.

    mvn install -DskipTests

Contribution workflow

Step 1: Fork

  1. Visit https://github.com/streamnative/aop
  2. Click Fork button (top right) to establish a cloud-based fork.

Step 2: Clone fork to local machine

Create your clone.

$ cd $working_dir
$ git clone https://github.com/$user/aop

Set your clone to track upstream repository.

$ cd $working_dir/aop
$ git remote add upstream https://github.com/streamnative/aop.git

Use the git remote -v command, you find the output looks as follows:

origin    https://github.com/$user/aop.git (fetch)
origin    https://github.com/$user/aop.git (push)
upstream  https://github.com/streamnative/aop (fetch)
upstream  https://github.com/streamnative/aop (push)

Step 3: Keep your branch in sync

Get your local master up to date.

$ cd $working_dir/aop
$ git checkout master
$ git fetch upstream
$ git rebase upstream/master
$ git push origin master 

Step 4: Create your branch

Branch from master.

$ git checkout -b myfeature

Step 5: Edit the code

You can now edit the code on the myfeature branch.

Step 6: Commit

Commit your changes.

$ git add <filename>
$ git commit -m "$add a comment"

Likely you'll go back and edit-build-test in a few cycles.

The following commands might be helpful for you.

$ git add <filename> (used to add one file)
git add -A (add all changes, including new/delete/modified files)
git add -a -m "$add a comment" (add and commit modified and deleted files)
git add -u (add modified and deleted files, not include new files)
git add . (add new and modified files, not including deleted files)

Step 7: Push

When your commit is ready for review (or just to establish an offsite backup of your work), push your branch to your fork on github.com:

$ git push origin myfeature

Step 8: Create a pull request

  1. Visit your fork at https://github.com/$user/aop (replace $user obviously).
  2. Click the Compare & pull request button next to your myfeature branch.

Step 9: Get a code review

Once you open your pull request, at least two reviewers will participate in reviewing. Those reviewers will conduct a thorough code review, looking for correctness, bugs, opportunities for improvement, documentation and comments, and style.

Commit changes made in response to review comments to the same branch on your fork.

Very small PRs are easy to review. Very large PRs are very difficult to review.

How to use Pulsar standalone

  1. Clone this project from GitHub to your local.

    git clone https://github.com/streamnative/aop.git
    cd aop
  2. Build the project.

    mvn clean install -DskipTests
  3. Copy the nar package to Pulsar protocols directory.

    cp ./amqp-impl/target/pulsar-protocol-handler-amqp-${version}.nar $PULSAR_HOME/protocols/pulsar-protocol-handler-amqp-${version}.nar
  4. Modify Pulsar standalone configuration

    # conf file: $PULSAR_HOME/conf/standalone.conf
    
    # add amqp configs
    messagingProtocols=amqp
    protocolHandlerDirectory=./protocols
    
    amqpListeners=amqp://127.0.0.1:5672
    advertisedAddress=127.0.0.1
    
  5. Start Pulsar in standalone mode.

    $PULSAR_HOME/bin/pulsar standalone
    
  6. Add namespace for vhost.

    # for example, the vhost name is `vhost1`
    bin/pulsar-admin namespaces create -b 1 public/vhost1
    # set retention for the namespace
    bin/pulsar-admin namespaces set-retention -s 100M -t 2d public/vhost1
    
  7. Use RabbitMQ client test

    # add RabbitMQ client dependency in your project
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
    
    // Java Code
    
    // create connection
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setVirtualHost("vhost1");
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    
    String exchange = "ex";
    String queue = "qu";
    
    // exchage declare
    channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);
    
    // queue declare and bind
    channel.queueDeclare(queue, true, false, false, null);
    channel.queueBind(queue, exchange, "");
    
    // publish some messages
    for (int i = 0; i < 100; i++) {
        channel.basicPublish(exchange, "", null, ("hello - " + i).getBytes());
    }
    
    // consume messages
    CountDownLatch countDownLatch = new CountDownLatch(100);
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("receive msg: " + new String(body));
            countDownLatch.countDown();
        }
    });
    countDownLatch.await();
    
    // release resource
    channel.close();
    connection.close();
    

How to use Proxy

To use proxy, complete the following steps. If you do not know some detailed steps, refer to Deploy a cluster on bare metal.

  1. Prepare ZooKeeper cluster.

  2. Initialize cluster metadata.

  3. Prepare bookkeeper cluster.

  4. Copy the pulsar-protocol-handler-amqp-${version}.nar to the $PULSAR_HOME/protocols directory.

  5. Start broker.

    broker config

    messagingProtocols=amqp
    protocolHandlerDirectory=./protocols
    brokerServicePort=6651
    amqpListeners=amqp://127.0.0.1:5672
    
    amqpProxyEnable=true
    amqpProxyPort=5682
  6. Reset the number of the namespace public/default to 1.

    $PULSAR_HOME/bin/pulsar-admin namespaces delete public/default
    $PULSAR_HOME/bin/pulsar-admin namespaces create -b 1 public/default
    $PULSAR_HOME/bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/default
  7. Prepare exchange and queue for test.

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setVirtualHost("/");
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5682);
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String ex = "ex-perf";
    String qu = "qu-perf";
    channel.exchangeDeclare(ex, BuiltinExchangeType.DIRECT, true);
    channel.queueDeclare(qu, true, false, false, null);
    channel.queueBind(qu, ex, qu);
    channel.close();
    connection.close();
    
  8. Download RabbitMQ perf tool and test.

    (https://bintray.com/rabbitmq/java-tools/download_file?file_path=perf-test%2F2.11.0%2Frabbitmq-perf-test-2.11.0-bin.tar.gz)

    $RABBITMQ_PERF_TOOL_HOME/bin/runjava com.rabbitmq.perf.PerfTest -e ex-perf -u qu-perf -r 1000 -h amqp://127.0.0.1:5682 -p

Project Maintainer

Licence

This library is licensed under the terms of the Apache License 2.0 and may include packages written by third parties which carry their own copyright notices and license terms.

About StreamNative

Founded in 2019 by the original creators of Apache Pulsar, StreamNative is one of the leading contributors to the open-source Apache Pulsar project. We have helped engineering teams worldwide make the move to Pulsar with StreamNative Cloud, a fully managed service to help teams accelerate time-to-production.

More Repositories

1

kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
Java
450
star
2

pulsar-rs

Rust Client library for Apache Pulsar
Rust
358
star
3

pulsar-flink

Elastic data processing with Apache Pulsar and Apache Flink
Java
278
star
4

function-mesh

The serverless framework purpose-built for event streaming applications.
Go
210
star
5

oxia

Oxia - Metadata store and coordination system
Go
203
star
6

mop

MQTT on Pulsar implemented using Pulsar Protocol Handler
Java
170
star
7

pulsarctl

a CLI for Apache Pulsar written in Go
Go
152
star
8

pulsar-spark

Spark Connector to read and write with Pulsar
Scala
111
star
9

tgip-cn

TGIP-CN (Thank God Its Pulsar) is a weekly live video streaming about Apache Pulsar in Chinese.
105
star
10

rop

RocketMQ-on-Pulsar - A protocol handler that brings native RocketMQ protocol to Apache Pulsar
Java
98
star
11

apache-pulsar-grafana-dashboard

Apache Pulsar Grafana Dashboard
Jinja
89
star
12

charts

StreamNative Helm Charts Repository: Apache Pulsar, Pulsar Operators, StreamNative Platform, Function Mesh
Smarty
82
star
13

awesome-pulsar

A curated list of Pulsar tools, integrations and resources.
79
star
14

pulsar-beat-output

Elastic Beats Output to Apache Pulsar
Go
55
star
15

examples

Apache Pulsar examples and demos
Java
52
star
16

terraform-provider-pulsar

Terraform provider for managing Apache Pulsar entities
Go
42
star
17

pulsar-resources-operator

Go
30
star
18

pulsar-io-lakehouse

pulsar lakehouse connector
Java
28
star
19

pulsar-io-cloud-storage

Cloud Storage Connector integrates Apache Pulsar with cloud storage.
Java
27
star
20

pulsar-io-kafka

Pulsar IO Kafka Connector
Java
24
star
21

pulsar-user-group-loc-cn

Workspace for China local user group.
22
star
22

pulsar-tracing

Tracing instrumentation for Apache Pulsar clients.
Java
21
star
23

pulsar-flume-ng-sink

An Apache Flume Sink implementation to publish data to Apache pulsar
Java
20
star
24

pulsar-hub

The canonical source of StreamNative Hub.
JavaScript
18
star
25

tgip

TGIP (TGI Pulsar) is a weekly live video streaming about Apache Pulsar and its ecosystem.
Shell
18
star
26

oxia-java

A Java client library for Oxia
Java
17
star
27

pulsar-admin-go

The Go library for pulsar admin operations, providing a unified Go API for managing pulsar resources such as tenants, namespaces and topics, etc.
Go
15
star
28

logstash-input-pulsar

Java
13
star
29

sn-platform

StreamNative Platform Downloads
12
star
30

streamnative-academy

Java
11
star
31

logstash-output-pulsar

Logstash output plugin for pulsar
Java
11
star
32

terraform-helm-charts

HCL
11
star
33

pulsar-recipes

A StreamNative library containing a collection of recipes that are implemented on top of the Pulsar client to provide higher-level functionality closer to the application domain.
Java
11
star
34

pulsar_weekly

Pulsar weekly community update
10
star
35

pulumi-controller-runtime

A prototype of a Kubernetes controller based on Pulumi.
Go
9
star
36

flink-example

Flink Pulsar Integration Related Examples
Java
9
star
37

function-mesh-worker-service

Java
7
star
38

pulsar-io-amqp-1-0

support sink/source for AMQP version 1.0.0
Java
7
star
39

pulsar-io-template

It is a project template for developing an Apache Pulsar connector
Java
7
star
40

pulsar-io-aws-lambda

Java
7
star
41

pulsar-delayed-message

Java
6
star
42

pulsar-io-pulsar-connector

Java
6
star
43

terraform-aws-cloud

Terraform modules for provisioning StreamNative Cloud on aws cloud
HCL
6
star
44

pulsar-io-activemq

ActiveMQ Connector integrates Apache Pulsar with Apache ActiveMQ.
Java
6
star
45

pulsar-io-huawei-function-graph-connector

Java
5
star
46

private-cloud

StreamNative Private Cloud is an enterprise product which brings specific controllers for Kubernetes by providing specific Custom Resource Definitions (CRDs).
Shell
5
star
47

pulsar-io-huawei-dis

Pulsar IO connector for huawei DIS https://www.huaweicloud.com/en-us/product/dis.html
Java
5
star
48

homebrew-streamnative

StreamNative Homebrew Formulae
Ruby
5
star
49

pulsar-datadog

Apache Pulsar and Datadog integration.
5
star
50

pulsar-io-sqs

Java
5
star
51

pulsar-flink-patterns

Java
5
star
52

pulsar-io-google-pubsub

Java
5
star
53

psat_exercise_code

pulsar summit asia workshop execise code
Java
5
star
54

terraform-aws-managed-cloud

This repo contains terraform scripts that can be used to provision resources needed for StreamNative Managed Cloud
HCL
4
star
55

community

StreamNative / Apache Pulsar ecosystem community
4
star
56

pulsar-tutorials

Shell
4
star
57

pulsar-io-iotdb

Data sink connector for IoTDB(https://github.com/apache/iotdb)
Java
4
star
58

pulsar-io-bigquery

BigQuery Connector integrates Apache Pulsar with Google BigQuery.
Java
3
star
59

function-mesh-website

Website for https://functionmesh.io/
JavaScript
3
star
60

pulsar-io-http-connector

Java
3
star
61

terraform-managed-cloud

StreamNative Managed Cloud Vendor Access
HCL
2
star
62

pulsar-io-huawei-lts-connector

Java
2
star
63

pulsar-io-snowflakedb

IO Connector for Snowflakedb
Java
2
star
64

sn-demos

Java
1
star
65

terraform-provider-streamnative

Terraform Provider for StreamNative
Go
1
star
66

pulsar-message-filter

1
star
67

pulsar-flink-state-migrate

pulsar flink state migrate tools
Java
1
star
68

skywalking-pulsar-demo

The demo program for Apache SkyWalking and Apache Pulsar integration
Java
1
star
69

kafka-migration

Collection of examples of Kafka apps using KoP
Java
1
star
70

snp-cn

The repo for StreamNative Platform Document Chinese Version.
1
star
71

terraform-google-cloud

Terraform modules for provisioning StreamNative Cloud on google cloud
HCL
1
star
72

pulsar-io-huawei-obs-connector

Java
1
star