• This repository has been archived on 12/Feb/2022
  • Stars: 148
  • Rank: 249,983 (Top 5%)
  • Language: Java
  • License: Apache License 2.0
  • Created almost 6 years ago
  • Updated almost 3 years ago


Repository Details

[Deprecated] Load data from Kafka to any data warehouse. The BQ sink is now supported in Firehose: https://github.com/odpf/firehose

Beast is deprecated and no longer supported. The BigQuery sink is now supported in Firehose, and we recommend using Firehose for sinking Kafka data to BigQuery.

Architecture

  • Consumer: Consumes messages from Kafka in batches and pushes these batches to the Read and Commit queues. These are blocking queues, i.e., no more messages are consumed when a queue is full (this is configurable via the poll timeout).
  • BigQuery Worker: Polls message batches from the read queue and pushes them to BigQuery. If the push succeeds, the BQ worker sends an acknowledgement to the Committer.
  • Committer: Receives acknowledgements of successful pushes to BigQuery from the BQ workers and stores them in a set. The Committer polls the commit queue for message batches; if a batch is present in the set, i.e., it has been successfully pushed to BQ, the Committer commits the max offset for that batch back to Kafka and removes it from the commit queue and the set. (A minimal sketch of this handoff is shown after the Dead Letters note below.)



  • Dead Letters: Beast provides a pluggable GCS (Google Cloud Storage) component to store invalid, out-of-bounds messages that are rejected by BigQuery. Primarily, messages partitioned on a timestamp field whose partition-key timestamps are out of range (data about a year old or more than 6 months in the future) are considered invalid. Without a handler for these messages, Beast stops processing; stopping on such out-of-range data is the default behaviour. The GCS component can be turned on by supplying the environment variables below.
    ENABLE_GCS_ERROR_SINK=true
    GCS_BUCKET=<google cloud store bucket name>
    GCS_PATH_PREFIX=<prefix path under the bucket>
    GCS_WRITER_PROJECT_NAME=<google project having bucket>
    
    The handler partitions the invalid messages on GCS based on the message arrival date in the format <dt=yyyy-MM-dd>. The location of invalid messages on GCS would ideally be <GCS_WRITER_PROJECT_NAME>/<GCS_BUCKET>/<GCS_PATH_PREFIX>/<dt=yyyy-MM-dd>/<topicName>/<random-uuid> where
    • <topicName> - is the topic that has the invalid messages
    • <random-uuid> - name of the file
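
To make the consumer/worker/committer handoff concrete, here is a minimal, self-contained Java sketch of the flow using blocking queues. The class and method names are illustrative only and are not Beast's actual implementation; batches are represented as plain strings, and the BigQuery push and Kafka offset commit are left as comments.

import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class PipelineSketch {
    // Bounded queues: put() blocks when full, which pauses consumption from Kafka.
    static final BlockingQueue<String> readQueue = new LinkedBlockingQueue<>(100);
    static final BlockingQueue<String> commitQueue = new LinkedBlockingQueue<>(100);
    // Acknowledgements of batches successfully pushed to BigQuery.
    static final Set<String> acknowledged = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) {
        new Thread(PipelineSketch::consume).start(); // Kafka consumer
        new Thread(PipelineSketch::work).start();    // BigQuery worker (Beast runs several)
        new Thread(PipelineSketch::commit).start();  // Committer
    }

    // Consumer: push each batch to both queues; blocks when either queue is full.
    static void consume() {
        try {
            for (int i = 0; ; i++) {
                String batch = "batch-" + i; // stands in for a batch returned by KafkaConsumer#poll
                readQueue.put(batch);
                commitQueue.put(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // BQ worker: take a batch from the read queue, push it to BigQuery, acknowledge on success.
    static void work() {
        try {
            while (true) {
                String batch = readQueue.take();
                // a real worker would stream the batch to BigQuery here
                acknowledged.add(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Committer: commit offsets only for batches already acknowledged by a worker
    // (busy-wait kept trivial for the sketch).
    static void commit() {
        while (true) {
            String batch = commitQueue.peek();
            if (batch != null && acknowledged.remove(batch)) {
                commitQueue.poll();
                // a real committer would commit the batch's max offset back to Kafka here
            }
        }
    }
}

The blocking put() calls are what provide backpressure: once the read or commit queue is full, the consumer stops polling Kafka until a worker or the committer drains a batch.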

Building & Running

Prerequisite

  • A Kafka cluster with messages pushed in proto format, which Beast can consume
  • A BigQuery project with streaming permission
  • A table created for the message proto
  • A configuration with the column mapping for the above table, set up in the env file
  • The env file updated with BigQuery, Kafka, and application parameters

Run locally:

git clone https://github.com/odpf/beast
export $(cat ./env/sample.properties | xargs -L1) && gradle clean runConsumer

Run with Docker

The image is available on Docker Hub under odpf.

export TAG=release-0.1.1
docker run --env-file beast.env -v ./local_dir/project-secret.json:/var/bq-secret.json -it odpf/beast:$TAG
  • -v mounts the local secret file project-secret.json to the location mentioned in the container; GOOGLE_CREDENTIALS should point to the same path /var/bq-secret.json, which is used for BQ authentication.
  • TAG: you can update the tag if you want the latest image; the mentioned tag is well tested.

Running on Kubernetes

Create a Beast deployment for each Kafka topic that needs to be pushed to BigQuery.

  • A deployment can have multiple instances of Beast
  • A beast container consists of the following threads:
    • A kafka consumer
    • Multiple BQ workers
    • A committer
  • The deployment also includes a Telegraf container which pushes stats metrics.

Follow the instructions in the chart for Helm deployment.

BQ Setup:

Given a TestMessage proto file, you can create a BigQuery table with the schema:

# create new table from schema
bq mk --table <project_name>:dataset_name.test_messages ./docs/test_messages.schema.json

# query total records
bq query --nouse_legacy_sql 'SELECT count(*) FROM `<project_name>.dataset_name.test_messages` LIMIT 10'

#  update bq schema from local schema json file
bq update --format=prettyjson <project_name>:dataset_name.test_messages ./docs/test_messages.schema.json

# dump the schema of table to file
bq show --schema --format=prettyjson <project_name>:dataset_name.test_messages > test_messages.schema.json
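
Beast relies on streaming inserts, so it can be useful to verify that your credentials and table accept them before running the consumer. Below is a minimal sketch using the google-cloud-bigquery Java client; the column name order_number is an assumption, so substitute a field from your own schema, and replace the project/dataset placeholders as above.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.Collections;

public class StreamingInsertCheck {
    public static void main(String[] args) {
        // Uses application default credentials (e.g. GOOGLE_APPLICATION_CREDENTIALS) for auth.
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId table = TableId.of("<project_name>", "dataset_name", "test_messages");

        // "order_number" is an assumed column; replace it with a field from your schema.
        InsertAllRequest request = InsertAllRequest.newBuilder(table)
                .addRow(Collections.singletonMap("order_number", "test-1"))
                .build();

        InsertAllResponse response = bigquery.insertAll(request);
        if (response.hasErrors()) {
            response.getInsertErrors().forEach((row, errors) ->
                    System.err.println("row " + row + " failed: " + errors));
        } else {
            System.out.println("streaming insert accepted");
        }
    }
}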

Produce messages to Kafka

You can generate messages from TestMessage.proto with sample-kafka-producer, which pushes N sample messages to the configured Kafka topic.
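
If you prefer to push a few protobuf messages by hand instead of using sample-kafka-producer, the sketch below uses the standard Kafka Java producer with byte-array serialization. TestMessage here is the class generated from TestMessage.proto, and setOrderNumber is an assumed field; use whatever fields your generated class actually has, and adjust the broker address and topic name.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.util.Properties;

public class SampleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // adjust to your cluster
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // TestMessage is the class generated from TestMessage.proto;
            // setOrderNumber is an assumed field, use the fields your proto defines.
            byte[] payload = TestMessage.newBuilder()
                    .setOrderNumber("order-" + System.currentTimeMillis())
                    .build()
                    .toByteArray();
            // "test-topic" is a placeholder for the topic Beast consumes from.
            producer.send(new ProducerRecord<>("test-topic", payload));
            producer.flush();
        }
    }
}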

Running Stencil Server

  • Run the shell script ./run_descriptor_server.sh to build the descriptor set in the build directory and start a Python server on :8000
  • The Stencil URL can then be configured to http://localhost:8000/messages.desc (you can verify it is served with curl)
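
The file served at /messages.desc is expected to be a serialized protobuf FileDescriptorSet (the output format of protoc's --descriptor_set_out); assuming that holds, the following sketch fetches it from the local server and lists the proto files it contains.

import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import java.io.InputStream;
import java.net.URL;

public class DescriptorCheck {
    public static void main(String[] args) throws Exception {
        // Fetch the descriptor set served by the local Python server started above.
        try (InputStream in = new URL("http://localhost:8000/messages.desc").openStream()) {
            FileDescriptorSet descriptors = FileDescriptorSet.parseFrom(in);
            // Print the proto files contained in the descriptor set.
            descriptors.getFileList().forEach(file -> System.out.println(file.getName()));
        }
    }
}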

Contribution

  • You can raise issues or ask questions for clarification
  • You can raise a PR for any feature or issue
  • You can help us with documentation

To run and test locally:

git clone https://github.com/odpf/beast
export $(cat ./env/sample.properties | xargs -L1) && ./gradlew test

More Repositories

1. go-ratelimit: Ratelimit your methods using Redis (Go, 40 stars)
2. ziggurat (Go, 34 stars)
3. hospital: An automated runbook system for treating known failures (Go, 34 stars)
4. kubehandler: A framework for writing Kubernetes controllers (Go, 29 stars)
5. probed: A unix daemon to perform health check on proxy backends (Go, 26 stars)
6. gojek-commons: Dropwizard based micro-service development framework (JavaScript, 25 stars)
7. palantir: Yet Another Go Config Library (Go, 18 stars)
8. s2-calc: S2 Geometry Calculator (JavaScript, 15 stars)
9. iap_auth: Sidecar/Proxy for talking to an IAP enabled service (Go, 13 stars)
10. xrun (Go, 12 stars)
11. stevedore: The Declarative Kubernetes Package Manager (Go, 12 stars)
12. albatross (Go, 9 stars)
13. envoy-lb-operator: Easy to use envoy load balancer control plane for k8s (Go, 8 stars)
14. xtools: XTools is a submodule based repo to host re-usable Golang code (Go, 7 stars)
15. heimdall: [Deprecated] (Go, 7 stars)
16. prattle: A Distributed In-Memory Cache (Go, 5 stars)
17. go-multierror: Wrapper for handling zero or more errors (Go, 5 stars)
18. client-authenticator-rb: Ruby gem for Client Authentication for Api requests (Ruby, 4 stars)
19. async-worker: An asynchronous job worker library for Clojure, implemented on RabbitMQ (Clojure, 4 stars)
20. kong-dynamic-upstream: A Kong plugin for dynamic upstreams (Go, 3 stars)
21. gocd-gcr-poller-plugin: Plugin to provide GCR docker images as package materials in Go-CD (Java, 3 stars)
22. rubyconf-2017-contest (3 stars)
23. sample-kafka-producer: Kafka Producer to push sample proto message (Go, 2 stars)
24. meajurements: Clojure statsd client that supports tags in the dogstatsd format (Clojure, 2 stars)
25. kong-google-group-membership: Kong plugin using go-pdk for google group membership (Go, 2 stars)
26. client-id-auth-middleware (Go, 2 stars)
27. ogi: Utility to enable flexible ETL scenarios, supports golang plug-in for built-in consumer|transformer|producer options (Go, 2 stars)
28. gojektech.github.io: GO-JEK Tech's Free and Open Source Software projects. 👻 🕸 🚒 (HTML, 2 stars)
29. go-run (Go, 1 star)
30. grafonnet-bigquery-panel: Grafonnet Plugin for the bigquery panel (Jsonnet, 1 star)
31. albatross-client-go: Go client for albatross (Go, 1 star)
32. kingsly-certbot-cookbook (Ruby, 1 star)
33. rabbit-retry: RabbitMQ based retries for Ziggurat (Go, 1 star)
34. go-clientauth: TO BE DEPRECATED (Go, 1 star)
35. gojektech: Middleman-Powered GO-JEK Tech Website 👻 🕸 hosted with ❤️ by GitHub Pages (Ruby, 1 star)
36. iap-auth-cookbook: Installs IAP auth (https://github.com/gojekfarm/iap_auth) as a systemd service on your VM box (Ruby, 1 star)