翻译自

https://blog.golang.org/concurrency-timeouts

Go并发范式:超时,继续执行

并发编程有自己的习惯用法。 超时是一个很好的例子。在商用软件开发时,所有操作都需要有超时。

虽然 Go 的channel不直接支持超时,但很容易实现。假设我们想从通道 ch 接收,但希望实现一秒钟超时。 我们可以创建一个信号channel并启动一个在通道上发送之前休眠的 goroutine

1
2
3
4
5
timeout := make(chan bool, 1)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()

我们可以使用select语句从ch或者timeout中接收。如果过了1秒还没有数据返回,超时的case会被选中,尝试从ch读取的操作被放弃

1
2
3
4
5
6
select {
case <-ch:
// a read from ch has occurred
case <-timeout:
// the read from ch has timed out
}

timeout channel有1个buffer,允许超时 goroutine 发送到通道然后退出。 goroutine 不关心这个值是否被接收了。这意味着如果 ch 接收发生在超时之前,goroutine 不会永远挂起。timeout channel 最终会被gc释放。

(在这个例子中,我们使用 time.Sleep 来演示 goroutines 和通道的机制。在实际程序中,你应该使用 time.After来完成这个延迟发送)

让我们看看这种模式的另一种变体。在这个例子中,我们有一个程序可以同时从多个数据库的副本中读取数据。程序只需要一个结果,它应该接受最先返回的结果。

函数 Query 接受多个数据库连接和一个查询字符串。它并行查询每个数据库并返回它收到的第一个响应:

1
2
3
4
5
6
7
8
9
10
11
12
func Query(conns []Conn, query string) Result {
ch := make(chan Result)
for _, conn := range conns {
go func(c Conn) {
select {
case ch <- c.DoQuery(query):
default:
}
}(conn)
}
return <-ch
}

在这个例子中,闭包执行非阻塞发送,它通过在把send放在带有defaultselect中实现。 如果send不能立即完成,则将选择default情况。 非阻塞发送保证循环中启动的任何 goroutine 都不会挂起。

但是,这个例子中有一个问题,如果结果在主函数执行到11行接收的时候之前到达,发送都会失败,最终函数无法取得结果。

这个问题是所谓的竞争条件的教科书示例,修复非常简单。 我们只要保证channel有着缓冲通道(通过添加缓冲区长度作为 make 的第二个参数)来保证第一次发送有一个放置值的地方。 这确保发送总是成功,并且无论执行顺序如何,第一个值都会被获取。

这两个例子展示了 Go 可以简单地表达 goroutines 之间复杂的交互。

TLDR

大部分场景应该使用值传递。

正文

Go 的文档指出

1
Programs using times should typically store and pass them as values, not pointers. That is, time variables and struct fields should be of type time.Time, not *time.Time. A Time value can be used by multiple goroutines simultaneously.

翻译一下就是

程序使用times典型场景应该把它们当做值传递,而非指针。也就是说time变量及结构体field应该为time.Time,而不是*time.Time。Time值是协程安全的。之所以说typically,是因为有着那么不typically的场景,比如你想在方法内修改原来的time.Time对象这种场景(很少见)。

time.Time是一个较小的对象,把它作为值传递是完全合理的。传递指针而不是值的很重要的一个原因是避免昂贵的拷贝动作。但是传递指针也会给gc带来一些额外的开销,如逃逸分析等。

值得一提的是,你可以通过go build -gcflags="-m"来判断变量在堆中还是在栈中。

go-time-pass

time.Time在我的MBP上只有24个字节。附上几个简单类型的字节数

1
2
3
4
5
6
7
byte size is  1
int32 size is 4
int size is 8
int64 size is 8
float32 size is 4
float64 size is 8
time size is 24

测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package demo_base

import (
"fmt"
"testing"
"time"
"unsafe"
)

func TestSize(t *testing.T) {
{
var x byte = 0
fmt.Println("byte size is ", unsafe.Sizeof(x))
}
{
var x int32 = 0
fmt.Println("int32 size is ", unsafe.Sizeof(x))
}
{
var x int = 0
fmt.Println("int size is ", unsafe.Sizeof(x))
}
{
var x int64 = 0
fmt.Println("int64 size is ", unsafe.Sizeof(x))
}
{
var x float32 = 0
fmt.Println("float32 size is ", unsafe.Sizeof(x))
}
{
var x float64 = 0
fmt.Println("float64 size is ", unsafe.Sizeof(x))
}
{
t := time.Now()
fmt.Println("time size is ", unsafe.Sizeof(t))
}
}

前言

这是**icza**
在StackOverflow上的一篇回答
,质量很高,翻译一下,大家一起学习

问题是:在go,有没有什么最快最简单的方法,用来生成只包含英文字母的随机字符串

icza给出了8个方案,最简单的方法并不是最快的方法,它们各有优劣,末尾附上性能测试结果:

1. Runes

比较简单的答案,声明一个rune数组,通过随机数选取rune字符,拼接成结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package approach1

import (
"fmt"
"math/rand"
"testing"
"time"
)

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randStr(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}

func TestApproach1(t *testing.T) {
rand.Seed(time.Now().UnixNano())
fmt.Println(randStr(10))
}

func BenchmarkApproach1(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

2. Bytes

如果随机挑选的字符只包含英文字母,我们可以直接使用bytes,因为在UTF-8编码模式下,英文字符和Bytes是一对一的(Go正是使用UTF-8模式编码)

所以可以把

1
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

用这个替代

1
var letters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

或者更好

1
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

现在我们有很大的进展了,我们把它变为了一个常数,在go里面,只有string常数,可并没有slice常数。额外的收获,表达式len(letters)
也变为了一个常数(如果s为常数,那么len(s)也将是常数)

我们没有付出什么代码,现在letters可以通过下标访问其中的bytes了,这正是我们需要的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package approach2

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func randStr(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}

func TestApproach2(t *testing.T) {
rand.Seed(time.Now().UnixNano())

fmt.Println(randStr(10))
}

func BenchmarkApproach2(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

3. Remainder

上面的解决方法通过rand.Intn()来获得一个随机字母,这个方法底层调用了Rand.Intn(),然后调用了Rand.Int31n()

相比于生成63个随机bits的函数rand.Int63()来说,Rand.Int31n()很慢。

我们可以简单地调用rand.Int63()然后除以len(letterBytes),使用它的余数来生成字母

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package approach3

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func randStr(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Int63() % int64(len(letters))]
}
return string(b)
}

func TestApproach3(t *testing.T) {
rand.Seed(time.Now().UnixNano())

fmt.Println(randStr(10))
}

