pulsar-spark

Spark Connector to read and write with Pulsar.

Unified data processing with Apache Pulsar and Apache Spark.

Prerequisites

  • Java 8 or later
  • Spark 3.2.2 or later
  • Pulsar 2.10.2 or later

Preparations

Link

Client library

For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:

    groupId = io.streamnative.connectors
    artifactId = pulsar-spark-connector_{{SCALA_BINARY_VERSION}}
    version = {{PULSAR_SPARK_VERSION}}
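
For example, a minimal build.sbt sketch (an illustration only; substitute the version placeholder with the connector release that matches your Spark and Scala versions):

// build.sbt -- a minimal sketch; %% appends the Scala binary version to the
// artifact name, matching pulsar-spark-connector_{{SCALA_BINARY_VERSION}}.
// Replace the version placeholder with the release you actually use.
libraryDependencies += "io.streamnative.connectors" %% "pulsar-spark-connector" % "<PULSAR_SPARK_VERSION>"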

Deploy

Client library

As with any Spark application, spark-submit is used to launch your application.
pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies can be added to spark-submit directly using --packages.

Example

$ ./bin/spark-submit \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  ...

CLI

For experimenting in spark-shell (or pyspark for Python), you can also use --packages to add pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies directly.

Example

$ ./bin/spark-shell \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  ...

When locating an artifact or library, the --packages option checks the following repositories in order:

  1. Local Maven repository

  2. Maven Central repository

  3. Other repositories specified by --repositories

The format for the coordinates should be groupId:artifactId:version.

For more information about submitting applications with external dependencies, see Application Submission Guide.

Usage

Read data from Pulsar

Create a Pulsar source for streaming queries

The following examples are in Scala.

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topics", "topic1,topic2")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a topic pattern
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topicsPattern", "topic.*")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Tip

For more information on how to use other language bindings for Spark Structured Streaming, see Structured Streaming Programming Guide.

Create a Pulsar source for batch queries

If you have a use case that is better suited to batch processing, you can create a Dataset/DataFrame for a defined range of offsets.

The following examples are in Scala.

// Subscribe to 1 topic, defaulting to the earliest and latest offsets
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Pulsar offsets
import org.apache.spark.sql.pulsar.JsonUtils._
val startingOffsets = topicOffsets(Map("topic1" -> messageId1, "topic2" -> messageId2))
val endingOffsets = topicOffsets(...)
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("endingOffsets", endingOffsets)
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topicsPattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Write data to Pulsar

The DataFrame written to Pulsar can have an arbitrary schema. Each record of the DataFrame is transformed into one Pulsar message, and its fields are divided into two groups: the __key and __eventTime fields are encoded as metadata of the Pulsar message, while the remaining fields are grouped together, encoded using AVRO, and put in value():

producer.newMessage().key(__key).value(avro_encoded_fields).eventTime(__eventTime)
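
For example, a minimal sketch of shaping a DataFrame before writing it, assuming hypothetical source columns deviceId, observedAt, temperature, and humidity:

import org.apache.spark.sql.functions.col

// __key and __eventTime become Pulsar message metadata; the remaining
// columns (temperature, humidity) are AVRO-encoded into the message value.
val toPulsar = df
  .withColumn("__key", col("deviceId").cast("string"))
  .withColumn("__eventTime", col("observedAt").cast("timestamp"))
  .select("__key", "__eventTime", "temperature", "humidity")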

Create a Pulsar sink for streaming queries

The following examples are in Scala.

// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
val ds = df
  .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Pulsar using a topic specified in the data
val ds = df
  .selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .start()

Write the output of batch queries to Pulsar

The following examples are in Scala.

// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Pulsar using a topic specified in the data
df.selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .save()

Limitations

Currently, the connector provides at-least-once semantics. Consequently, when writing either streaming or batch queries to Pulsar, some records may be duplicated. A possible way to remove duplicates when reading the written data back is to introduce a primary (unique) key that can be used for de-duplication, as sketched below.
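
A minimal sketch of that approach, assuming a hypothetical unique uuid column was included in the payload when writing:

// Read the written data back and drop duplicates introduced by
// at-least-once delivery, keyed on the hypothetical uuid column.
val deduped = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("topic", "topic1")
  .load()
  .dropDuplicates("uuid")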

Configurations

Each option is listed below with its expected value, whether it is required, its default, the query types it applies to, and a description.

`service.url`
  Value: the Pulsar `serviceUrl` string
  Required: yes | Default: none | Query type: streaming and batch
  The Pulsar `serviceUrl` configuration for the Pulsar service. Example: "pulsar://localhost:6650".

`admin.url`
  Value: the HTTP service URL of your Pulsar cluster
  Required: no | Default: none | Query type: streaming and batch
  The Pulsar `serviceHttpUrl` configuration. Only needed when `maxBytesPerTrigger` is specified.

`maxBytesPerTrigger`
  Value: a long value, in bytes
  Required: no | Default: none | Query type: streaming and batch
  A soft limit on the maximum number of bytes to process per micro-batch. If this is specified, `admin.url` must also be specified.

`predefinedSubscription`
  Value: a subscription name string
  Required: no | Default: none | Query type: streaming and batch
  The predefined subscription name used by the connector to track the progress of the Spark application.

`subscriptionPrefix`
  Value: a subscription prefix string
  Required: no | Default: none | Query type: streaming and batch
  A prefix used by the connector to generate a random subscription for tracking the progress of the Spark application.

`topic`
  Value: a topic name string
  Required: yes | Default: none | Query type: streaming and batch
  The topic to be consumed. Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for a Pulsar source.

`topics`
  Value: a comma-separated list of topics
  Required: yes | Default: none | Query type: streaming and batch
  The topic list to be consumed. Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for a Pulsar source.

`topicsPattern`
  Value: a Java regex string
  Required: yes | Default: none | Query type: streaming and batch
  The pattern used to subscribe to topic(s). Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for a Pulsar source.

`poolTimeoutMs`
  Value: a number string, in milliseconds
  Required: no | Default: "120000" | Query type: streaming and batch
  The timeout for reading messages from Pulsar. Example: `6000`.

`waitingForNonExistedTopic`
  Value: true or false
  Required: no | Default: "false" | Query type: streaming and batch
  Whether the connector should wait until the desired topics are created. By default, the connector does not wait for the topic.

`startingOffsets`
  Value: one of the following:
    • "earliest" (streaming and batch queries)
    • "latest" (streaming query)
    • a JSON string, for example:
      """ {"topic-1":[8,11,16,101,24,1,32,1],"topic-5":[8,15,16,105,24,5,32,5]} """
  Required: no | Default: "earliest" (batch query), "latest" (streaming query) | Query type: streaming and batch
  The startingOffsets option controls where a reader starts reading data.
    • "earliest": without a valid offset, the reader reads all the data in the partition, starting from the very beginning.
    • "latest": without a valid offset, the reader reads only the records written after the reader starts running.
    • A JSON string: specifies a starting offset for each topic. You can use org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, MessageId]) to convert message offsets to a JSON string.
  Note:
    • For a batch query, "latest" is not allowed, whether specified implicitly or by using MessageId.latest ([8,-1,-1,-1,-1,-1,-1,-1,-1,127,16,-1,-1,-1,-1,-1,-1,-1,-1,127]) in JSON.
    • For a streaming query, "latest" only applies when a new query is started; resuming always picks up from where the query left off. Partitions newly discovered during a query start at "earliest".

`endingOffsets`
  Value: one of the following:
    • "latest" (batch query)
    • a JSON string, for example:
      {"topic-1":[8,12,16,102,24,2,32,2],"topic-5":[8,16,16,106,24,6,32,6]}
  Required: no | Default: "latest" | Query type: batch
  The endingOffsets option controls where a reader stops reading data.
    • "latest": the reader stops reading at the latest record.
    • A JSON string: specifies an ending offset for each topic.
      Note: MessageId.earliest ([8,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,16,-1,-1,-1,-1,-1,-1,-1,-1,-1,1]) is not allowed.

`failOnDataLoss`
  Value: true or false
  Required: no | Default: true | Query type: streaming
  The failOnDataLoss option controls whether to fail a query when data is lost (for example, topics are deleted or messages are deleted because of the retention policy). This may cause false alarms; you can set it to false when it does not work as expected. A batch query always fails if it cannot read any data from the provided offsets due to data loss.

`allowDifferentTopicSchemas`
  Value: a boolean
  Required: no | Default: `false` | Query type: streaming
  If multiple topics with different schemas are read, this parameter turns off automatic schema-based deserialization of topic values. That way, topics with different schemas can be read in the same pipeline, which is then responsible for deserializing the raw values based on some schema. Since only the raw values are returned when this is `true`, Pulsar topic schemas are not taken into account during operation.

`pulsar.client.*`
  Value: Pulsar client configurations
  Required: no | Default: none | Query type: streaming and batch
  Client configurations. Example: "pulsar.client.authPluginClassName". See Pulsar Client Configuration for more details.

`pulsar.admin.*`
  Value: Pulsar admin configurations
  Required: no | Default: none | Query type: streaming and batch
  Admin configurations. Example: "pulsar.admin.tlsAllowInsecureConnection". See Pulsar Admin Configuration for more details.

`pulsar.reader.*`
  Value: Pulsar reader configurations
  Required: no | Default: none | Query type: streaming and batch
  Reader configurations. Example: "pulsar.reader.subscriptionName". See Pulsar Reader Configuration for more details.

`pulsar.producer.*`
  Value: Pulsar producer configurations
  Required: no | Default: none | Query type: streaming and batch
  Producer configurations. Example: "pulsar.producer.blockIfQueueFull". See Pulsar Producer Configuration for more details.

Authentication

Should the Pulsar cluster require authentication, credentials can be set in the following way.

The following examples are in Scala.

// Secure connection with authentication, using the same credentials on the
// Pulsar client and admin interface (if not given explicitly, the client configuration
// is used for admin as well).
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.client.authParams","token:<valid client JWT token>")
  .option("topicsPattern", "sensitiveTopic")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Secure connection with client TLS enabled.
// Note that the certificate file has to be present at the specified
// path on every machine of the cluster!
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar+ssl://localhost:6651")
  .option("pulsar.admin.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.admin.authParams","token:<valid admin JWT token>")
  .option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.client.authParams","token:<valid client JWT token>")
  .option("pulsar.client.tlsTrustCertsFilePath","/path/to/tls/cert/cert.pem")
  .option("pulsar.client.tlsAllowInsecureConnection","false")
  .option("pulsar.client.tlsHostnameVerificationenable","true")
  .option("topicsPattern", "sensitiveTopic")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Schema of Pulsar source

  • For topics without schema or with a primitive schema in Pulsar, the message payload is loaded into a value column, typed according to the Pulsar schema.
  • For topics with Avro or JSON schema, their field names and field types are kept in the result rows.
  • If topicsPattern matches topics that have different schemas, setting allowDifferentTopicSchemas to true lets the connector read their content in raw form. In that case, it is the responsibility of the pipeline to apply a schema to this content, which is loaded into the value column.

In addition, each row in the source has the following metadata fields.

Column                  Type
`__key`                 Binary
`__topic`               String
`__messageId`           Binary
`__publishTime`         Timestamp
`__eventTime`           Timestamp
`__messageProperties`   Map<String, String>
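
For instance, a minimal sketch of selecting a few of these metadata columns alongside the payload (assuming a topic without schema or with a primitive schema, so the payload resolves to a single value column):

val annotated = df.select(
  "__topic",             // source topic of each message
  "__publishTime",       // broker publish timestamp
  "__messageProperties", // user-defined message properties
  "value"                // message payload
)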

Example

A Pulsar topic whose AVRO schema s is defined as follows:

  case class Foo(i: Int, f: Float, bar: Bar)
  case class Bar(b: Boolean, s: String)
  val s = Schema.AVRO(Foo.getClass)

has the following schema as a DataFrame/DataSet in Spark:

root
 |-- i: integer (nullable = false)
 |-- f: float (nullable = false)
 |-- bar: struct (nullable = true)
 |    |-- b: boolean (nullable = false)
 |    |-- s: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

For a Pulsar topic with Schema.DOUBLE, its schema as a DataFrame is:

root
 |-- value: double (nullable = false)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Build Spark Pulsar Connector

If you want to build the Spark Pulsar connector yourself, to read data from Pulsar and write results back to Pulsar, follow the steps below.

  1. Check out the source code.
$ git clone https://github.com/streamnative/pulsar-spark.git
$ cd pulsar-spark
  2. Install Docker.

The pulsar-spark connector uses Testcontainers for integration tests. To run the integration tests, make sure you have Docker installed.

  3. Set the Scala version.

Change scala.version and scala.binary.version in pom.xml.

Note

The Scala version should be consistent with the Scala version of the Spark distribution you use.

  4. Build the project.
$ mvn clean install -DskipTests

If you get the following error during compilation, try running Maven with Java 8:

[ERROR] [Error] : Source option 6 is no longer supported. Use 7 or later.
[ERROR] [Error] : Target option 6 is no longer supported. Use 7 or later.
  5. Run the tests.
$ mvn clean install

Note: by configuring scalatest-maven-plugin in the usual way, individual test suites can be executed if needed:

mvn -Dsuites=org.apache.spark.sql.pulsar.CachedPulsarClientSuite clean install

This can be handy if test execution is slow, or if you get a java.io.IOException: Too many open files exception during a full suite run.

Once the build finishes, a fat JAR is generated in both the local Maven repository and the target directory.

License

Apache License 2.0
