• Stars
    star
    379
  • Rank 113,004 (Top 3 %)
  • Language
    Go
  • Created over 6 years ago
  • Updated over 1 year ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

tiny but powerful rpc framework

qrpc, 轻量级的通用长链接框架

qrpc 提供完整的服务端及客户端功能,并支持以下4种特性使得rpc变得极为容易:

  • 阻塞非阻塞
  • 流式非流式
  • 主动推送
  • 双向调用

默认是阻塞模式,也就是同一个长链接的请求是串行处理,类似http/1.1,但是通过微小的改动就可以切换到其他模式。

此外,qrpc还提供了桥接网络的特性,该特性使得多协议支持不费吹灰之力,同样的一套代码可以同时跑在tcpwebsocket及任意已有的协议之下,详情参考ws/README.md;以及代码生成功能,使得服务的注册和调用极大地便利化,详情参考codegen demo

这里ppt版。


协议设计

qrpc提供请求->响应以及主动推送两大类的交互能力。

请求->响应又分为「阻塞非阻塞」以及「流式非流式」。

qrpc请求响应有相同的结构:,即代码中的Frame

每个帧包括8字节的唯一标识1字节的flag3字节的命令,以及不超过可配置上限长度的负荷

客户端会为每个请求帧自动生成8字节的唯一标识,服务端对应的响应帧会有相同的唯一标识。

通过这种方式,一个长链接可以同时发起多个请求,并且精确地知道每个请求对应的响应结果。

此外,请求响应都可以由多个组成,类似http中的chunked传输模式,这就是前面提到的流式非流式

而所有关于是否阻塞、是否流式、是否主动推送的元信息,都包含在头部1字节的flag之中!

具体字段参考frame.md


话不多说,干货开始!

阻塞模式

server.go:

package main
import "github.com/zhiqiangxu/qrpc"

const (
    HelloCmd qrpc.Cmd = iota
    HelloRespCmd
)
func main() {
    // handler的作用是路由,根据请求帧的命令,分发到不同的处理子函数
    handler := qrpc.NewServeMux()
    // 注册HelloCmd命令对应的子函数
    handler.HandleFunc(HelloCmd, func(writer/*用于回写响应*/ qrpc.FrameWriter, request/*当前请求的相关信息*/ *qrpc.RequestFrame) {
        // 响应帧和请求帧有相同的唯一标识,并且这里把响应帧的命令设置为HelloRespCmd,会更方便调试
        writer.StartWrite(request.RequestID, HelloRespCmd, 0)

        // 负荷部分为:hello world + 请求帧的原始负荷
        writer.WriteBytes(append([]byte("hello world "), request.Payload...))

        // 前面的StartWrite和WriteBytes其实是构建响应帧的过程
        // 构建完毕后通过EndWrite触发实际的回写
        writer.EndWrite()
    })
    // ServerBinding用于配制想监听的端口以及对应的处理函数,如果想监听多个端口,提供多个即可
    bindings := []qrpc.ServerBinding{
        qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
    // 构建server
    server := qrpc.NewServer(bindings)
    // 开始监听
    server.ListenAndServe()
}

client.go:

package main
import (
    "fmt"
    "github.com/zhiqiangxu/qrpc"
)

const (
    HelloCmd qrpc.Cmd = iota
)
func main() {
    // 采用默认配置
    conf := qrpc.ConnectionConfig{}

    // 建立一个qrpc的长链接
    conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil)

    // 发起一个命令为HelloCmd的请求帧,flag空,负荷为xu
    _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu"))
    // 获取响应
    frame, _ := resp.GetFrame()
    // 打印响应负荷
    fmt.Println("resp is", string(frame.Payload))
}

上面的例子中,由于flag为空,所以服务端会采用默认的串行处理模式。


非阻塞模式

要使用该模式,只需要修改client.go的一行代码:

-    _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu"))
+    _, resp, _ := conn.Request(HelloCmd, qrpc.NBFlag, []byte("xu"))

这样服务端便会并行处理这个长链接发来的请求!


流式

类似http中的chunked传输模式,不论请求还是响应,都可以拆成多个帧。

下面按流式请求流式响应分别介绍。

流式请求:

streamclient.go:

package main
import (
    "fmt"
    "github.com/zhiqiangxu/qrpc"
)

const (
    HelloCmd qrpc.Cmd = iota
)
func main() {
     // 采用默认配置
    conf := qrpc.ConnectionConfig{}

    // 建立一个qrpc的长链接
    conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil)

    // 采用流式发送HelloCmd请求,第一个请求帧的负荷是first frame
    writer, resp, _ := conn.StreamRequest(HelloCmd, 0, []byte("first frame"))
    // 构建第二个请求帧,负荷是last frame
    writer.StartWrite(HelloCmd)
    writer.WriteBytes([]byte("last frame"))
    // 发送请求,并标记流式结束
    writer.EndWrite(true) // will attach StreamEndFlag
    // 获取响应
    frame, _ := resp.GetFrame()
    // 打印响应负荷
    fmt.Println("resp is", string(frame.Payload))
}