func BenchmarkApproach3(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

这个算法能正常工作并且非常快,不过它牺牲了部分精确性,字母出现的概率并不是精确一样的(假设rand.Int63()
生成63比特的数字是等概率的)。由于字母总共才52个,远小于 1<<63 - 1,因此失真非常小,因此实际上这完全没问题。

解释: 假设你想要05的随机数,如果使用3位的bit,3位的bit等概率出现07,所以出现0和1的概率是出现2、3、4概率的两倍。使用5位的
bit,0和1出现的概率是6/32,2、3、4出现的概率是5/32。现在接近了一些了,是吧?不断地增加比特位,这个差距就会变得越小,当你有63位地时候,这差别已经可忽略不计。

4. Masking

在上一个方案的基础上,我们通过仅使用随机数的最低n位保持均匀分布,n为表示所有字符的数量。比如我们有52个字母,我们需要6位(52 =
110100b)。所以我们仅仅使用了rand.Int63()的最后6位。并且,为了保持所有字符的均匀分布,我们决定只接受在
0..len(letterBytes)-1的数字即0~51。(译者注:这里已经没有第三个方案的不准确问题了)

最低几位大于等于len(letterBytes)的概率一般小于0.5
(平均值为0.25),这意味着出现这种情况,只要重试就好。重试n次之后,我们仍然需要丢弃这个数字的概率远小于0.5的n次方(这是上界了,实际会低于这个值)。以本文的52个字母为例,最低6位需要丢弃的概率只有
(64-52)/64=0.19。这意味着,重复10次,仍然没有数字的概率是1*10^-8。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package approach4

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

const (
// 6 bits to represent a letters index
letterIdBits = 6
// All 1-bits as many as letterIdBits
letterIdMask = 1 <<letterIdBits - 1
)

func randStr(n int) string {
b := make([]byte, n)
for i := range b {
if idx := int(rand.Int63() & letterIdMask); idx < len(letters) {
b[i] = letters[idx]
i++
}
}
return string(b)
}

func TestApproach4(t *testing.T) {
rand.Seed(time.Now().UnixNano())

fmt.Println(randStr(10))
}

func BenchmarkApproach4(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

5. Masking Improved

第4节的方案只使用了rand.Int63()方法返回的64个随机字节的后6位。这实在是太浪费了,因为rand.Int63()是我们算法中最耗时的部分了。

如果我们有52个字母,6位就能生成一个随机字符串。所以63个随机字节,可以利用63/6=10次。

译者注:使用了缓存,缓存了rand.Int63()方法返回的内容,使用10次,不过已经并不是协程安全的了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package approach5

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

const (
// 6 bits to represent a letter index
letterIdBits = 6
// All 1-bits as many as letterIdBits
letterIdMask = 1<<letterIdBits - 1
letterIdMax = 63 / letterIdBits
)

func randStr(n int) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdMax letters!
for i, cache, remain := n-1, rand.Int63(), letterIdMax; i >= 0; {
if remain == 0 {
cache, remain = rand.Int63(), letterIdMax
}
if idx := int(cache & letterIdMask); idx < len(letters) {
b[i] = letters[idx]
i--
}
cache >>= letterIdBits
remain--
}
return string(b)
}

func TestApproach5(t *testing.T) {
rand.Seed(time.Now().UnixNano())

fmt.Println(randStr(10))
}

func BenchmarkApproach5(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

6. Source

第5个方案非常好,能改进的点并不多。我们可以但不值得搞得很复杂。

让我们来找可以改进的点:随机数的生成源

crypto/rand的包提供了Read(b []byte)的函数,可以通过这个函数获得需要的随机比特数,只需要一次调用。不过并不能提升性能,因为
crypto/rand实现了一个密码学上的安全伪随机数,所以速度比较慢。

所以让我们坚持使用math/rand包,rand.Rand使用rand.Source作为随机位的来源,rand.Source是一个声明了Int63() int64
的接口:正是我们在最新解决方案中需要和使用的唯一方法。

所以我们不是真的需要rand.Randrand.Source包对于我们来说已经足够了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package approach6

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

var src = rand.NewSource(time.Now().UnixNano())

const (
// 6 bits to represent a letter index
letterIdBits = 6
// All 1-bits as many as letterIdBits
letterIdMask = 1<<letterIdBits - 1
letterIdMax = 63 / letterIdBits
)

func randStr(n int) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdMax letters!
for i, cache, remain := n-1, src.Int63(), letterIdMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdMax
}
if idx := int(cache & letterIdMask); idx < len(letters) {
b[i] = letters[idx]
i--
}
cache >>= letterIdBits
remain--
}
return string(b)
}

func TestApproach6(t *testing.T) {
fmt.Println(randStr(10))
}

