- 作者:老汪软件技巧
- 发表时间:2024-12-13 04:03
- 浏览量:
背景
GCounter只支持节点自身的增加操作,但不支持减少操作。在实际应用中,我们经常需要一个既可以增加也可以减少的计数器。
请设计一个CRDT(Conflict-free Replicated Data Type)来实现一套满足最终一致性的分布式系统
为了减少要理解的概念,下文描述的CRDT同时有两层意思
无冲突的数据类型,即类型一个CRDT实例,即实例思维链
flowchart
A["CRDT可以应用在不同的节点中"]
A -->B["CRDT可以表达任意节点的counter"]
B --> C["CRDT可以让任意节点增加、减少自身的counter"]
C --> D["可以合并多个节点中的CRDT,且无论合并的顺序是怎么样的,其结果都一致,\n即Merge(A.CRDT, B.CRDT, C.CRDT) = Merge(B.CRDT, C.CRDT, A.CRDT) = a new CRDT"]
D --> E["多次合并同一个节点的CRDT,其结果都等于合并一次,\n即Merge(A.CRDT, B.CRDT, B.CRDT) = Merge(A.CRDT, B.CRDT) = a new CRDT"]
E --> F["但是GCounter只能增加"]
F --> G["那我们设立两个GCounter,一个负责增加,一个负责减少"]
G --> H["那么某个节点就能增加、减少自身的counter、其值等于两个GCounter的差值"]
实现
这个CRDT如下,我们暂时给它起名PNCounter(稍后解释其为什么叫PNCounter)
const PNCounter:Map<string, [GCounter, GCounter]> = {
[A.id]: [1,0],
[B.id]: [2,1],
[C.id]: [3,0],
};
意思很明显,
因为是分布式系统,且支持弱网环境,所以这个PNCounter在不同节点里,都不一样。
比如下面的例子(稍微增加难度,B节点可能会没有C节点的信息、C节点也可能没有B节点的信息)
A.PNCounter = {
[A.id]: [1,0],
[B.id]: [1,1],
[C.id]: [2,0],
};
B.PNCounter = {
[A.id]: [1,0],
[B.id]: [2,1],
};
C.PNCounter = {
[A.id]: [1,0],
[C.id]: [3,0],
};
在A节点上增加自身counter通过A.PNCounter[A.id][0]++,减少自身counter通过A.PNCounter[A.id][1]++
合并A、B节点的PNCounter,比如Merge(A.PNCounter,B.PNCounter),将A.PNCounter和B.PNCounter中相同id的item的值进行合并,步骤如下
flowchart LR
B["1. 创建newPNCounter\n2. 合并A、B节点的key为一个Set,遍历Set"]
B1["2.1 如果A.PNCounter[id][0]存在,且B.PNCounter[id][0]不存在,\n则newPNCounter[id][0] = A.PNCounter[id][0]"]
B2["2.2 如果A.PNCounter[id][0]不存在,且B.PNCounter[id][0]存在,\n则newPNCounter[id][0] = B.PNCounter[id][0]"]
B3["2.3 如果A.PNCounter[id][0]存在,且B.PNCounter[id][0]存在,\n则newPNCounter[id][0] = Max(A.PNCounter[id][0],B.PNCounter[id][0])"]
B4["2.4 如果A.PNCounter[id][1]存在,且B.PNCounter[id][1]不存在,\n则newPNCounter[id][1] = A.PNCounter[id][1]"]
B5["2.5 如果A.PNCounter[id][1]不存在,且B.PNCounter[id][1]存在,\n则newPNCounter[id][1] = B.PNCounter[id][1]"]
B6["2.6 如果A.PNCounter[id][1]存在,且B.PNCounter[id][1]存在,\n则newPNCounter[id][1] = Max(A.PNCounter[id][1], B.PNCounter[id][1])"]
C["3.返回newPNCounter"]
B --> B1
B --> B2
B --> B3
B --> B4
B --> B5
B --> B6
B1 -->C
B2 -->C
B3 -->C
B4 -->C
B5 -->C
B6 -->C
代码如下
A.PNCounter = {
[A.id]: [1,0],
[B.id]: [1,1],
[C.id]: [2,0],
};
B.PNCounter = {
[A.id]: [1,0],
[B.id]: [2,1],
};

// 某个节点只能增加、减少自身PNCounter中id为自身的counter,比如A节点只能增加PNCounter[A.id]的counter
const inc = (id:string,value:number) => {
A.PNCounter[id][0] += value
}
const dec = (id:string,value:number) => {
A.PNCounter[id][1] += value
}
// 合并两个PNCounter
const merge = (PNCounterA:PNCounter, PNCounterB:PNCounter) => {
const newPNCounter = {}
const ids = new Set([...Object.keys(PNCounterA), ...Object.keys(PNCounterB)])
ids.forEach(id => {
PNCounterA[id] = [0,0]
// 我就不写这么多if else了,直接缩写
newPNCounter[id][0] = Math.max(PNCounterA[id][0] || 0, PNCounterB[id][0] || 0)
newPNCounter[id][1] = Math.max(PNCounterA[id][1] || 0, PNCounterB[id][1] || 0)
})
return GCounter
}
const newPNCounter = merge(A.PNCounter, B.PNCounter)
你按随意的顺序合并A.PNCounter, B.PNCounter, C.PNCounter,其结果都一致,即最终一致性。
QA
问:为什么不直接用两个GCounter来实现PNCounter?答:是可以的,比如
const PNCounter<positive: GCounter, negative: GCounter> = {
positive: {
[A.id]: 1,
[B.id]: 2,
[C.id]: 1,
},
negative: {
[A.id]: 1,
[B.id]: 2,
[C.id]: 1,
},
}
但是你会发现,inc、dec、merge的实现会比文章中的实现复杂很多,所以不推荐使用这种方式。
总结
能增加、减少的counter,且是一个CRDT,我们称之为PNCounter(全称Positive-Negative Counter)。