• This repository has been archived on 20/Nov/2020
  • Stars
    star
    162
  • Rank 232,284 (Top 5 %)
  • Language
    F#
  • License
    Other
  • Created almost 9 years ago
  • Updated about 4 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Kafunk: F# Kafka client

NOTICE: SUPPORT FOR THIS PROJECT ENDED ON 18 November 2020

This projected was owned and maintained by Jet.com (Walmart). This project has reached its end of life and Walmart no longer supports this project.

We will no longer be monitoring the issues for this project or reviewing pull requests. You are free to continue using this project under the license terms or forks of this project at your own risk. This project is no longer subject to Jet.com/Walmart's bug bounty program or other security monitoring.

Actions you can take

We recommend you take the following action:

  • Review any configuration files used for build automation and make appropriate updates to remove or replace this project
  • Notify other members of your team and/or organization of this change
  • Notify your security team to help you evaluate alternative options

Forking and transition of ownership

For security reasons, Walmart does not transfer the ownership of our primary repos on Github or other platforms to other individuals/organizations. Further, we do not transfer ownership of packages for public package management systems.

If you would like to fork this package and continue development, you should choose a new name for the project and create your own packages, build automation, etc.

Please review the licensing terms of this project, which continue to be in effect even after decommission.

ORIGINAL README BELOW


UPDATE

We found a bug in the implementation of the v0.11+ protocol wherein messages were skipped during consumption. The bug only manifests when using the newer protocol version (the default). Due to this bug and for long term maintenance, we've started investing into the Confluent.Kafka client: https://github.com/jet/confluent-kafka-fsharp.

Kafunk

NuGet Status Build status Build status

Kafunk is a Kafka client written in F#.

See the home page for details.

Please also join the F# Open Source Group

Version Support

Version Status Notes
0.9.0 Complete
0.10.0 Complete
0.10.1 Complete
0.11+auto Protocol Protocol implementation bug found (skipped messages)

Feature Support

Feature Status
GZip Complete
Snappy Complete
LZ4 #125
TLS #66
SASL #139
ACL #140
TXNS #214

Hello World

#r "kafunk.dll"
#r "FSharp.Control.AsyncSeq.dll"

open Kafunk
open System

let conn = Kafka.connHost "existential-host"

// metadata

let metadata = 
  Kafka.metadata conn (MetadataRequest([|"absurd-topic"|])) 
  |> Async.RunSynchronously

for b in metadata.brokers do
  printfn "broker|host=%s port=%i nodeId=%i" b.host b.port b.nodeId

for t in metadata.topicMetadata do
  printfn "topic|topic_name=%s topic_error_code=%i" t.topicName t.topicErrorCode
  for p in t.partitionMetadata do
    printfn "topic|topic_name=%s|partition|partition_id=%i" t.topicName p.partitionId


// producer

let producerCfg =
  ProducerConfig.create (
    topic = "absurd-topic", 
    partition = Partitioner.roundRobin, 
    requiredAcks = RequiredAcks.Local)

let producer =
  Producer.createAsync conn producerCfg
  |> Async.RunSynchronously

// produce single message

let prodRes =
  Producer.produce producer (ProducerMessage.ofBytes ("hello world"B))
  |> Async.RunSynchronously

printfn "partition=%i offset=%i" prodRes.partition prodRes.offset






// consumer

let consumerCfg = 
  ConsumerConfig.create ("consumer-group", "absurd-topic")

let consumer =
  Consumer.create conn consumerCfg

// commit on every message set

Consumer.consume consumer 
  (fun (s:ConsumerState) (ms:ConsumerMessageSet) -> async {
    printfn "member_id=%s topic=%s partition=%i" s.memberId ms.topic ms.partition
    do! Consumer.commitOffsets consumer (ConsumerMessageSet.commitPartitionOffsets ms) })
|> Async.RunSynchronously


// commit periodically