func BenchmarkApproach6(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

注意到这里我们没有使用种子初始化rand了,取而代之的是初始化了rand.Source

还有一件需要注意的事,math/rand的文档指出

1
默认的Source是协程安全的

所以默认的Source比通过rand.NewSource()创建出来的Source要慢。不用处理协程并发场景,当然慢啦。

7. 使用 strings.Builder

之前的解决方案都返回了通过slice构造的字符串。最后的一次转换进行了一次拷贝,因为字符串是不可变的,如果转换的时候不进行拷贝,就无法保证转换完成之后,byte
slice再被修改后,字符串仍能保持不变。

Go1.10引入了strings.Builder,这是一个新的类型,和bytes.Buffer类似,用来构造字符串。底层使用[]byte
来构造内容,正是我们现在在做的,最后可以通过Builder.String()方法来获得最终的字符串值。但它很酷的地方在于,它无需执行刚才谈到的复制即可完成此操作。它敢这么做是因为它底层构造的
[]byte从未暴露出来,所以仍然可以保证没有人可以无意地、恶意地来修改已经生成的不可变字符串。

所以我们的下一个想法不是在slice中构建随机字符串,而是在 strings.Builder 的帮助下,一旦我们完成,我们就可以获取并返回结果,而无需复制。
这可能在速度方面有所帮助,并且在内存使用和分配方面肯定会有所帮助(译者注:等会在benchmark中会清晰地看到)。

对于物联网平台来说,规则引擎是其中一个很重要的功能,也叫消息流转功能,将消息流转到各类中间件、云产品中。在华为、AWS、Azure、阿里这四个物联网平台中,阿里不支持流转到S3/类S3存储中。本文对比一下华为云、AWS、Azure把设备消息流转到S3/类S3存储的功能

参考资料

华为云

规则粒度和限制

  • 规则配置粒度到OBS

  • 限制单用户配置100条规则,每个规则10个Action

功能实现

针对华为云,我测试了设备的消息上报转发到华为云OBS的功能。

流转规则需要指定obs桶,随后运行之后,华为云OBS体现为

  • 设备的每条消息都会在obs中存储为一个文件
  • 名称采用deviceId+毫秒级时间戳+后面4位数字

关键路径截图

配置规则时指定到obs桶

image-20210614184348066

单条消息单个文件

image-20210614184522958

优势

可以非常轻易地查询出单个设备的消息,因为文件名携带有毫秒级时间戳,还可以指定具体

劣势

用来做MapReduce的话,文件数目太多,由于S3云厂商往往通过API调用次数收费,不仅是速度,成本也会很高。

AWS

规则粒度和限制

  • 规则配置粒度到桶及Key,相当于华为云OBS桶+文件名
  • 限制规则每秒进行20k次运算
  • 限制最多拥有1000条规则
  • 限制每个规则最多10个action

功能实现

再次上报数据触发规则会把obs中的数据替换。(通过版本控制可以获取到老的数据)

关键路径截图

配置规则指定粒度到Key

image-20210614165708645

仅有一个Key,新值覆盖旧值

image-20210614170447237

优劣势

AWS的这个模式很适合存储每个设备的最新数据。不过由于规则数量上的限制,最多只能在S3上存储1000个键值对。可用性较低。可用于数量小于1000的设备,存储、查询最新数据。

Azure转发

规则粒度限制

  • 规则配置粒度到存储容器
  • Azure可配置存储入存储容器的批量频率和大小限制
  • 编码支持Avro和Json两种格式
  • 最多100条路由

功能实现

自上报事件,到存储中出现数据,azure是最慢的,azure做了批量的缓冲,达到batch的大小和时间要求后才会写入存储。

关键路径截图

配置路由规则

image-20210614181514437

存储中批量数据

因为选择了avro格式,所以vim打开是乱码,不过明显可以看到是多条数据

image-20210614181738380

优势

Azure的这种方式,比较适合做MapReduce类操作,相对华为云来说,Azure的文件数量大大减少,如果用于做MapReduce这类操作,因为文件碎片小,作业速度会比华为云快,而且由于云厂商对存储,通常以api调用次数收费,价格也会比华为云低。

劣势

不易针对单个设备进行查询。

需要了解的概念

  • DNAT映射:网络报文,目的地址转换。
  • 胖客户端:像Cassandra 客户端、Kafka客户端,客户端感知中间件拓扑,即客户端感知中间件部署、均衡,把请求发给指定的中间件服务器。
  • VPC:用户的私有网络环境。
  • VPC Peering:多个网段不冲突的VPC之间网络打通的方式,可跨用户。
  • 三方厂家:非公有云厂商的PaaS供应商。
  • 中间件用户:需要在公有云上运行中间件的公有云用户。

前言

现在,在公有云上买rediskafka这类组件已经变得非常普遍,由公有云提供的中间件往往能给你带来良好的体验,现在中间件在公有云上的交付模式大致分为三种。我对
Pulsar较为熟悉一些,大部分图例以消息中间件Pulsar举例。本文中成本指用在云服务或三方厂商上的成本,指人力成本时,会注明人力成本。

image-20210424145257826

公有云模式

公有云提供的中间件有得天独厚的优势,主要优势在以下几点

低廉的成本

  • 和三方厂家相比,对中间件用户来说,云厂商自运营的价格要低于三方厂家。
  • 和租户自运维相比,对中间件用户来说,云厂商价格略高一点点,但省去了大量的运维人力。

使用方便

云厂商可以把中间件的ip地址申请在你的vpc内,对任何应用程序来说,连接都是最方便的。无论是容器化部署、虚拟机部署、还是本vpc和其他vpc
peering打通的场景,都可以通信。

监控、运维系统对接

得力于云厂商的积累,能够方便地和云厂商的告警、统计系统对接,接收告警通知和报表等。甚至可以在手机app上查看云中间件的监控指标。

安全

  • 公有云有专门的团队时刻关注中间件安全漏洞,及时打上补丁
  • 有些白帽组织会在发现漏洞并公开发布前通知一些大的使用者,大的厂家可以在漏洞公布前提前预防

主要的劣势有

  • 云厂商提供的中间件有限,不是所有中间件的所有版本都支持
  • 即使业务有强烈诉求,也很难在中间件上做定制修改

租户自运维模式

租户自运维,就是用户自己部署中间件维护,仅利用云上的基础设施。主要的优势有

  • 可以在中间件上做定制修改
  • 可按任意规则进行部署,节约成本
  • 可部署任意版本中间件,比如代码一定要Kafka 0.11版本,公有云上该版本已停止售卖,代码不能改,那就只好自己部署Kafka的该版本了。

主要的劣势有

  • 和运维系统对接,需要人力成本。因为业务本身也需要对接运维系统,这部分人力成本较低,常见的困难点在于中间件的技术栈和团队不符(如Java团队尝试在go语言开发的中间件上对接运维系统)。
  • 中间件的网上运维、问题处理需要很大的人力成本,且对人员技术水平要求高。
  • 安全问题、漏洞,可能处理不及时,存在安全隐患。

三方厂家模式

三方厂家的主要优势有

  • 安全性方面,如果是社区主导者,有些白帽组织可能会提前告知发现的漏洞
  • 如果是社区的主导者,可以将您的新的合理需求优先级提高开发。

主要劣势

  • 运维上无法和公有云打通,三方厂家需要自己构建一套运维系统并通知到客户,对客户来说,也要适配三方厂家的运维方案。
  • 成本比公有云模式高
  • 易用性上面,网络是三方厂家的痛点,这点值得单独在下一小节详细描述

三方厂家网络模式

三方厂家最难做的就是,如何在厂家用户中间件用户隔离的情况下,提供中间件的接入点,可行的方式有以下几种

VPC Peering打通模式

Peering可以说是最简单的网络打通模式,可以给用户不错的私网体验(如果走公网,数据至少要是tls加密的,不然会有数据在公网上被人监听的风险)。只需要跟用户规划一个不冲突的网段。

image-20210424110409903

这个网段不需要很大,一般26、27就可以满足要求,27的网段已经可以部署30台虚拟机了,值得一提的是,这种打通模式下,一般不会选择容器化部署,都是虚拟机部署,原因有二

  • 如果客户也使用k8s容器化部署,两个容器化集群互通,对网设的要求很高,限制很大。
  • 仅仅部署一个中间件的话,加上k8s集群的部署,成本占比太高

VPC Peering的主要挑战有

  • 自动化:自动化较为困难,需要考虑如何获取尽量少的客户权限,自动化完成整个流程。
  • 成本:Peering的模式下,三方厂商几乎只能针对每个用户的集群单独Peering,很多地方无法均摊成本。想要客户购买,需要对客户有很强的吸引力。
  • 网络规划受限,路由规则配置复杂:如果客户不仅仅和三方厂家建立了Peering关系,又和其他VPC建立了Peering关系,这三方网络不能冲突,还要互相配置路由规则。这需要进行细致的网络规划,毕竟网络一旦冲突,解决的代价很大。

LB接入模式

LB接入模式,在三方厂家模式下,只能通过公网的方式接入,一般给用户提供域名或ip地址接入,域名使用的较多,像华为云设备接入服务、Azure
物联网服务等都是提供域名,主要的原因

在EIP资源有限的情况下,将部分用户的域名解析成一个地址,既能节约EIP资源,也能在一些基础设施上共用资源,节约成本(如共k8s、共数据库、共Bookkeeper等)

image-20210507194200489

LB接入的限制是,中间件需要支持可负载均衡访问,如果中间件的客户端是有状态的胖客户端(如Pulsar、Cassandra),需要中间有一层无状态的代理。

华为云VPCEP模式

VPCEP,VPC终端节点,是华为云的云服务,可以将一个VPC的地址,映射到另一个VPC的地址,在客户端的视角来看,本质上是做了一次DNAT地址映射。

image-20210424230158506

VPCEP的主要限制是连接中间件经过了DNAT映射。需要对应中间件支持DNAT的方式访问。另一方面其他云上不一定有该能力。三方厂商一般也希望做一个统一的方案来适配所有云厂商,所以目前使用的并不是很多。

三方厂商模式的劣势主要是

  • 公网的方式下,时延高,难以满足有低时延要求的客户,且带宽费用也占比很大
  • Peering模式,成本高,费用贵

并且如果公有云厂商没有提供新的用于用户和用户之间打通网络的方案,那么对三方厂商来说,在接入方面,恐怕很难再有所突破了。

注:这里的网络打通三种方式不限于三方场景和用户之间,还可以扩展到用户和用户之间网络打通进行通信,这里不再扩展了。

建议选型流程

中型和长尾用户可按照下图流程进行选型。

对于大型用户,可能会维护自己的核心中间件,紧密联系自己的业务,在上面做定制开发,不但降低成本,并且提升业务竞争力。非核心中间件,也可参照下图流程进行选型。

image-20210507195340204

总结

公有云 三方厂商 自建
机器成本
处理测试、网上问题工作量
运维系统对接开发量
接入灵活性
安全

大型系统中的证书管理

随着安全的要求,现在我们在越来越多的通信中使用TLS加密。下图是一个微服务架构下数据流向的例子

cert-manager1

  • 蓝色部分,即和三方交互时需要TLS加密认证
  • 红色部分,各个微服务、消息中间件等通信需要TLS加密认证
  • 绿色部分,各个微服务和存储层通信也需要TLS加密认证

安全上对我们的要求逐步变化为,仅蓝色使用TLS=》蓝色和红色使用TLS=》全部使用TLS加密

证书管理的必要性

从安全的角度上来说,我们最好能支持证书的更换和热加载。如果您的业务当前使用加密的场景不多,可能暂时看不到证书管理的意义。但是当你在各个方面使用TLS更加频繁之后,会发现证书管理可带来如下好处:

  • 可以通过抽象出场景,通过场景和证书的关联联系,在各个地方通信使用的证书,可以统一更换。
  • 统一提供证书过期告警等功能
  • 统一提供证书的变更通知,通知到各个实例

以我在工作中接触到的两个基础PAAS平台,都有证书管理的功能,可见证书管理的必要性。

PS: 开源组件大多都拥有证书配置能力,没有可对接证书管理的能力,但这个能力很难贡献给社区,需要自己开发。

证书管理概念

在TLS会话中,从依赖的证书文件角度来看,可以分为加密流程和验证流程。

加密证书

TLS加密流程的证书,包含证书链文件和密钥

验证证书

TLS验证流程的证书,仅包含证书链文件

拆分为加密流程和验证流程的合理性

这使得加密流程证书和验证流程证书可以互相独立的替换,更方便在大型场景下复用证书。

让我们来假设如下的场景:

cert-manager2

客户A、客户B、客户C、客户D的验证流程证书自然不相同,但服务跟客户交互的时候,使用的加密流程证书确实同一份。如果将两个阶段的证书合一,那么在更换证书的时候,就需要更新4份数据,当你有1000名用户的时候,这个数字将会是1000,这对于存储和应用程序来说都是不小的冲击。

Scene

Scene是在一个会话中,代表会话和请求证书、验证证书的绑定关系。Scene和请求证书、验证证书都是1:1的关系。这使得我们不仅仅可以修改证书文件,也可以对TLS会话中使用的证书进行修改。在证书无法复用,且证书绑定了多个场景的时候,针对单个场景修改其绑定的证书。

以上图作为例子,假设客户D有特殊的要求,要求加密流程使用特定的证书或密钥,我们就可以将客户D的场景绑定到客户D独有的加密证书

多集群管理

如果证书管理需要管理多个集群,那么证书和Scene前面可以加上层级来隔离,如环境、集群等。

对小型系统的建议

如果规模不大,且TLS场景有限,需要考虑一下有无拆分加密证书和验证证书的必要,可以合一,应用程序直接以合一的证书id来关联,而非场景id。虽不方便复用,但大大降低了复杂性。

证书管理的功能

cert-manager3

证书管理场景

设定一个TLS会话

cert-manager4

使用TLS会话

这要求应用程序持久化场景信息

cert-manager5

组织架构相关

大型系统下,证书管理是一个必须的组件,且一定是由团队最底层的组织架构承接。如若不然,那么由底层组织架构维护的组件,因为依赖关系,无法基于证书管理来统一实现证书的更换和过期告警。除非不基于证书管理自己构筑一套能力。

TLDR

随着组件和使用加密场景的不断扩大,证书管理是一个必须的组件,通过抽象出场景的概念来复用证书,通过变更通知在微服务模式下快速更换所有微服务实例上的证书,并提供统一的证书过期告警功能来提醒管理员更换证书。

前言

之前在InfoQ的《华为云物联网四年配置中心实践》文章中分享了业务配置中心。

本文讲述业务配置中心(下文简述为配置中心)的关键技术和实现方式。华为云物联网平台按照本文的实现方式实现了一个业务配置中心,该配置中心2020年1月上线,平稳运行至今。

概念

运维配置

和用户无关,通常为集群界级别的配置,程序只会进行读取,如数据库配置、邮箱服务器配置、网卡配置、子网地址配置等。

业务配置

作为SaaS 服务,每个用户在上面都有一些业务配置。如用户的证书配置、用户服务器的流控配置等,这些业务配置相对运维配置来说更加复杂,且可能会有唯一性限制,如按用户 id 唯一。这部分配置数据一般由用户操作触发,代码动态写入,并且通知到各个微服务实例。通常,我们希望这些配置能在界面展示,且支持人为修改。上述逻辑如果由各微服务自己实现,会存在大量重复代码,并且质量无法保证。我们希望由一个公共组件来统一实现这个能力。开源或体量较小的项目就不会选择依赖一个配置中心,而是直接通过连接数据库或etcd来解决问题

env

代表一个部署环境。

cluster

代表环境下的集群。常见于单环境下蓝绿发布,蓝集群、绿集群、金丝雀集群等。

配置

配置名称,如用户证书配置、用户流控配置等。

Key

配置的唯一键,如用户id。

Value

配置唯一键对应的值。

配置中心设计梗概

业务配置特点

  • 虽然业务配置写入可能存在并发,但并发量不大,频率较低。
  • 业务配置常常以用户为id,单集群用户量有限,一般不超过5万。

配置中心要解决的问题

business-config-center-impl1

设计要点

  • 单配置要求有配置id,每个id上通过version的乐观并发控制来解决多版本冲突问题
  • 通知不追求可靠,应用程序和配置中心断链无法接收通知的场景下,通过定期同步数据来保证数据的可靠
  • 支持Schema的变更,因Schema变更不频繁,也采用version的乐观并发控制来解决多版本冲突问题

通知是否包含消息内容

我认为应该只通知Key,具体的数值让应用程序再去配置中心查询。仅通知Key实现简洁易懂。同时通知Key&Value需要多考虑定期同步和通知两条通道并发,可能引起的竞态冲突。

配置中心业务流程

本小节描述业务配置中心的所有业务流程,并试图从交互中抽象出与具体实现无关的接口

配置的增删改查

business-config-center-impl2

配置值的增删改查

business-config-center-impl3

定期同步

分布式场景下,通知有可能无法送达,如程序陷入网络中断(或长gc),通知消息送达超时,待程序恢复后,数据不再准确。因此需要对数据做定期同步,提高可靠性。

business-config-center-impl4

同步过程中,仅仅请求交互id和version,避免传输大量数据。应用程序接收到需要同步的数据后:

  • 删除操作,触发删除通知,从本地缓存中移除数据。
  • 添加、修改操作,向配置中心查询最新数据,触发通知并写入本地缓存。

服务启动

服务启动也可看做是一个同步的流程,只是需要同步大量的数据添加。为了避免向配置中心频繁大量的请求,引入批量操作来减轻压力

business-config-center-impl5

限制

该配置中心设计思路依赖客户端可把数据全量放入到内存中,如用户量太大,则不适合采用这种模式。

注:一个节省内存的思路是,内存中只放置全量的id和version,数据只有当用到的时候再去查询。这个思路要求配置中心持久化一些老旧数据以供以下场景的查询使用

  • 业务流程中,需要使用该配置值的。

  • 回调业务程序修改的时候,需要提供旧值的。

除此之外没有任何区别。

业务配置抽象实现

从上述描述的业务场景,我们抽象出业务配置中心的交互接口和抽象实现。接口的Swagger Yaml已上传到Github:https://gist.github.com/hezhangjian/68c9c2ecae72cc2a125184e95b0a741e

配置相关接口

  • 提供env、cluster、配置名称、配置Schema、配置版本号添加配置
  • 提供env、cluster、配置名称删除配置
  • 提供env、cluster、配置名称、新Schema、新Version来修改配置
  • 提供env、cluster、配置名称来查询配置

配置值相关接口

  • 提供env、cluster、配置名称、Key、Value来添加配置值
  • 提供env、cluster、Key、ValueVersion(可选)来删除配置值
  • 提供env、cluster、Key、Value、ValueVersion(可选)修改配置值
  • 提供env、cluster、Key查询配置值
  • 根据env、cluster、应用程序当前的配置数据来做定期同步
  • 根据Key列表批量查询配置值

通知相关接口

  • 通知某env某cluster下,配置项中的一个Key发生变化,新增、修改或是删除。可选方式有HTTP长链接(Inspired by Apollo)、Mqtt、WebSocket等。

配置中心存储层抽象实现

配置中心存储层需要存储配置配置值数据,支持UpdateByVersion,且需要捕捉数据的变化,用来通知到应用程序

服务发现抽象实现

为了使应用程序连接到配置中心,需要一个发现机制可以让应用程序感知到配置中心的地址。高可用的方式很多,如K8s发现、ZooKeeper、Etcd、ServiceComb、业务环境变量注入ELB地址(ELB后端挂载配置中心的地址)等。

抽象总结

business-config-center-impl6

根据这个抽象,我们可以进行关键技术点选型,来实现业务配置中心。

配置中心实现

华为云物联网配置中心实现

business-config-center-impl7

  • env+cluster+config组成数据表的名称
  • 一个key、value对应一行数据

另一种实现方式

只要实现上述接口和抽象能力,都可以实现业务配置中心,也可以这么实现

business-config-center-impl8

  • env+cluster+config+key 组合成etcd的key
  • 一个key、value对应一个键值对

又一种实现方式

当然也可以

business-config-center-impl9

  • env+cluster+config+key 组合成RocksDB的key
  • 一个key、value对应一个键值对

一句话结论,可以在拷贝镜像文件的时候,通过如下命令指定user来压缩dockerfile的体积,避免把指定的文件在dockerfile中计算两次。

为什么要指定User?

  • 往往,我们会因为安全的要求,不允许使用root用户运行程序。
  • 像ElasticSearch这个开源组件要求不能用root用户运行,其实也是出于安全的原因
1
COPY --chown=sh:sh source /opt/sh

效果展示

先使用dd命令创建1GB的测试文件

1
dd if=/dev/zero of=testfile bs=1024 count=1048576

测试基础镜像ttbb/base:latest,大小439MB

1
2
docker images|grep 'ttbb/base'|grep latest
ttbb/base latest bacdb9e7b5f4 2 weeks ago 439MB

优化前DockerFile

1
2
3
4
5
FROM ttbb/base

COPY testfile /opt/sh/testfile

RUN chown -R sh:sh /opt/sh/testfile

大小

1
1280d315e09d        31 seconds ago      2.59GB

可以看到testfile计算了两次,大小达到了2G多。

优化后DockerFile

1
2
3
FROM ttbb/base

COPY --chown=sh:sh testfile /opt/sh/testfile

大小

1
115b68bc4db8        21 seconds ago       1.51GB

testfile仅计算一次,仅使用1.5G。

Raft主要使用了重叠的大多数技术来保证算法的安全

Raft首要追求的是可理解性

Raft使用数个技术来提升可理解性。包括

  • 问题分解:主备选举、日志复制、安全性
  • 尽量减少状态空间(相比Paxos,Raft减少了不确定性)

Raft新颖的特性

强leader

日志文件只单向传输,简化状态

leader选举

Raft使用随机定时器来选举leader。只添加了很小的机制,却能简单、快速解决冲突

Membership变更

Raft的Membership变更机制使用joint consensus方法,在变更过程中,两个不同配置的大多数 重叠。这使得在集群成员变更时,也能正常处理请求

复制状态机

补图

保证复制状态机的一致,也就保证了数据的一致

一致性算法拥有如下的典型属性

  • 在非拜占庭场景下,保证了正确性。包括 网络延迟、分区、丢包、乱序等
  • 当大多数节点在线的时候,功能可用
  • 不依赖时间来保证日志的一致性。错误的时钟和极大地消息延时,在最差的场景下,可能会导致一致性问题
  • 在最常见的场景下,当一轮大多数节点反悔的时候,就能完成一个命令。小部分节点响应缓慢并不影响系统的整体性能。

Raft协议

简述

Leader选举

新的leader必须在已存在的leader宕机后选出

日志复制

leader必须从客户端哪里接收日志请求,复制到整个集群,迫使其他人达成一致

安全

Raft的安全属性关键。如果任何服务器将一个entry log复制到状态机中,那么其他任意服务器都不能在相同的log index上放置不同的命令。

安全的详细内容

Election Safety 选举安全

每一个任期内,至多只会有一个leader

Leader仅追加

leader不会覆写或者删除已存在的entry,只会追加新的entry(todo 待确认,是写入的,还是commit的,从leader可能是一个老的节点来说,这里应该说commit的更为恰当)

Log Matching 日志匹配

如果两个日志具有相同的任期值和相同的index,那么直到这个index之前的日志都是一样的

Leader Completeness Leader完整性

如果一个日志在一个给定任期内提交了,那么这个日志会一直存在,存在在任何高任期的Leader之中

State Machine Safety 状态机安全

如果服务器已在其状态机上将给定索引的日志条目应用于其状态机,则其他服务器将永远不会对同一索引应用不同的日志条目

Raft基础

Raft将时间切分成任期时长间隔的任期。Raft保证一个任期内至多只有一个Leader。任期可以称为是Raft中的逻辑时钟。每个服务器之间都会互相传播任期值。

Leader选举

Raft使用随机的选举时间来保证分裂投票场景少见并快速解决。将选举超时设定为一个范围。

Raft的作者们考虑过使用不同的Rank值,当分裂投票的时候,Rank值高的优先成为主节点,但在可用性方面有细微的问题。Rank值低的节点需要超时才能成为新的leader,这个时间间隔如果太短,会破坏已有的选举,集群太过敏感)

