

EvHub distributes delayed, transactional, real-time, and cyclic events. It is used in scenarios such as system decoupling, asynchronous calls, and distributed transactions.

EvHub


Infra

EvHub is an event-driven runtime that provides a unified event model, collects asynchronous events, and distributes them to downstream applications so that upstream and downstream systems stay decoupled. It supports real-time, delayed, cyclic, and transactional events, and aims to be a general-purpose event component with low latency and high reliability. It makes it easy for developers to build stateless, event-driven microservices.

Features

  • Multi-protocol: supports multiple protocols, such as HTTP and gRPC
  • Supports transactional messages
  • Supports delay events, including normal delay events and delay processing events
  • Supports recurring events, including regular recurring events and crontab events
  • Supports multiple event stores: Kafka, Pulsar, MySQL, Redis, etc.
  • Supports multiple microservice architectures
  • Supports high availability and easy horizontal scaling

Application scenarios

EvHub can help with data consistency problems in many scenarios. Here are a few common ones:

  • Peak shaving
  • Broadcast
  • Cache management
  • System decoupling and event-driven design: greatly reduces architectural complexity
  • Distributed transactions

Run EvHub

make start

Stop EvHub

make stop

Configuration

curl --location --request POST '127.0.0.1:8081/v1/producer' \
--header 'Content-Type: application/json' \
--data-raw '{
    "producer_conf_info":{
        "app_id":"eh",
        "topic_id":"test",
        "tx_protocol_type":0,
        "tx_address":"addr",
        "tx_callback_interval":5000
    }
}'
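
The same request can also be built and sent from Go. The following is a minimal sketch, not part of EvHub: the type and function names (ProducerConfInfo, ProducerRequest, EncodeProducerRequest, PostJSON) are hypothetical, and only the JSON keys, values, and endpoint are taken from the curl example above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// ProducerConfInfo mirrors the JSON keys of the curl example.
// The Go names are hypothetical; only the JSON tags come from the README.
type ProducerConfInfo struct {
	AppID              string `json:"app_id"`
	TopicID            string `json:"topic_id"`
	TxProtocolType     int    `json:"tx_protocol_type"`
	TxAddress          string `json:"tx_address"`
	TxCallbackInterval int    `json:"tx_callback_interval"`
}

// ProducerRequest is the top-level request body.
type ProducerRequest struct {
	ProducerConfInfo ProducerConfInfo `json:"producer_conf_info"`
}

// EncodeProducerRequest serializes the configuration into the request body.
func EncodeProducerRequest(info ProducerConfInfo) ([]byte, error) {
	return json.Marshal(ProducerRequest{ProducerConfInfo: info})
}

// PostJSON sends body to url as application/json and returns the HTTP status code.
func PostJSON(url string, body []byte) (int, error) {
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	body, err := EncodeProducerRequest(ProducerConfInfo{
		AppID:              "eh",
		TopicID:            "test",
		TxProtocolType:     0,
		TxAddress:          "addr",
		TxCallbackInterval: 5000,
	})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(string(body))

	// Needs a running EvHub instance; the address comes from the curl example.
	status, err := PostJSON("http://127.0.0.1:8081/v1/producer", body)
	if err != nil {
		fmt.Println("post error:", err)
		return
	}
	fmt.Println("status:", status)
}
```

Keeping the encoding step separate from the HTTP call lets the payload be checked without a running server.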

curl --location --request POST '127.0.0.1:8081/v1/processor' \
--header 'Content-Type: application/json' \
--data-raw '{
    "processor_conf_info":{
        "dispatcher_id":"evhub_eh_test_addr1",
        "app_id":"eh",
        "topic_id":"test",
        "timeout":5000,
        "protocol_type":"grpcSend",
        "addr":"ip:6001",
        "retry_strategy":{
            "retry_interval": 5000,
            "retry_count":3
        }
    }
}'

Code

package main

import (
	"context"
	"time"

	"github.com/tencentmusic/evhub/pkg/gen/proto/comm"
	"github.com/tencentmusic/evhub/pkg/grpc"
	"github.com/tencentmusic/evhub/pkg/grpc/interceptor"
	"github.com/tencentmusic/evhub/pkg/log"

	eh_pc "github.com/tencentmusic/evhub/pkg/gen/proto/processor"
	eh_pd "github.com/tencentmusic/evhub/pkg/gen/proto/producer"
	ggrpc "google.golang.org/grpc"
)

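func main() {
	// Address this sample listens on for dispatched events (matches the processor "addr" port above).
	serverAddr := ":6001"
	// Address of the EvHub producer gRPC endpoint.
	addr := "127.0.0.1:9000"
	timeout := time.Second * 5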
	conn, err := grpc.Dial(&grpc.ClientConfig{Addr: addr, Timeout: timeout})
	if err != nil {
		log.Panicf("dial err:%v", err)
	}
	defer conn.Close()
	// Report a real-time event for app "eh" and topic "test" through the producer gRPC client.
	rsp, err := eh_pd.NewevhubProducerClient(conn).Report(context.Background(), &eh_pd.ReportReq{
		Event: &comm.Event{
			AppId:   "eh",
			TopicId: "test",
		},
		Trigger: &comm.EventTrigger{
			TriggerType: comm.EventTriggerType_EVENT_TRIGGER_TYPE_REAL_TIME,
		},
	})
	if err != nil {
		log.Panicf("report err:%v", err)
	}
	if rsp.Ret != nil && rsp.Ret.Code != 0 {
		log.Panicf("report failed rsp:%+v", rsp)
	}
	log.Infof("report success rsp:%+v", rsp)
	StartGrpcServer(serverAddr, &Svc{})
}

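// Svc implements the processor service that receives dispatched events.
type Svc struct{}

// Dispatch handles one dispatched event; this sample only logs the request.
func (s *Svc) Dispatch(ctx context.Context, req *eh_pc.DispatchReq) (*eh_pc.DispatchRsp, error) {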
	log.Infof("ctx:%v req:%+v", ctx, req)
	return &eh_pc.DispatchRsp{}, nil
}

// StartGrpcServer registers s as the processor service and serves gRPC on serverAddr.
func StartGrpcServer(serverAddr string, s *Svc) {
	opts := []ggrpc.ServerOption{
		ggrpc.ChainUnaryInterceptor(interceptor.DefaultUnaryServerInterceptors()...),
	}
	server := grpc.NewServer(&grpc.ServerConfig{
		Addr: serverAddr,
	}, opts...)

	eh_pc.RegisterevhubProcessorServer(server.Server(), s)
	if err := server.Serve(); err != nil {
		log.Panicf("grpc failed to serve: %v", err)
	}
}

More examples

For more quick-start examples, see quick-start-sample.

Give a star!

If you find this project interesting or helpful, please give it a star!