Consumer.consumePeriodicCommit consumer
  (TimeSpan.FromSeconds 10.0) 
  (fun (s:ConsumerState) (ms:ConsumerMessageSet) -> async {
    printfn "member_id=%s topic=%s partition=%i" s.memberId ms.topic ms.partition })
|> Async.RunSynchronously


// commit consumer offsets explicitly

Consumer.commitOffsets consumer [| 0, 1L |]
|> Async.RunSynchronously

// commit consumer offsets explicitly to a relative time

Consumer.commitOffsetsToTime consumer Time.EarliestOffset
|> Async.RunSynchronously


// get current consumer state

let consumerState = 
  Consumer.state consumer
  |> Async.RunSynchronously

printfn "generation_id=%i member_id=%s leader_id=%s assignment_stratgey=%s partitions=%A" 
  consumerState.generationId consumerState.memberId consumerState.leaderId 
  consumerState.assignmentStrategy consumerState.assignments 



// fetch offsets of a consumer group for all topics

let consumerOffsets =
  Consumer.fetchOffsets conn "consumer-group" [||]
  |> Async.RunSynchronously

for (t,os) in consumerOffsets do
  for (p,o) in os do
    printfn "topic=%s partition=%i offset=%i" t p o


// fetch topic offset information

let offsets = 
  Offsets.offsets conn "absurd-topic" [] [ Time.EarliestOffset ; Time.LatestOffset ] 1
  |> Async.RunSynchronously

for kvp in offsets do
  for (tn,offsets) in kvp.Value.topics do
    for p in offsets do
      printfn "time=%i topic=%s partition=%i offsets=%A" kvp.Key tn p.partition p.offsets

Maintainers

More Repositories

1

equinox

.NET event sourcing library with CosmosDB, DynamoDB, EventStoreDB, message-db, SqlStreamStore and integration test backends. Focused at stream level; see https://github.com/jet/propulsion for cross-stream projections/subscriptions/reactions
F#
472
star
2

propulsion

.NET event stream projection and scheduling platform with CosmosDB, DynamoDB, EventStoreDB, MemoryStore, message-db, Equinox and Kafka integrations
F#
178
star
3

falanx

Generates F# code from protobuf schema for binary and json format
F#
142
star
4

kube-webhook-certgen

Tools to help with self signed cert generation for Kubernetes test environment
Go
134
star
5

FsKafka

Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x
F#
87
star
6

FsCodec

F# Event-Union Contract Encoding with versioning tolerant converters supporting System.Text.Json and Newtonsoft.Json
F#
84
star
7

damon

Supervisor program to constrain Windows executables running under Nomad's raw_exec driver
Go
83
star
8

dotnet-templates

Example app and service templates `dotnet new -i Equinox.Templates; dotnet new eqx*/pro*` https://github.com/jet/equinox https://github.com/jet/FsCodec
F#
65
star
9

nomad-service-alerter

Alerting for Nomad Jobs
Go
36
star
10

XRay

Our distributed tracing library https://jet.github.io/XRay/
F#
16
star
11

CallPolly

Apply systemwide resilience strategies consistently across subsystems, standing on Polly's shoulders
F#
16
star
12

Microservice-Edge-Testing-Example

A sample implementation of complex microservice edge testing using distributable fakes
F#
13
star
13

Vertigo.Json

A reflection-based JSON (de)serialization library written in and for F#
F#
13
star
14

baybars

Common Python library for interacting various third party infrastructure(Azure, Nomad, Kafka, Vault, Consul) that we use at Jet
Python
8
star
15

apidiff

Check your API surface
F#
5
star
16

wiremock-admin-api-client

An F# client for the WireMock Admin API
F#
5
star
17

go-interstellar

A Go client for interacting with the REST/SQL API of CosmosDB
Go
4
star
18

oms.infrastructure-talk

Demo code for Open F# in San Francisco
F#
3
star
19

go-mantis

A "standard library" for Jet's Golang codebase
Go
2
star