最终认为随机的措施更明显、更易懂

日志复制

Leader来决定何时将日志提交到状态机是安全的,叫做committed提交。Raft保证提交过的entry都是持久化的,然后最终会被所有的状态机执行。

只有当前的Leader在任期内,然后将其复制到大多数节点,才算做committed!(这里有和仅仅复制到大多数节点有着重要的区别)然后这里会将之前的日志提交。

Leader每次发送AppendEntries RPC请求时,确认在这之前的日志和从节点完全相同。

Raft可以accept、replicate、应用新的日志记录。在正常场景下,经过一轮大多数RPC调用,就可以复制完成。

安全

假如,当leader提交数个日志的时候,follower不可用,然后他当选了leader之后,提交的日志把之前提交的日志覆盖了怎么办?

这里在选举当选leader的上面加了个限制,保证了之后的leader包含了之前所有已提交的entry。

选举限制

Raft使用投票阶段来防止一个没有之前提交过日志的候选者当选leader。候选者必须联络大多数节点才能当选,这就意味着提交过的entry一定在其中的一个服务器中。

提交之前任期的entry

leader不能立刻得出结论:之前任期的日志复制到大多数节点就已经算commit了。

image-20210325211802223

  • a S1是leader,然后部分复制了日志2
  • b S1宕机,S5接受了S3和S4的投票当选了任期3的leader,在index2接受了不同的entry
  • c S5宕机,S1重启,当选了leader,继续复制
  • d S1宕机,S5重启,然后用任期3的日志覆盖了其他节点
  • e 然而如果S1在宕机前,把日志覆盖到大多数节点,那么S5就不能当选leader了

