ABRiS - Avro Bridge for Spark

Avro SerDe for Apache Spark structured APIs.

  • Pain-free Spark/Avro integration.

  • Seamlessly integrate with the Confluent platform, including Schema Registry, with all available naming strategies and schema evolution.

  • Seamlessly convert your Avro records from anywhere (e.g. Kafka, Parquet, HDFS) into Spark Rows.

  • Convert your DataFrames into Avro records without even specifying a schema.

  • Convert to and from Spark's built-in Avro format (since Spark 2.4).

Coordinates for Maven POM dependency

Scala   Abris artifact (Maven Central)
2.11    za.co.absa:abris_2.11
2.12    za.co.absa:abris_2.12
2.13    za.co.absa:abris_2.13
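
For example, in sbt the dependency would look like this (a sketch; the version shown is an example, pick the one you need):

libraryDependencies += "za.co.absa" %% "abris" % "6.2.0"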

Supported versions

Abris           Spark           Scala
6.2.0 - 6.x.x   3.2.1 - 3.2.x   2.12 / 2.13
6.0.0 - 6.1.1   3.2.0           2.12 / 2.13
5.0.0 - 5.x.x   3.0.x / 3.1.x   2.12
5.0.0 - 5.x.x   2.4.x           2.11 / 2.12

From version 6.0.0, ABRiS only supports Spark 3.2.x.

ABRiS 5.0.x is still supported for older versions of Spark (see branch-5).

Older Versions

This is documentation for Abris version 6. Documentation for older versions is located in corresponding branches: branch-5, branch-4, branch-3.2.

Confluent Schema Registry Version

Abris by default uses Confluent client version 6.2.0.
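
If you need a different client version, you can override the dependency in your own build. A hedged sbt sketch (the version is an example; note that Confluent artifacts are hosted on Confluent's Maven repository, not Maven Central):

resolvers += "Confluent" at "https://packages.confluent.io/maven/"
dependencyOverrides += "io.confluent" % "kafka-schema-registry-client" % "6.2.1" // example version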

Usage

In its most basic form, the ABRiS API is almost identical to Spark's built-in Avro support, but it provides additional functionality: mainly Schema Registry support and seamless integration with the Confluent Avro data format.

The API consists of two Spark SQL expressions (to_avro and from_avro) and a fluent configurator (AbrisConfig).

Using the configurator you can choose from four basic config types:

  • toSimpleAvro, toConfluentAvro, fromSimpleAvro and fromConfluentAvro

And configure what you want to do, mainly how to obtain the Avro schema.

Example of usage:

val abrisConfig = AbrisConfig
  .fromConfluentAvro                     // expect Confluent-framed Avro payloads
  .downloadReaderSchemaByLatestVersion   // use the latest registered version as the reader schema
  .andTopicNameStrategy("topic123")      // subject name derived from the topic ("topic123-value")
  .usingSchemaRegistry("http://localhost:8081")

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro

val deserialized = dataFrame.select(from_avro(col("value"), abrisConfig) as 'data)
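
The to_avro direction is symmetric. A minimal sketch, assuming a column named "data" and an Avro schema string schemaString that you want registered under the same topic-name strategy:

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.to_avro

val toAvroConfig = AbrisConfig
  .toConfluentAvro
  .provideAndRegisterSchema(schemaString) // registers the schema if it is not in the registry yet
  .usingTopicNameStrategy("topic123")
  .usingSchemaRegistry("http://localhost:8081")

val serialized = dataFrame.select(to_avro(col("data"), toAvroConfig) as 'value)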

Detailed instructions for many use cases are in separate documents in the repository.

Full runnable examples can be found in the za.co.absa.abris.examples package. You can also take a look at unit tests in package za.co.absa.abris.avro.sql.

IMPORTANT: Spark dependencies have provided scope in the pom.xml, so when running the examples, please make sure that you either instruct your IDE to include dependencies with provided scope, or change the scope directly.

Confluent Avro format

The format of Avro binary data is defined in the Avro specification. The Confluent format extends it by prepending the schema id to the actual record. The Confluent expressions in this library expect this format: they add the id after the Avro data is generated and remove it before the data is parsed.
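
For illustration, a minimal sketch of reading that framing (this helper is not part of the ABRiS API):

import java.nio.ByteBuffer

// Confluent framing: [0] magic byte (always 0), [1..4] big-endian schema id, [5..] Avro payload
def readSchemaId(confluentPayload: Array[Byte]): Int = {
  require(confluentPayload(0) == 0, "not a Confluent-framed Avro payload")
  ByteBuffer.wrap(confluentPayload, 1, 4).getInt
}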

You can find more about Confluent and Schema Registry in Confluent documentation.

Schema Registry security and other additional settings

The only mandatory Schema Registry client setting is the URL, but if you need to provide more, the configurator lets you pass a whole map.

For example, you may want to provide basic.auth.user.info and basic.auth.credentials.source required for user authentication. You can do it this way:

val registryConfig = Map(
  AbrisConfig.SCHEMA_REGISTRY_URL -> "http://localhost:8081",
  "basic.auth.credentials.source" -> "USER_INFO",
  "basic.auth.user.info" -> "srkey:srvalue"
)

val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("topic123")
  .usingSchemaRegistry(registryConfig) // use the map instead of just url

Other Features

Generating Avro schema from Spark data frame column

There is a helper method that allows you to generate an Avro schema automatically from a Spark column. Assuming you have a data frame containing a column "input", you can generate a schema for the data in that column like this:

val schema = AvroSchemaUtils.toAvroSchema(dataFrame, "input")
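
The generated schema can then be used for serialisation. A sketch, assuming the toSimpleAvro configuration path and the same "input" column:

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.to_avro

val toAvroConfig = AbrisConfig
  .toSimpleAvro
  .provideSchema(schema.toString) // the schema generated above

val avroFrame = dataFrame.select(to_avro(col("input"), toAvroConfig) as 'value)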

Using schema manager to directly download or register schema

You can use SchemaManager directly to perform operations against the schema registry. The configuration is identical to the Schema Registry Client's. The SchemaManager is just a wrapper around the client, providing helpful methods and abstractions.

val schemaRegistryClientConfig = Map( ...configuration... )
val schemaManager = SchemaManagerFactory.create(schemaRegistryClientConfig)

// Downloading schema:
val schema = schemaManager.getSchemaById(42)

// Registering schema:
val schemaString = "{...avro schema json...}"
val subject = SchemaSubject.usingTopicNameStrategy("fooTopic")
val schemaId = schemaManager.register(subject, schemaString)

// and more, check SchemaManager's methods

De-serialisation Error Handling

There are two ways ABRiS handles de-serialisation errors:

FailFast (Default)

If no de-serialisation handler is provided, a failure will result in a Spark exception being thrown and the error being output. This is the default behaviour.

SpecificRecordHandler

The second option requires providing a default record that will be output in the event of a failure. This record acts as a sentinel, meaning the Spark job will not stop, and it should be filtered out downstream of ABRiS. Beware, however, that a null or empty record will also result in an error, so choose a default record that is distinguishable from your real data.

This can be provided as such:

val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("topic123")
  .usingSchemaRegistry(registryConfig)
  .withExceptionHandler(new SpecificRecordExceptionHandler(providedDefaultRecord))

This is available only for Confluent-based configurations, not for standard Avro.

Data Conversions

This library also provides convenient methods to convert between Avro and Spark schemas.

If you have an Avro schema which you want to convert into a Spark SQL one - to generate your DataFrames, for instance - you can do as follows:

val avroSchema: Schema = AvroSchemaUtils.load("path_to_avro_schema")
val sqlSchema: StructType = SparkAvroConversions.toSqlType(avroSchema) 
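
The resulting StructType can be used anywhere Spark expects a schema, for example when reading raw data (a sketch; the spark session and input path are assumptions):

val df = spark.read.schema(sqlSchema).json("/path/to/input.json")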

You can also do the inverse operation by running:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlSchema = StructType(Seq(StructField("name", StringType), StructField("value", IntegerType))) // example fields
val avroSchema = SparkAvroConversions.toAvroSchema(sqlSchema, avro_schema_name, avro_schema_namespace)

Custom data conversions

If you would like to use custom logic to convert from Avro to Spark, you can implement the SchemaConverter trait. The custom class is loaded in ABRiS using the service provider interface (SPI), so you need to register your class in your META-INF/services resource directory. You can then configure the custom class with its short name or the fully qualified name.

Example

Custom schema converter implementation

package za.co.absa.abris.avro.sql
import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType

class CustomSchemaConverter extends SchemaConverter {
  override val shortName: String = "custom"
  override def toSqlType(avroSchema: Schema): DataType = ???
}

Provider configuration file META-INF/services/za.co.absa.abris.avro.sql.SchemaConverter:

za.co.absa.abris.avro.sql.CustomSchemaConverter

Abris configuration

val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("topic123")
  .usingSchemaRegistry(registryConfig)
  .withSchemaConverter("custom")

Multiple schemas in one topic

The naming strategies RecordName and TopicRecordName allow one topic to receive different payloads, i.e. payloads containing different schemas that do not have to be compatible, as explained here.

When you read such data from Kafka it will be stored as a binary column in a dataframe, but once you convert it to Spark types it cannot stay in one dataframe, because all rows in a dataframe must have the same schema.

So if you have multiple incompatible types of Avro data in a dataframe, you must first split them into several dataframes, one for each schema. Then you can use Abris to convert the Avro data, as the sketch below shows.
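
A minimal sketch of that splitting step, assuming Confluent-framed payloads in a binary "value" column of kafkaDf, and a pre-built abrisConfigForId (these names are assumptions, not part of the ABRiS API):

import org.apache.spark.sql.functions.{col, conv, hex, substring}
import za.co.absa.abris.avro.functions.from_avro

// Confluent framing stores the schema id in the four bytes after the magic byte,
// so it can be extracted without parsing the Avro payload itself.
val withId = kafkaDf.withColumn(
  "schema_id",
  conv(hex(substring(col("value"), 2, 4)), 16, 10).cast("int")
)

// One dataframe per schema id, each converted with its own config.
val df42 = withId
  .filter(col("schema_id") === 42)
  .select(from_avro(col("value"), abrisConfigForId) as 'data)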

How to measure code coverage

./mvn clean verify -Pcode-coverage,scala-2.12
or
./mvn clean verify -Pcode-coverage,scala-2.13

Code coverage reports will be generated at the following path:

{local-path}\ABRiS\target\jacoco

Copyright 2018 ABSA Group Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