streamserver.go:

package main
import (
    "github.com/zhiqiangxu/qrpc"
)

const (
    HelloCmd qrpc.Cmd = iota
    HelloRespCmd
)
func main() {
    handler := qrpc.NewServeMux()
    handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
        // 首帧的处理类似非流式,只是EndWrite最后才会调用
        writer.StartWrite(request.RequestID, HelloRespCmd, 0)

        writer.WriteBytes(append([]byte("first frame "), request.Payload...))

        // 循环取出流式请求中的剩余帧
        for {
            continueFrames := <-request.FrameCh()
            // continueFrames为nil表示该请求的所有帧获取完毕
            if continueFrames == nil {
                break
            }
            // 将后续帧的负荷追加到响应中去
            writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...))
        }
        // 响应帧构建完毕,回写给客户端
        writer.EndWrite()
    })
    bindings := []qrpc.ServerBinding{
        qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
    server := qrpc.NewServer(bindings)
    err := server.ListenAndServe()
    if err != nil {
        panic(err)
    }
}

流式响应:

package main
import (
    "github.com/zhiqiangxu/qrpc"
    "fmt"
)

const (
    HelloCmd qrpc.Cmd = iota
    HelloRespCmd
)
func main() {
    handler := qrpc.NewServeMux()
    handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
        // 构建流式响应的第一帧
        writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag)
        writer.WriteBytes(append([]byte("first frame "), request.Payload...))
        // 第一帧构建完毕,发送
        writer.EndWrite()

        for {
            // 获取流式请求的后续帧
            continueFrames := <-request.FrameCh()
            if continueFrames == nil {
                break
            }

            fmt.Printf("%s\n", continueFrames.Payload)
            // 构建流式响应的后续帧
            if continueFrames.Flags.IsDone() {
                // 最后一帧,flag标记为qrpc.StreamEndFlag
                writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamEndFlag)
            } else {
                // 不是最后一帧,标记为qrpc.StreamFlag
                writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag)
            }
            
            writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...))
            // 后续帧构建完毕,发送
            writer.EndWrite()
        }
    })
    bindings := []qrpc.ServerBinding{
        qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
    server := qrpc.NewServer(bindings)
    err := server.ListenAndServe()
    if err != nil {
        panic(err)
    }
}

关键是qrpc.StreamFlag!


推送模式

package main
import (
    "github.com/zhiqiangxu/qrpc"
    "github.com/zhiqiangxu/util"
    "sync"
)

const (
    HelloCmd qrpc.Cmd = iota
    HelloRespCmd
)
func main() {
    handler := qrpc.NewServeMux()
    handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
        var (
            wg    sync.WaitGroup
        )
        qserver := request.ConnectionInfo().Server()
        pushID := qserver.GetPushID()
        // 遍历所有长链接
        qserver.WalkConn(0, func(writer qrpc.FrameWriter, ci *qrpc.ConnectionInfo) bool {
            util.GoFunc(&wg, func() {
                // PushFlag表示主动推送
                writer.StartWrite(pushID, HelloCmd, qrpc.PushFlag)
                writer.WriteBytes([]byte("pushed msg"))
                writer.EndWrite()
            })
            return true
        })
        wg.Wait()

        writer.StartWrite(request.RequestID, HelloRespCmd, 0)
        writer.WriteBytes(append([]byte("push done"), request.Payload...))
        writer.EndWrite()
    })
    bindings := []qrpc.ServerBinding{
        qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
    server := qrpc.NewServer(bindings)
    err := server.ListenAndServe()
    if err != nil {
        panic(err)
    }
}

上述代码中,HelloCmd的处理子函数将给每个长链接推送一条消息!

客户端处理推送消息的方式如下:

-    conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil)
+    conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, func(conn *qrpc.Connection, pushedFrame *qrpc.Frame) {
+        fmt.Println(pushedFrame)
+    })

双向调用

前面的demo都是client调用server,其实client也可以注册回调让server调,参考测试用例中的TestClientHandler


Performance

MBP单机压测数据:

avatar

性能大概是http4 倍!

单独施压机,16C32G压测数据:

benchmark

qps达到了203k,且此时cpu使用率为75%


基于qrpc的项目

  1. qchat (趣头条IM中台,未开源)
  2. qpush (趣头条Push中台,未开源)
  3. qfe (趣头条长链网关,未开源)
  4. kvrpc (一个支持事务的kv数据库)

社区

目前我们暂时只针对中国的用户,所以采用了微信群的交流方式,下面是二维码,有兴趣的同学可以扫码加入:

PS:扫码请注明来意,比如:学习qrpc或者go爱好者

Stargazers

Stargazers over time