为了避免上图的问题,Raft绝不将复制的数量当作commit 日志的依据。只有当前任期下的entry log通过复制数量来计算。一旦当前任期的entry被提交,那么之前所有的entry都被间接commit了。

安全性保证

我们用反证法证明一旦Leader Completeness Property没有满足,我们就会推断出一个矛盾。假设任期T的Leader提交了一个log entry在任期T,但是这个log entry没有被将来一些任期的Leader拥有。假设有一个没有包含这个entry的最小的任期U的Leader,Leader U没有存储这个entry

    1. 在选举的时候,提交的entry必须不在leader U的日志中(leader从不删除或复写日志)
    1. Leader将这个entry复制到了集群中的大多数节点,并且leader U接收到了集群中大多数节点的投票。至少有一个服务器,即从leader T哪里接受了entry,并且给U投票。这是达成矛盾的关键
    2. voter 必须在接收leaderT的entry之前给U投票。否则它就要拒绝T的写入请求
    3. 当voter给U投票的时候,它始终持久化着这个日志,因为每个中间的leader都包含这个entry,leader不会删除这个entry,除非冲突,follower也不会删除这个entry
    4. voter给U投票,所以U的日志必须至少和voter的一样新,这就达成了第一个冲突
    5. 首先假设,如果voter和U都有同样的上一次log的任期,U的日志至少和vote一样。矛盾,因为最初假设U没有这个log,而voter有。
    6. 否则,leader U的上次log任期比voter的大。此外,它比T大,因为选民的上一个log term至少为T(其中包含来自T的提交entry)。 创建leaderU的最后一个log term的较早的领导者必须在其日志中包含已提交的条目(通过假设)。 然后,通过Log Matching Property,leaderU的日志还必须包含已提交的条目,这是矛盾的
    7. 这就完成了矛盾的证明。比T任期大的leader一定包含了任期T内提交的entry

