WPaxos
WPaxos是Paxos一致性算法的生产级高性能Java实现,参考了微信团队C++语言开发的PhxPaxos类库,支持多分组,可用于解决高并发、高可靠分布式系统中多副本数据一致性问题以及分布式共识问题。针对一些网络分区、机器宕机、进程异常(OOM、卡顿、强制关闭)等突发情况,已经过一系列实际应用场景的验证。
功能特性
- 高性能:Multi-Paxos算法与Basic-Paxos算法结合,支持多Paxos分组,有序确定多个值
- 节点间可通过状态机checkpoint或逐条数据流两种方式对落后数据快速对齐
- 具有网络分区容错性,集群少数节点故障服务高可用性
- 提供有Master自动选举功能
- 集群可通过Paxos协议动态、安全的增加节点、删除节点
- 高扩展性:支持存储模块与异步通信模块自定义
- 一个Paxos实例可以同时挂载多个状态机
- 提交的数据支持增量checksum校验
- 可添加不参与提案投票,仅用于备份数据的follower节点
- 默认存储支持按照时间与holdcount两种清理paxoslog方式
需要
编译需要 JDK 8 及以上。
性能
测试运行环境
CPU:20 x Intel(R) Xeon(R) Silver 4114 CPU @ 2.20GHz
内存:192 GB
硬盘:ssd
网卡:万兆网卡
集群机器个数: 3个
测试结果
采用源码中Simple demo为测试样例,为了排除其它干扰,状态机空跑;instance索引采用默认的leveldb方式;直接在服务节点本地调propose接口测试, group数量大于等于3时,master均匀分布于集群3个节点,每个group由master节点发起propose。
场景一:无批量提交
data-size | group-count | qps | data-size | group-count | qps | data-size | group-count | qps |
---|---|---|---|---|---|---|---|---|
100B | 1 | 3835 | 2KB | 1 | 2671 | 100KB | 1 | 337 |
100B | 20 | 29577 | 2KB | 20 | 20259 | 100KB | 20 | 1416 |
100B | 50 | 34165 | 2KB | 50 | 22053 | 100KB | 50 | 1350 |
100B | 100 | 24495 | 2KB | 100 | 17131 | 100KB | 100 | 1300 |
场景二:批量提交
不同size数据在批量提交时,每个group达到最大性能的最佳batch取值及索引方式也有所不同,下面是部分测试数据。
data-size | group-count | batch-count | 索引方式 | qps(W) |
---|---|---|---|---|
100B | 1 | 550 | 文件索引 | 17 |
100B | 20 | 250 | 文件索引 | 137 |
100B | 50 | 150 | 文件索引 | 132 |
100B | 100 | 150 | levelDB索引 | 75 |
2KB | 1 | 250 | 文件索引 | 1 |
2KB | 20 | 20 | 文件索引 | 17 |
2KB | 50 | 10 | 文件索引 | 22 |
2KB | 100 | 20 | levelDB索引 | 16 |
场景三:leveldb索引与文件索引对比
下面是size100B数据非批量提交时,不同group下Leveldb与文件两种索引方式的性能对比,从结果分析,在group数量比较小时,文件索引性能要优于leveldb索引。
data-size | group-count | 文件索引qps | levelDB索引qps |
---|---|---|---|
100B | 1 | 4207 | 3835 |
100B | 3 | 11595 | 9684 |
100B | 9 | 25035 | 21027 |
100B | 18 | 35277 | 29577 |
100B | 36 | 40062 | 34398 |
100B | 54 | 44061 | 36495 |
100B | 72 | 40929 | 54345 |
100B | 100 | 37830 | 42015 |
快速使用
maven依赖
<dependency>
<groupId>io.github.wuba</groupId>
<artifactId>wpaxos.core</artifactId>
<version>1.0.2-Release</version>
</dependency>
下面以sample中的echo为例,说明如何快速使用WPaxos,构建WPaxos集群有以下三个关键步骤:构建状态机、构建上下文信息和初始化WPaxos实例。
创建状态机
首先,状态机为WPaxos数据的最终处理逻辑,instance数据一旦被状态机执行,不会再被修改,集群中所有节点只要初始状态一致,那么状态机最终的状态也保持一致。这里我们实现一个状态机叫EchoSM,该类继承StateMachine,如下:
public class EchoSM implements StateMachine {
public static final int SMID = 1;·
private int groupId;
public EchoSM(int i) {
this.groupId = i;
}
@Override
public int getSMID() {
return SMID;
}
@Override
public boolean execute(int groupIdx, long instanceID, byte[] paxosValue, SMCtx smCtx) {
System.out.println("[SM Execute] ok, smid " + this.getSMID() + " instanceid " + instanceID + " value " + new String(paxosValue));
//only commiter node have SMCtx.
if(smCtx != null && smCtx.getpCtx() != null) {
EchoSMCtx peCtx = (EchoSMCtx)smCtx.getpCtx();
peCtx.setExecuteRet(0);
peCtx.setEchoRespValue(paxosValue);
}
executeForCheckpoint(groupIdx, instanceID, paxosValue);
return true;
}
}
在一个Paxos实例分组中可以同时挂载多个状态机,但是同一instance数据只会选择应用于某一个状态机,每个状态机采用唯一的SMID来标识。
execute()方法为状态机的处理逻辑,其中
第一个参数为groupIdx,用来区分不同Paxos分组实例;
第二个参数为instanceID,为一个全局递增的序列号,标识每次提交数据执行的编号;
第三个为paxosValue,即我们提交的数据;
第四个参数为SMCtx类的实例,即整个执行过程的上下文信息,下文中介绍。
WPaxos会保证同一个Paxos group的多个节点都会执行相同序列的状态机execute()方法,从而实现强一致性。
构建上下文信息
既然每条提交的数据都可选择应用的状态机,那么就需要一个上下文信息,用来保存数据应用的目标状态机信息。 这里用到了前文说的SMCtx类,如下:
public class SMCtx {
private int smId;
private Object pCtx;
public SMCtx() {
}
public SMCtx(int smId, Object pCtx) {
super();
this.smId = smId;
this.pCtx = pCtx;
}
}
SMCtx类保存了整个执行过程的上下文信息,由写入者初始化,并且会传入到状态机的execute()方法,主要封装有以下两种信息:
smId,即目标执行状态机ID;
pCtx,保存了用户自定义的上下文数据,用户在状态机中使用该变量,可用于回传状态机执行结果。
下面为echo上下文的定义:
public class EchoSMCtx {
/*状态机执行是否成功*/
public int executeRet;
/*状态机执行数据*/
private byte[] echoRespValue;
}
以上两个参数都是用来回传状态机执行结果信息。
运行WPaxos实例
在编写好echo状态机之后,接下来就是运行WPaxos,并且挂载echo状态机。 如下,是EchoServer类,用于封装Paxos node初始化、启动或其它接口调用:
public class EchoServer {
private NodeInfo myNode;
private List<NodeInfo> nodeList;
private String rootPath;
private Node paxosNode;
private IndexType indexType;
private int groupCount;
public EchoServer(NodeInfo myNode, List<NodeInfo> nodeList, int groupCount, String rootPath, int indexType) {
this.myNode = myNode;
this.nodeList = nodeList;
this.paxosNode = null;
this.groupCount = groupCount;
this.rootPath = rootPath;
if (indexType == IndexType.LEVEL_DB.getType()) {
this.indexType = IndexType.LEVEL_DB;
} else {
this.indexType = IndexType.PHYSIC_FILE;
}
}
public void runPaxos() throws Exception {
Options options = new Options();
String logStoragePath = this.makeLogStoragePath(this.rootPath);
options.setLogStoragePath(logStoragePath);
options.setGroupCount(groupCount);
options.setMyNode(this.myNode);
options.setNodeInfoList(this.nodeList);
options.setUseMembership(true);
options.setUseBatchPropose(false);
options.setIndexType(indexType);
options.setStoreConfig(new StoreConfig(rootPath, null));
for(int gid = 0; gid < groupCount; gid++) {
GroupSMInfo smInfo = new GroupSMInfo();
smInfo.setUseMaster(true);
smInfo.setGroupIdx(gid);
smInfo.getSmList().add(new EchoSM(gid));
options.getGroupSMInfoList().add(smInfo);
}
this.paxosNode = Node.runNode(options);
}
}
先看下几个参数的意义,
myNode标识本机的ip和port,nodeList标识Paxos集群所有节点的ip和port;
indexType标识要使用的索引存储类型,WPaxos中提供两种索引存储类型:levelDB和文件存储。使用方可以根据自身场景决定使用哪种索引存储;
groupCount标识要同时运行多少个Paxos分组实例,多实例之间完全独立;
runPaxos()方法来初始化并启动WPaxos,参数Options类包含了WPaoxs运行的所有参数设置;
设置好所有运行参数后,接下来需要挂载状态机。
GroupSMInfo类代表一个Paxos实例的状态机列表,其中groupIdx标识Paxos分组实例ID,smList标识挂载的状态机列表。
最后调用runNode()方法,启动WPaxos。
发起请求
我们通过echo()方法,来发起请求
public String echo(String echoReqValue, int groupIdx) throws Exception {
SMCtx ctx = new SMCtx();
EchoSMCtx echoSMctx = new EchoSMCtx();
ctx.setSmId(EchoSM.SMID);
ctx.setpCtx(echoSMctx);
this.paxosNode.setTimeoutMs(3000);
ProposeResult proposeResult = null;
proposeResult = this.paxosNode.propose(groupIdx, echoReqValue.getBytes(), ctx);
if (PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet() == proposeResult.getResult() && echoSMctx.getEchoRespValue() != null) {
return new String(echoSMctx.getEchoRespValue());
}
return null;
}
首先定义上下文类型EchoSMCtx,然后设置到SMCtx的pCtx变量中,同时设置smID=1,标识请求数据要由smID=1的状态机来执行。
然后调用paxosNode的propose方法,发起数据同步请求,第一个参数为groupIdx,指定要执行的Paxos group,第二个参数为实际发送的数据,第三个参数为我们创建的上下文信息。
在略复杂的应用场景,如果同一个Paxos group有多个线程并发发起propose请求同步数据,可改为调用batchPropose接口,后端异步线程会进行批量合并,可提高整体吞吐量。
通过以上几个步骤,我们就利用WPaxos实现了一个简单的多机的echo()方法。
运行结果
我们启动三副本的节点,第一台机器:
echo server start, ip [127.0.0.1] port [30000]
please input :
1
[SM Execute] ok, smid 1 instanceid 5 value wpaxos_test0
echo response : wpaxos_test0
ip为127.0.0.1,port为30000的节点,可以看到我们输入的paxosValue为"wpaxos_test0",execute执行的结果打印出来的成功结果ok,通过上下文获取的返回值为"wpaxos_test0",与我们的输入相同。
看下其他副本的情况:
echo server start, ip [127.0.0.1] port [30001]
please input :
[SM Execute] ok, smid 1 instanceid 5 value wpaxos_test0
ip为127.0.0.1,port为30001的机器,execute执行结果与第一台机器相同。同样第三台机器执行结果也相同。
Master功能
Master是WPaxos中的一个角色,在多机构建的集合里面,任意时刻,每个Paxos实例中,最多只有一个节点能够成为master。
Master是一个十分实用的功能,在多数的分布式系统中,一般会选择master节点来同步更新数据到其它节点。
下面展示如何使用Master功能。 首先构建一个选举类:
public class ElectionServer {
private NodeInfo myNode;
private List<NodeInfo> nodeList;
private String rootPath;
private Node paxosNode;
private int groupCount;
public ElectionServer(NodeInfo myNode, List<NodeInfo> nodeList, int groupCount, String rootPath) {
this.myNode = myNode;
this.nodeList = nodeList;
this.paxosNode = null;
this.groupCount = groupCount;
this.rootPath = rootPath;
}
public void runPaxos() throws Exception {
Options options = new Options();
String logStoragePath = this.makeLogStoragePath(this.rootPath);
options.setLogStoragePath(logStoragePath);
options.setGroupCount(groupCount);
options.setMyNode(this.myNode);
options.setNodeInfoList(this.nodeList);
options.setUseMembership(true);
options.setUseBatchPropose(false);
options.setStoreConfig(new StoreConfig(rootPath, null));
for(int gid = 0; gid < groupCount; gid++) {
GroupSMInfo smInfo = new GroupSMInfo();
smInfo.setUseMaster(true);
smInfo.setGroupIdx(gid);
options.getGroupSMInfoList().add(smInfo);
}
this.paxosNode = Node.runNode(options);
}
public NodeInfo getMasterNode(int groupIdx) {
return this.paxosNode.getMaster(groupIdx);
}
public boolean isMaster(int groupIdx) {
return this.paxosNode.isIMMaster(groupIdx);
}
}
方法getMasterNode()获取当前实例的master节点信息。
isMaster()方法返回当前节点是否是master。
runPaxos()方法启动Paxos实例,这里看到与echo不同在于,不需要单独实现状态机,而是通过将GroupSMInfo的useMaster设置为true,开启内置的MasterStateMachine状态机。
这样我们就实现了master的选举。
成员变化
在Paxos实例运行过程中,可以动态增加或者删除集群成员,同样也不需要单独构建状态机,而是利用内置的SystemVSM状态机实现。
调用paxosNode的增加成员、删除成员方法即可,如下:
paxosNode.addMember(groupIdx, node);
paxosNode.removeMember(groupIdx, node);
其它
以上为简单的echo样例实现介绍,一些高级应用如数据快照checkpoint相关的使用,由于要结合不同应用场景实现,这里不再详细介绍,有需要可以参考KV存储样例中相关实现。
文档
如何贡献
致谢
感谢腾讯微信团队开源了如此优秀的 C++ PhxPaxos实现; 感谢Apache RocketMQ团队开源的关于文件管理部分的高效灵活设计;
开源许可
WPaxos 基于 Apache License 2.0 开源协议,源码中引用了开源协议同为 Apache License 2.0 的Apache RocketMQ存储部分关于文件管理相关的代码。