时间和可用性

Raft可以选举并维持一个稳定的leader,只要系统满足如下的时间限制条件

1
broadcastTime << electionTimeout << MTBF

broadcastTime是进行一个并行rpc到所有服务器来回的平均时间。MTBF是单个服务器故障的平均时间。

广播时间必须比选举时间小一个量级,所以leader放心的发送心跳消息,维护自己的follower。加上随机的选举时延,这个不等式也让选票分裂变得不可能。 如果广播时间和选举时间差不多,选举leader不稳定。

选举超时应该比MTBF小几个数量级,要不然选举的leader就不稳定。
broadcast的时间差不多在0.5ms到20ms
选举超时应该在100ms到500ms。
典型的服务器MTBF时间应该在数月或以上

集群成员变更

成员变更的时候,中途必须没有两个相同任期的leader。不幸的是,任何将服务器们直接从老配置变换到新配置都是不安全的。不可能一次性地原子性地把所有服务器的配置变更,所以中集群在中途可能会分裂为两个多数派。
为了保证安全性,集群成员变更必须使用两阶段的方式。有很多种方式实现两阶段提交。例如,一些系统使用第一次提交来禁用旧的配置,使得旧的配置无法接受客户端的请求,然后第二次操作启动新的配置。在Raft中,集群首先切换到一个过度的配置,叫做joint consensus。一旦joint consensus被提交,系统接下来过渡到新的配置。joint consensus结合了新老配置

  • Log entry在两种配置下都会复制。即新机器和老机器都会复制entry
  • 不管是老配置还是新配置,都有可能当选leader
  • Agreement(协议,包括选举和entry提交)需要老配置和新配置多数派都确认
    补图 Figure11
    集群配置通过复制日志中的特殊entry来进行存储、通信。

上述流程有三个问题
第一个问题是,新的服务器可能初始没有存储任何log entry。如果现在添加到集群中,会花费一些时间来跟上集群的数据,这中间有可能无法commit新的log entry。为了避免可用性的gap。Raft在配置变更之前引入了一个额外的阶段,新的服务器首先
第二个问题是,cluster的leader可能不是新配置中的服务器。这个场景,leader的变化发生在新配置提交的时候。
第三个问题是,移除的服务器可以打乱整个集群。这些服务器接收不到心跳,这些服务器会超时然后启动新的选举。他们将使用新的任期发送RequestVote RPC,会导致当前的leader变为follower。新的leader最终会被选举,但是移除的服务器将会再次超时,重复整个过程,最终导致集群较差的可用性。
为了解决这个问题,Server当认为有leader存在的时候,会忽略RequestVote请求。如果服务器在选举超时前接收到RequestVote RPC请求,它并不会更新它的任期或是给予它的投票。这并不影响正常的选举(每个服务器在选举之前等待最小超时时间)。并且,这有助于避免移除的server破坏选举:如果一个leader可以发送心跳到他负责的集群中的大多数节点,他将不会被更高任期的节点罢免。

成员变更过程中如果发生Failover,老Leader宕机, Cold,new中任意一个节点都可能成为新Leader,如果新 Cold,newLeader上没有 日志,则继续使用Cold ,Follower上如果有 Cold,new 日志会被新Leader截断,回退到 Cold,成员变更失败;如果新Leader上有 Cold,new日志,则继续将未完成的成员变更流程走完。

新成员先加入再同步数据,成员变更可以立即完成,并且因为只要大多数成员同意即可加入,甚至可以加入还不存在的成员,加入后再慢慢同步数据。但在数据同步完成之前新成员无法服务,但新成员的加入可能让多数派集合增大,而新成员暂时又无法服务,此时如果有成员发生Failover,很可能导致无法满足多数成员存活的条件,让服务不可用。因此新成员先加入再同步数据,简化了成员变更,但可能降低服务的可用性。

新成员先同步数据再加入,成员变更需要后台异步进行,先将新成员作为Learner角色加入,只能同步数据,不具有投票权,不会增加多数派集合,等数据同步完成后再让新成员正式加入,正式加入后可立即开始工作,不影响服务可用性。因此新成员先同步数据再加入,不影响服务的可用性,但成员变更流程复杂,并且因为要先给新成员同步数据,不能加入还不存在的成员。

日志压缩

每当有新的操作发生的时候,Raft的日志就会增长,然而在实际的系统中,日志并不能无边界地增长。
快照是最简单的压缩日志的方式。在快照中,整个系统的状态写入到持久化存储的快照中,然后在这之前的日志都可以丢弃。
todo 补图
其他方式,像日志清理或lsm树。在数据的一部分子集上面执行,它们均摊了压缩日志的消耗。

Leader创建snapshot,再分发给follower。有如下两个缺点
第一,Server必须选择何时进行快照,如果服务器快照进行地太频繁,将会浪费磁盘带宽和磁盘energy。如果快照太不频繁,会浪费磁盘的存储空间,然后增加了重放日志所需的时间。如果阈值设置地大,时间周期长的话,磁盘开销小。
第二,写快照会消耗较大的时间,我们不希望这个操作延迟了正常的操作。方案是使用Copy on write技术,这样子在不影响snapshot写入的情况下,集群可以接受新的更新。

客户端的交互

Raft实现了线性化的语义。Linearizable semantics。像es那样使用version,是达不到线性化的语义的。
在读取数据的时候需要额外的措施来保证线性化的语义。首先,leader必须知道最新有那些entry已经提交。Leader Completeness Property 保证了leader有所有的committed entries,但是在任期的开头,可能并不知道那些entry已被提交。(为了确认,可以发空请求来commit数据)
通过向大多数节点来发送心跳,来保证读请求的返回的是最新的。这里就依赖了前面所说的时钟。依赖时钟来实现安全。

性能

todo 补图
随机杀死leader,重新选举最短时间刚好是leader选举超时的一半,因为心跳超时时间刚好是选举超时的一半。

实现

InstallSnapshot RPC接口

由leader调用,发送snapshot的一部分到从节点。Leader总是按顺序发送chunk

参数

  • term leader的任期
  • leaderId 使得follower可重定向client的请求
  • lastIncludedIndex snapshot最后包含的index号
  • lastIncludedTerm snapshot最后包含的最后一个任期号
  • offset chunk在整个snapshot中的offset
  • data[] 原始数据
  • done 如果是last chunk则为true

返回体

term 当前任期,使得leader可以根据这个结果判断是否做操作

接收方的实现

  • 如果接收到的任期小于当前任期,则立刻返回
  • 如果接收到的chunk offset为0,则开始创建快照文件
  • 在给定的offset处写入数据
  • 如果done为false,返回且等待接下来的数据
  • 保存snapshot文件,丢弃之前存在的所有快照文件
  • 如果现有日志条目的索引和术语与快照的最后一个包含的条目相同,请保留其后的日志条目并回复
  • 丢弃整个日志文件
  • 把状态机重设为快照的内容(也使用快照内的集群信息)

State 状态

所有服务器上的持久化状态

在响应RPC之前更新到持久化存储上

  • 当前任期 server见过的最大任期值(初始值是0,单调递增)
  • votedFor 在当前任期内获得投票的候选人ID(如果没有,则为null)
  • log[] log entry;每一个包含一个状态机的命令,包含从leader获取的任期(初始index为1)

所有服务器上的可变状态

  • commitIndex 已知要提交的最高日志条目的index(初始值是0,单调递增)
  • lastApplied 已知要应用到状态机上的最高index(初始值是0,单调递增)

leader上的可变状态

选举后重新初始化

  • nextIndex[] 对每个服务器,将要送给另一个服务器的下一个log entry的index(初始化为leader的lastLogIndex+1)
  • matchIndex[] 对每个服务器,知道的复制到该服务器的最大的log entry的索引。(初始化为0,单调递增)

AppendEntryies RPC

被leader触发,用来复制log entry;也用于心跳。

参数

  • term leader的任期
  • leaderId 使得follower可重定向client的请求
  • prevLogIndex 紧接新记录之前的日志条目索引
  • prevLogTerm 紧接新记录之前的日志条目索引的任期
  • entries[] 要存储的entries。心跳时为空,批量来提升性能
  • leaderCommit leader的commit index提交索引

返回结果

  • term 当前任期,使得leader可以根据这个结果判断是否做操作
  • success 如果follower包含了匹配的prevLogIndex和prevLogTerm,就返回true

接收方实现

  • 如果term < current term,返回false
  • 如果不包含匹配的prevLogIndex和prevLogTerm,就返回false
  • 如果有一个存在的entry和新的冲突(相同的index,不同的term),删除哪个entry和在其之后的所有entry
  • 添加没在log里面的所有entry
  • 如果leaderCommit > commitIndex, 设置commitIndex为leaderCommit、lastNewEntry的最小值

RequestVote RPC

由candidate调用来收集选票

参数

  • term 候选者的任期
  • candidateId 候选者的请求投票
  • lastLogIndex 候选者的最后log entry的索引
  • lastLogTerm 候选者的最后log entry的任期

返回结果

  • term 任期,可以让candidate根据任期做操作
  • voteGranted true代表候选者收到了选票

接收方实现

  • 如果term < currentTerm返回false
  • 如果votedFor或candidateId是null,并且候选者日志至少比接受者的新,给予自己的投票

Server遵守的规则 Rules for Servers

所有服务器

  • 如果commitIndex > lastApplied: 增加lastApplied,把log[lastAplied]应用到状态机上
  • 如果rpc请求或响应中,包含的任期T比当前的Term值大,则将当前的任期值设置为T,转换自己为follower

Follower

  • 向候选者和leader响应rpc
  • 在选举超时的时间间隔内,没有接收到AppendEntries RPC请求或者投票给其他人,那么切换为候选者

候选者

  • 一旦转化为候选者,开始选举
    • 增加任期号
    • 给自己投票
    • 重置选举定时器
    • 向其他服务器发送RequestVote RPC请求
  • 如果接收到了大部分节点的投票,成为leader
  • 如果AppendEntries RPC从新的leader处返回,变为follower
  • 如果选举超时,启动新的选举流程

Leader

  • 选举完成后:发送空的AppendEntries RPC请求到每个服务器(心跳),在空闲期间重复此操作避免选举超时
  • 从客户端哪里接收到命令;添加entry到本地的日志中,待把这个entry复制到状态机后响应
  • 如果对一个follower有,last log index >= nextIndex,从nextIndex开始发送AppendEntries RPC请求
    • 如果成功:更新follower的nextIndex和matchIndex
    • 如果因为日志不一致的原因失败:降低nextIndex然后重试
  • 如果存在N,且N>commitIndex,大多数节点的matchIndex[i]>=N,并且log[N].term == cuurentTerm,设置commitIndex=N

为什么要采样追踪对接SkyWalking

为了提升Pulsar的可维护性,我们希望能深入pulsarbookkeeper的底层,做性能剖析,识别中间延时高的环节,然后很方便地进行定位分析。

为什么Pulsar原生的普罗监控无法满足

  • 部门基础设施方面,所有环境都集成了ELK,大部分环境都没有普罗,且部门正在引入SkyWalking。
  • 普罗的监控无法和用户的一条消息对应,较难处理单个客户保障。

幸运的是,这两个问题都可以通过采样跟踪来解决。并且在合理的采样跟踪配置下,测试环境可以达成百分百采样追踪,对于我们定位测试环境问题非常方便。

我们就想到了使用经典的采样追踪模式来对数据进行采样,将数据输出到SkyWalking进行下一步的分析、告警。并且在没有对接SkyWalking的环境,通过日志输出来进行下一步的分析、告警

Sample Tracing Basic

image-20210424091405442

  • 通过traceIdspanId结合,识别一条链路。
  • 每个spanId中间计算耗时
  • 为了性能,不会每个spanIdtraceId都收集分析,会进行采样(收集部分消息、收集时延较大的消息)

注:SkyWalking采用STAM来进行拓扑分析,并且引入了Segment等概念来表达进程内、进程外等含义,但大致原理相同。

SkyWalking Architecture

Image

Trace Internal In Pulsar

目前,我们将追踪的头部消息放在BrokerMetadata处,以下为追踪数据流向

message

image-20210506200048088

batch message

image-20210506200035405

Two ways output

Logging

可在ELK上展示,搜索时延大的消息。(注:如搜索4位数的消息)。

SkyWalking

可依赖SkyWalking进行分析,告警。

对比

Logging SkyWalking
采样方式 SkyWalking Way SkyWalking Way
输出格式 Logging SkyWalking protocol through kafka
追踪数据传播方式 protobuf、Broker Metadata protobuf、Broker Metadata

Beyond SkyWalking

在SkyWalking的采样算法之上,支持对时延大的消息再进行采样,输出到日志,进行分析、告警。

采样规则

  • 全局一秒最多xx条,单topic一秒最多xx条
  • 采样时延大于xx的消息

Follow-up

  • 将Pulsar和Bookkeeper的metric信息上报到SkyWalking,使得在SkyWalking分析、告警更多信息。
  • 使用agent方式实现,避免和SkyWalking协议过度耦合
0%