从简单通信协议开始

最近工作中又需要处理协议解析,我对协议解析和网络抓包其实还是小有研究,17年刚毕业的时候,就用Netty手写过SMPP协议的对接。(其实做协议解析是一个很枯燥的工作,如果协议解析可以像antlr那样子写grammar自动解析应该会很酷?)本文总结一下协议在tcp下编码拆包粘包的三种解决方案。

网上有一些人对拆包粘包的说法不是很认可,但是我觉得这个术语还是挺形象的。

首先,让我们来设计一个简单地通信协议,Sorry,客户端一直对服务器发送I am Sorry,服务端回复That's ok。如下图所示

image-20210704104926698

让我们来写个demo程序实现这个协议

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"net"
)

func main() {
listen, err := net.Listen("tcp", "localhost:1997")
if err != nil {
panic(err)
}
defer listen.Close()
for {
conn, err := listen.Accept()
if err != nil {
panic(err)
}
go handleRequest(conn)
}
}

// handle incoming requests
func handleRequest(conn net.Conn) {
// make a buffer to hold incoming data
buf := make([]byte, 1024)
// Read the incoming connection into the buffer
reqLen, err := conn.Read(buf)
if err != nil {
fmt.Println("error reading: ", err.Error())
}
if reqLen != 10 {
fmt.Println("invalid request size ", reqLen)
}
_, err = conn.Write([]byte("That's ok"))
if err != nil {
fmt.Println("error sending: ", err.Error())
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use std::io::{Read, Write};
use std::net::TcpStream;
use std::str::from_utf8;

fn main() {
match TcpStream::connect("localhost:1997") {
Ok(mut stream) => {
println!("success connect to 1997");
let msg = b"I am Sorry";
let expect_resp = b"That's ok";
stream.write(msg);
println!("Send hello, awaiting reply");
// use 9 byte buffer
let mut data = [0 as u8; 9];
match stream.read_exact(&mut data) {
Ok(_) => {
if &data == expect_resp {
println!("Reply is ok")
} else {
let text = from_utf8(&data).unwrap();
println!("Unexpected reply: {}", text);
}
},
Err(e) => {
println!("Failed to receive data: {}", e);
}
}
}
Err(e) => {
println!("Failed to connect: {}", e)
}
}

}

注意上面在服务端的实现中,我们校验了请求体的大小。

运行成功,我们在Wireshark上可以看到

image-20210704115955993

目标端口为1997,这是客户端发出的报文。当然也能看到响应的报文

image-20210704120027704

那么,如果客户端是个十分礼貌的人,他如果连续发送10个I am Sorry呢?我们将代码修改为

1
2
3
for _ in 0..10 {
stream.write(msg);
}

服务端报错了,服务端收到了一个请求,大小为100。并不是新手预期的10个大小为10的消息,

image-20210704120639637

那么实际在网络中是如何传输的呢?一定是1个大小为100的消息吗?答案是否定的。在我的这次测试中,在TCP层,分成了两组消息,第一个大小为10,包含一个I am Sorry

image-20210704120759769

另一个大小为90,包含9个

image-20210704120818834

揭秘时刻

TCP协议

TCPUDP不同,它是一个基于流的协议,TCP并不识别你定义的协议规则,只负责将这些报文打包发送,它可以基于TCP_NODELAYNagle算法等,任意的对你的报文进行切分发送。有两个典型的场景:第一个像上文中的例子,两个及以上的包在一个TCP数据包发送了,有个很形象的名字叫粘包。还有一个,因为报文过大,拆分成两个TCP报文发送,这叫拆包。

应用层读取

常见API,应用层读取也不保证单次操作一定仅仅读取一个tcp数据包,会根据你提供的buffer大小,尽量提供数据。你读取到的可能是上一个TCP包的末尾和下一个TCP包的开头部分。

总结

TCP是基于流的协议,并非基于报文。TCP提供了保序的语义保证,这要求应用程序,尤其是接收者,需要能够从报文流中提取出协议信息,TCP决不保证读取到的报文恰好是发送者一次write写入的报文,即使能在测试环境通过case,那也只不过是你运气好而已。

像我们上面,读取到100大小的消息。根据协议大小请求固定为10,我们就可以将100消息分割为10条协议报文。如果读取到的大小为96,那就先处理前90个字节,剩下6个字节,待后面4个字节到达之后再合并处理。下一节我们详细介绍一下几种常见方式。

常见TCP协议定义方式

定长编码

就像我们例子中的那样一样,定义一个定长宽度,然后切分

使用Go的gnet库的Server例子

1
2
3
4
5
6
7
8
9
10
import "github.com/panjf2000/gnet"

type ExampleServer struct {
*gnet.EventServer
}

func main() {
codec := gnet.NewFixedLengthFrameCodec(10)
gnet.Serve(&ExampleServer{}, "tcp://localhost:1998", gnet.WithCodec(codec))
}

基于分隔符

基于分隔符的编码也十分容易理解,双方约定好一个字符,并在正常报文中不出现这个字符(出现则需要转义),比较类似的是以太网的7d7d?这个计算机网络链路层相关的知乎,学太久了,忘记了。

1
2
3
4
5
6
7
8
9
10
import "github.com/panjf2000/gnet"

type ExampleServer struct {
*gnet.EventServer
}

func main() {
codec := gnet.NewDelimiterBasedFrameCodec(0x11)
gnet.Serve(&ExampleServer{}, "tcp://localhost:1998", gnet.WithCodec(codec))
}

基于固定行数的编码

这个也很简单,协议内容不换行,发送完再发送一个换行符,比较类似的有HTTP的\r\n

1
2
3
4
5
6
7
8
9
10
11
12
package main

import "github.com/panjf2000/gnet"

type ExampleServer struct {
*gnet.EventServer
}

func main() {
gnet.Serve(&ExampleServer{}, "tcp://localhost:1998", gnet.WithCodec(&gnet.LineBasedFrameCodec{}))
}

长度编码

长度编码是使用最多的,最流行的一种编码方式。最简单的一种工作方式是,在报文的最开始数个字节(常见为4个字节,足以编码4个G长度,相比之下两个字节仅能存放64K消息),声明报文剩余内容的长度。以Kafka协议举例

image-20210704125652379

Kafka这条消息,在TCP层占据的总长度为87字节,其中前4个字节00 00 00 53声明为83长度,为其余报文的长度。

这一模式还有很多变体,如

  • 声明的长度包括其长度字段本身的长度
  • 长度字段并不是打头的字段
  • 长度字段的长度

等等。这也就是下面解码器,拥有的参数非常多的原因,都是为了适配这些变体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import (
"encoding/binary"
"github.com/panjf2000/gnet"
)

type ExampleServer struct {
*gnet.EventServer
}

func main() {
encoderConfig := gnet.EncoderConfig{
ByteOrder: binary.BigEndian,
LengthFieldLength: 4,
LengthAdjustment: 0,
LengthIncludesLengthFieldLength: true,
}
decoderConfig := gnet.DecoderConfig{
ByteOrder: binary.BigEndian,
LengthFieldOffset: 0,
LengthFieldLength: 4,
LengthAdjustment: -4,
InitialBytesToStrip: 4,
}
codec := gnet.NewLengthFieldBasedFrameCodec(encoderConfig, decoderConfig)
gnet.Serve(&ExampleServer{}, "tcp://localhost:1998", gnet.WithCodec(codec))
}

事实上,长度字段编码格式是我见过开源代码使用最多的格式,像MQTT、KAFKA、SMPP等都使用这种格式。其中原因,个人觉得在于声明长度之后,buffer申请及释放,可以简化很多,性能最好。

其他网络协议使用的编码方式

MQTT

使用长度字段编码格式

image-20210704131034560

AMQP

AMQP的解析较为麻烦,它根据协议目前的状态,同时使用定长编码和长度字段两种编码方式。这就要求解码器不仅仅要处理报文,还要处理当前协议交互到那个状态了。

定长场景

image-20210704131231757

长度字段模式

image-20210704131317098

代码地址

开发一个filebeat的websocket插件, 代码仓地址: https://github.com/hezhangjian/beats_output_websocket

引入对beat的依赖

1
go get github.com/elastic/beats/v7

定义在filebeat中的配置文件

filebeat通常以配置文件的方式加载插件。让我们定义一下必须的配置,就像elasticsearch中的连接地址等等一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
output.websocket:
# worker
# 用于工作的websocket客户端数量
workers: 1
# 日志批量的最大大小
batch_size: 1
# 重试的最大次数,0代表不重试
retry_limit: 1
# conn
# ws/wss
schema: "ws"
# websocket连接地址
addr: "localhost:8080"
# websocket路径
path: "/echo"
# websocket心跳间隔,用于保活
ping_interval: 30

go文件中的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type clientConfig struct {
// Number of worker goroutines publishing log events
Workers int `config:"workers" validate:"min=1"`
// Max number of events in a batch to send to a single client
BatchSize int `config:"batch_size" validate:"min=1"`
// Max number of retries for single batch of events
RetryLimit int `config:"retry_limit"`
// Schema WebSocket Schema
Schema string `config:"schema"`
// Addr WebSocket Addr
Addr string `config:"addr"`
// Path WebSocket Path
Path string `config:"path"`
// PingInterval WebSocket PingInterval
PingInterval int `config:"ping_interval"`
}

初始化加载插件

加载插件

在某个init函数中注册插件

1
2
3
func init() {
outputs.RegisterType("websocket", newWsOutput)
}

newWsOutput中卸载配置,并提供配置给WebSocket客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func newWsOutput(_ outputs.IndexManager, _ beat.Info, stats outputs.Observer, cfg *common.Config) (outputs.Group, error) {
config := clientConfig{}
// 卸载配置,将配置用于初始化WebSocket客户端
if err := cfg.Unpack(&config); err != nil {
return outputs.Fail(err)
}
clients := make([]outputs.NetworkClient, config.Workers)
for i := 0; i < config.Workers; i++ {
clients[i] = &wsClient{
stats: stats,
Schema: config.Schema,
Host: config.Addr,
Path: config.Path,
PingInterval: config.PingInterval,
}
}

return outputs.SuccessNet(true, config.BatchSize, config.RetryLimit, clients)
}

初始化WebSocket客户端

WebSocket客户端不仅仅是一个WebSocket客户端,而且还需要实现filebeat中的NetworkClient接口,接下来,让我们来关注接口中的每一个方法的作用及实现

String()接口

String作为客户端的名字,用来标识日志以及指标。是最简单的一个接口

1
2
3
func (w *wsClient) String() string {
return "websocket"
}

Connect()接口

Connect用来初始化客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (w *wsClient) Connect() error {
u := url.URL{Scheme: w.Schema, Host: w.Host, Path: w.Path}
dial, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err == nil {
w.conn = dial
ticker := time.NewTicker(time.Duration(w.PingInterval) * time.Second)
go func() {
for range ticker.C {
w.conn.WriteMessage(websocket.PingMessage, nil)
}
}()
} else {
time.Sleep(10 * time.Second)
}
return err
}

注意,这里初始化失败,需要Sleep一段时间,否则,filebeat会一直重试。这绝非是你想要的。或许对于场景来说,退避重试可能会更好

Close()接口

关闭客户端,也是很简单的接口

1
2
3
func (w *wsClient) Close() error {
return w.conn.Close()
}

Publish()接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (w *wsClient) Publish(_ context.Context, batch publisher.Batch) error {
events := batch.Events()
// 记录这批日志
w.stats.NewBatch(len(events))
failEvents, err := w.PublishEvents(events)
if err != nil {
// 如果发送正常,则ACK
batch.ACK()
} else {
// 发送失败,则重试。受RetryLimit的限制
batch.RetryEvents(failEvents)
}
return err
}

func (w *wsClient) PublishEvents(events []publisher.Event) ([]publisher.Event, error) {
for i, event := range events {
err := w.publishEvent(&event)
if err != nil {
// 如果单条消息发送失败,则将剩余的消息直接重试
return events[i:], err
}
}
return nil, nil
}

func (w *wsClient) publishEvent(event *publisher.Event) error {
bytes, err := encode(&event.Content)
if err != nil {
// 如果编码失败,就不重试了,重试也不会成功
// encode error, don't retry.
// consider being success
return nil
}
err = w.conn.WriteMessage(websocket.TextMessage, bytes)
if err != nil {
// 写入WebSocket Server失败
return err
}
return nil
}

编码

编码的逻辑因人而异,事实上,这可能是大家最大的差异所在。这里只是做一个简单地例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type LogOutput struct {
Timestamp time.Time `json:"timestamp"`
Message string `json:"message"`
}

func encode(event *beat.Event) ([]byte, error) {
logOutput := &LogOutput{}
value, err := event.Fields.GetValue("message")
if err != nil {
return nil, err
}
logOutput.Timestamp = event.Timestamp
logOutput.Message = value.(string)
return json.Marshal(logOutput)
}

最后是我们的wsclient

1
2
3
4
5
6
7
8
9
10
type wsClient struct {
// construct field
Schema string
Host string
Path string
PingInterval int

stats outputs.Observer
conn *websocket.Conn
}

添加额外的功能:大包丢弃

你可能会想保护你的WebSocket服务器,避免接收到超级大的日志。我们可以在配置项中添加一个配置

maxLen用来限制日志长度,超过maxLen的日志直接丢弃。为什么不使用filebeat中的max_bytes

因为filebeatmax_bytes的默认行为是截断,截断的日志在某些场景下不如丢弃。(比如,日志是json格式,截断后格式无法解析)

配置中添加maxLen

1
max_len: 1024

省略掉那些重复的添加结构体,读取max_len在encode的时候忽略掉

1
2
3
4
s := value.(string)
if len(s) >= w.MaxLen {
return nil, err
}

参考资料

https://golang.org/ref/mem

TL;DR

  • 协程之间的数据可见性满足HappensBefore法则,并具有传递性
  • 如果包 p 导入包 q,则 q 的 init 函数的完成发生在任何 p 的操作开始之前
  • main.main 函数的启动发生在所有 init 函数完成之后
  • go语句启动新的协程发生在新协程启动开始之前
  • go协程的退出并不保证发生在任何事件之前
  • channel上的发送发生在对应channel接收之前
  • 无bufferchannel的接收发生在发送操作完成之前
  • 对于容量为C的buffer channel来说,第k次从channel中接收,发生在第k + C次发送完成之前。
  • 对于任何的sync.Mutex或者sync.RWMutex变量,且有n<m,第n个调用UnLock一定发生在mLock`之前。
  • 从 once.Do(f) 对 f() 的单个调用返回在任何一个 once.Do(f) 返回之前。
  • 如果两个动作不满足HappensBefore,则顺序无法预测

介绍

Go内存模型指定了在何种条件下可以保证在一个 goroutine 中读取变量时观察到不同 goroutine 中写入该变量的值。

建议

通过多个协程并发修改数据的程序必须将操作序列化。为了序列化访问,通过channel操作或者其他同步原语(syncsync/atomic)来保护数据。

如果你必须要阅读本文的其他部分才能理解你程序的行为,请尽量不要这样…

Happens Before

在单个 goroutine 中,读取和写入的行为必须像按照程序指定的顺序执行一样。 也就是说,只有当重新排序不会改变语言规范定义的 goroutine 中的行为时,编译器和处理器才可以重新排序在单个 goroutine 中执行的读取和写入。 由于这种重新排序,一个 goroutine 观察到的执行顺序可能与另一个 goroutine 感知的顺序不同。 例如,如果一个 goroutine 执行 a = 1; b = 2;,另一个可能会在 a 的更新值之前观察到 b 的更新值。

为了满足读写的需求,我们定义了happens before,Go程序中内存操作的局部顺序。如果事件e1e2之前发生,我们说e2e1之后发生。还有,如果e1不在e2之前发生、e2也不在e1之前发生,那么我们说e1e2并发happen。

在单个goroutine中,happens-before顺序由程序指定。

当下面两个条件满足时,变量v的阅读操作r可能观察到写入操作w

  • r不在w之前发生
  • 没有其他的请求w2发生在w之后,r之前

为了保证r一定能阅读到v,保证wr能观测到的唯一的写操作。当下面两个条件满足时,r保证可以读取到w

  • wr之前发生
  • 任何其他对共享变量v的操作,要么在w之前发生,要么在r之后发生

这一对条件比上一对条件更强;这要求无论是w还是r,都没有相应的并发操作。

在单个goroutine中,没有并发。所以这两个定义等价:读操作r能读到最近一次w写入v的值。但是当多个goroutine访问共享变量时,它们必须使用同步事件来建立happens-before关系。

使用变量v类型的0值初始化变量v的行为类似于内存模型中的写入。

对于大于单个机器字长的值的读取和写入表现为未指定顺序的对多个机器字长的操作。

同步

初始化

程序初始化在单个 goroutine 中运行,但该 goroutine 可能会创建其他并发运行的 goroutine。

如果包 p 导入包 q,则 q 的 init 函数的完成发生在任何 p 的操作开始之前。

main.main 函数的启动发生在所有 init 函数完成之后。

Go协程的创建

go语句启动新的协程发生在新协程启动开始之前。

举个例子

1
2
3
4
5
6
7
8
9
10
var a string

func f() {
print(a)
}

func hello() {
a = "hello, world"
go f()
}

调用hello将会打印hello, world。当然,这个时候hello可能已经返回了。

Go协程的销毁

go协程的退出并不保证发生在任何事件之前

1
2
3
4
5
6
var a string

func hello() {
go func() { a = "hello" }()
print(a)
}

对 a 的赋值之后没有任何同步事件,因此不能保证任何其他 goroutine 都会观察到它。 事实上,激进的编译器可能会删除整个 go 语句。

如果一个 goroutine 的效果必须被另一个 goroutine 观察到,请使用同步机制,例如锁或通道通信来建立相对顺序。

通道通信

通道通信是在go协程之间传输数据的主要手段。在特定通道上的发送总有一个对应的channel的接收,通常是在另外一个协程。

channel上的发送发生在对应channel接收之前

1
2
3
4
5
6
7
8
9
10
11
12
13
var c = make(chan int, 10)
var a string

func f() {
a = "hello, world"
c <- 0
}

func main() {
go f()
<-c
print(a)
}

程序能保证输出hello, world。对a的写入发生在往c发送数据之前,往c发送数据又发生在从c接收数据之前,它又发生在print之前。

channel的关闭发生在从channel中获取到0值之前

在之前的例子中,将c<-0替换为close(c),程序还是能保证输出hello, world

无bufferchannel的接收发生在发送操作完成之前
这个程序,和之前一样,但是调换发送和接收操作,并且使用无buffer的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
var c = make(chan int)
var a string

func f() {
a = "hello, world"
<-c
}

func main() {
go f()
c <- 0
print(a)
}

也保证能够输出hello, world。对a的写入发生在c的接收之前,继而发生在c的写入操作完成之前,继而发生在print之前。

如果该channel是bufferchannel(例如:c=make(chan int, 1)),那么程序就不能保证输出hello, world。可能会打印空字符串、崩溃等等。从而,我们得到一个相对通用的推论:

对于容量为C的buffer channel来说,第k次从channel中接收,发生在第k + C次发送完成之前。

此规则将先前的规则推广到缓冲通道。 它允许通过buffer channel 来模拟信号量:通道中的条数对应活跃的数量,通道的容量对应于最大并发数。向channel发送数据相当于获取信号量,从channel中接收数据相当于释放信号量。 这是限制并发的常用习惯用法。

该程序为工作列表中的每个条目启动一个 goroutine,但是 goroutine 使用limitchannel进行协调,以确保一次最多三个work函数正在运行。

1
2
3
4
5
6
7
8
9
10
11
12
var limit = make(chan int, 3)

func main() {
for _, w := range work {
go func(w func()) {
limit <- 1
w()
<-limit
}(w)
}
select{}
}

sync包中实现了两种锁类型:sync.Mutexsync.RWMutex

对于任何的sync.Mutex或者sync.RWMutex变量,且有n<m,第n个调用UnLock一定发生在mLock`之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var l sync.Mutex
var a string

func f() {
a = "hello, world"
l.Unlock()
}

func main() {
l.Lock()
go f()
l.Lock()
print(a)
}

这个程序也保证输出hello,world。第一次调用unLock一定发生在第二次Lock调用之前

对于任何sync.RWMutexRLock方法调用,存在变量n,满足RLock方法发生在第nUnLock调用之后,并且对应的RUnlock发生在第n+1Lock方法之前。

Once

在存在多个 goroutine 时,sync包通过once提供了一种安全的初始化机制。对于特定的f,多个线程可以执行once.Do(f),但是只有一个会运行f(),另一个调用会阻塞,直到f()返回

从 once.Do(f) 对 f() 的单个调用返回在任何一个 once.Do(f) 返回之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var a string
var once sync.Once

func setup() {
a = "hello, world"
}

func doprint() {
once.Do(setup)
print(a)
}

func twoprint() {
go doprint()
go doprint()
}

调用 twoprint 将只调用一次 setup。 setup函数将在任一打印调用之前完成。 结果将是hello, world打印两次。

不正确的同步

注意,读取r有可能观察到了由写入w并发写入的值。尽管观察到了这个值,也并不意味着r后续的读取可以读取到w之前的写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var a, b int

func f() {
a = 1
b = 2
}

func g() {
print(b)
print(a)
}

func main() {
go f()
g()
}

有可能g会接连打印2和0两个值。

双检查锁是为了降低同步造成的开销。举个例子,twoprint方法可能会被误写成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var a string
var done bool

func setup() {
a = "hello, world"
done = true
}

func doprint() {
if !done {
once.Do(setup)
}
print(a)
}

func twoprint() {
go doprint()
go doprint()
}

因为没有任何机制保证,协程观察到done为true的同时可以观测到a为hello, world,其中有一个doprint可能会输出空字符。

另外一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var a string
var done bool

func setup() {
a = "hello, world"
done = true
}

func main() {
go setup()
for !done {
}
print(a)
}

和以前一样,不能保证在 main 中,观察对 done 的写入意味着观察对 a 的写入,因此该程序也可以打印一个空字符串。 更糟糕的情况下,由于两个线程之间没有同步事件,因此无法保证 main 会观察到对 done 的写入。 main 中的循环会一直死循环。

下面是该例子的一个更微妙的变体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type T struct {
msg string
}

var g *T

func setup() {
t := new(T)
t.msg = "hello, world"
g = t
}

func main() {
go setup()
for g == nil {
}
print(g.msg)
}

尽管main观测到g不为nil,但是也没有任何机制保证可以读取到t.msg。

在上述例子中,解决方案都是相同的:请使用显式的同步机制。

参考资料

阶段

Go编译器由四个阶段组成,可以分为两类

  • frontend前端:这一阶段对源码进行语法解析,并生成AST
  • backend后端:这一阶段将把transform the representation of the source code into machine code, 并进行数项优化

为了更好地理解每个阶段,让我们使用如下的示例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

func main() {
a := 1
b := 2
if true {
add(a, b)
}
}

func add(a, b int) {
println(a + b)
}

P1 解析

  • cmd/compile/internal/syntax 词法,解析器,语法树

第一个阶段非常简单直接:

第一阶段,源码经过词法分析、语法解析,对每个源码文件,都构造出相应的语法树

Lexer首先运行,把源代码转化为词法单元。我们可以通过这个程序来自己模拟运行Lexer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"go/scanner"
"go/token"
"io/ioutil"
)

func main() {
// src is the input that we want to tokenize.
src, _ := ioutil.ReadFile(`main.go`)

// Initialize the scanner
var s scanner.Scanner
// positions are relative to fSet
fSet := token.NewFileSet()
file := fSet.AddFile("", fSet.Base(), len(src))
// nil means no error handler
s.Init(file, src, nil, scanner.ScanComments)

// Repeated calls to Scan yield the token sequence found in the input
for {
pos, tok, lit := s.Scan()
if tok == token.EOF {
break
}
fmt.Printf("%s\t%s\t%q\n", fSet.Position(pos), tok, lit)
}
}

截选输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1:1	package	"package"
1:9 IDENT "main"
1:13 ; "\n"
3:1 func "func"
3:6 IDENT "main"
3:10 ( ""
3:11 ) ""
3:13 { ""
4:2 IDENT "a"
4:4 := ""
4:7 INT "1"
4:8 ; "\n"
5:2 IDENT "b"
5:4 := ""
5:7 INT "2"
5:8 ; "\n"
6:2 if "if"
6:5 IDENT "true"
6:10 { ""
7:3 IDENT "add"
7:6 ( ""
7:7 IDENT "a"

一旦经过词法化,源码被解析构造成语法树。

语法树还包含了代码位置信息,该信息可用于debug或错误报告。

P2 类型检查和AST转化

  • cmd/compile/internal/gc 创建编译器AST,类型检查,AST转换

AST是类型检查的。第一个步骤就是名字解析和类型推断,确定对象和标识符的对应关系,表达式是何种类型。Type-checking这一阶段还引入了额外的确定性步骤,例如,“声明未使用”、函数是否终止等。

还有一些确定的转换也在AST阶段完成。一些节点会根据类型信息进行细化,比如字符串加法从算术加法节点中分离出来。其他一些示例是不可达代码清除、内联函数调用、逃逸分析。

转化到AST的步骤可以通过命令go tool compile -w来展示出来,如果加上-l,则可以禁用内联。在我们的样例代码中,如果不禁用内联,add方法会被内联掉。我们可以分别使用go tool compile -w example.ogo tool compile -w -l example.o进行对比

禁用了内联的命令,会输出这样的AST

image-20210701110847543

没禁用内联的命令则不会生成,这里可以看出来,编译器做了内联的优化。

SSA 生成

SSA 概念

  • cmd/compile/internal/gc AST转化到SSA
  • cmd/compile/internal/ssa SSA阶段和规则

在这个阶段,AST转化为SSA的格式,这是一种具有特定属性的更底层的IR,可以更轻松地在上面进行优化并最终生成机器码。阶段应用了内联函数。这些是编译器被教导要根据具体情况用高度优化的代码替换的特殊函数。在AST到SSA的转换期间,某些确定的节点也被降低为更简单的组件,使得编译器的其余部分可以使用它们。例如,内置的copy函数被内存移动取代、范围循环被重写为for循环。由于历史原因,其中一些目前在SSA转换之前发生,但长期计划是将它们全部移到这里。

然后,应用一系列的、机器无关的阶段和规则。这些不涉及任何的计算机架构,因此可以在任何GOARCH变体上运行。

这些通用的阶段包括:不可达代码清除、删除不需要的nil检查、移除无用的分支。

通用的重写规则主要涉及表达式,包括表达式替换为常量、优化乘法和浮点运算等。

SSA code可以用这个命令dump并展示出来

1
GOSSAFUNC=main go tool compile main.go && open ssa.html

SSA阶段

Go编译流程

SSA优化解析

start Tab上生成了最开始的SSA

image-20210701101016958

变量 a 和 b 与 if 条件一起在此处突出显示,以便我们稍后查看这些行是如何更改的。 代码还向我们展示了编译器如何管理 println
函数,它被分解为 4 个步骤:printlockprintintprintnlprintunlock。 编译器会自动为我们加锁,并根据参数的类型调用相关方法正确打印。
在我们的示例中,由于 a 和 b 在编译时已知,编译器可以计算最终结果并将变量标记为不再需要。 opt阶段 会优化这部分:

image-20210701101445933

这个阶段v7被优化计算成了3。并且接下来,因为v4和v5已经没有人声明使用,在opt deadcode阶段,v4和v5也会被清除掉

image-20210701101729496

等待所有阶段完成之后,Go编译器将会生成中间汇编语言

image-20210701102735403

下一阶段会将汇编语言转换为二进制文件

机器代码生成

  • cmd/compile/internal/ssa SSA “lowering” 和 特定arch的阶段
  • cmd/internal/obj 机器语言生成

机器相关的编译阶段从”lowering”阶段开始,它将通用的值替换成机器特定的变体。例如,在 amd64 内存操作数上是可能的,因此可以组合许多加载-存储操作。

注意这些底层阶段执行了所有机器特定的规则,所以也应用了很多优化。

一旦SSA被”lowered”到更特定的目标架构,就开始执行最终的代码优化。这包括另一个不可达代码清除阶段、将值更靠近它们的使用者、移除从未使用的本地变量、寄存器分配。

还有一部分重要工作包括堆栈帧布局,它将堆栈偏移分配给局部变量,以及指针存活分析,它计算每个 GC 安全点上哪些堆栈上指针是活跃的。

在 SSA 生成阶段结束时,Go 函数已转换为一系列 obj.Prog 指令。 这些被传递给汇编器(cmd/internal/obj),汇编器将它们转换成机器代码并写出最终的目标文件。
目标文件还将包含反射数据、导出数据和调试信息。

我们可以使用go tool objdump $binary来查看汇编代码。当compile的.o文件生成之后,可以通过go tool link来生成二进制可运行文件。

翻译自

https://blog.golang.org/concurrency-timeouts

Go并发范式:超时,继续执行

并发编程有自己的习惯用法。 超时是一个很好的例子。在商用软件开发时,所有操作都需要有超时。

虽然 Go 的channel不直接支持超时,但很容易实现。假设我们想从通道 ch 接收,但希望实现一秒钟超时。 我们可以创建一个信号channel并启动一个在通道上发送之前休眠的 goroutine

1
2
3
4
5
timeout := make(chan bool, 1)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()

我们可以使用select语句从ch或者timeout中接收。如果过了1秒还没有数据返回,超时的case会被选中,尝试从ch读取的操作被放弃

1
2
3
4
5
6
select {
case <-ch:
// a read from ch has occurred
case <-timeout:
// the read from ch has timed out
}

timeout channel有1个buffer,允许超时 goroutine 发送到通道然后退出。 goroutine 不关心这个值是否被接收了。这意味着如果 ch 接收发生在超时之前,goroutine 不会永远挂起。timeout channel 最终会被gc释放。

(在这个例子中,我们使用 time.Sleep 来演示 goroutines 和通道的机制。在实际程序中,你应该使用 time.After来完成这个延迟发送)

让我们看看这种模式的另一种变体。在这个例子中,我们有一个程序可以同时从多个数据库的副本中读取数据。程序只需要一个结果,它应该接受最先返回的结果。

函数 Query 接受多个数据库连接和一个查询字符串。它并行查询每个数据库并返回它收到的第一个响应:

1
2
3
4
5
6
7
8
9
10
11
12
func Query(conns []Conn, query string) Result {
ch := make(chan Result)
for _, conn := range conns {
go func(c Conn) {
select {
case ch <- c.DoQuery(query):
default:
}
}(conn)
}
return <-ch
}

在这个例子中,闭包执行非阻塞发送,它通过在把send放在带有defaultselect中实现。 如果send不能立即完成,则将选择default情况。 非阻塞发送保证循环中启动的任何 goroutine 都不会挂起。

但是,这个例子中有一个问题,如果结果在主函数执行到11行接收的时候之前到达,发送都会失败,最终函数无法取得结果。

这个问题是所谓的竞争条件的教科书示例,修复非常简单。 我们只要保证channel有着缓冲通道(通过添加缓冲区长度作为 make 的第二个参数)来保证第一次发送有一个放置值的地方。 这确保发送总是成功,并且无论执行顺序如何,第一个值都会被获取。

这两个例子展示了 Go 可以简单地表达 goroutines 之间复杂的交互。

TL;DR

大部分场景应该使用值传递。

正文

Go 的文档指出

1
Programs using times should typically store and pass them as values, not pointers. That is, time variables and struct fields should be of type time.Time, not *time.Time. A Time value can be used by multiple goroutines simultaneously.

翻译一下就是

程序使用times典型场景应该把它们当做值传递,而非指针。也就是说time变量及结构体field应该为time.Time,而不是*time.Time。Time值是协程安全的。之所以说typically,是因为有着那么不typically的场景,比如你想在方法内修改原来的time.Time对象这种场景(很少见)。

time.Time是一个较小的对象,把它作为值传递是完全合理的。传递指针而不是值的很重要的一个原因是避免昂贵的拷贝动作。但是传递指针也会给gc带来一些额外的开销,如逃逸分析等。

值得一提的是,你可以通过go build -gcflags="-m"来判断变量在堆中还是在栈中。

go-time-pass

time.Time在我的MBP上只有24个字节。附上几个简单类型的字节数

1
2
3
4
5
6
7
byte size is  1
int32 size is 4
int size is 8
int64 size is 8
float32 size is 4
float64 size is 8
time size is 24

测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package demo_base

import (
"fmt"
"testing"
"time"
"unsafe"
)

func TestSize(t *testing.T) {
{
var x byte = 0
fmt.Println("byte size is ", unsafe.Sizeof(x))
}
{
var x int32 = 0
fmt.Println("int32 size is ", unsafe.Sizeof(x))
}
{
var x int = 0
fmt.Println("int size is ", unsafe.Sizeof(x))
}
{
var x int64 = 0
fmt.Println("int64 size is ", unsafe.Sizeof(x))
}
{
var x float32 = 0
fmt.Println("float32 size is ", unsafe.Sizeof(x))
}
{
var x float64 = 0
fmt.Println("float64 size is ", unsafe.Sizeof(x))
}
{
t := time.Now()
fmt.Println("time size is ", unsafe.Sizeof(t))
}
}

前言

这是**icza**
在StackOverflow上的一篇回答
,质量很高,翻译一下,大家一起学习

问题是:在go,有没有什么最快最简单的方法,用来生成只包含英文字母的随机字符串

icza给出了8个方案,最简单的方法并不是最快的方法,它们各有优劣,末尾附上性能测试结果:

1. Runes

比较简单的答案,声明一个rune数组,通过随机数选取rune字符,拼接成结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package approach1

import (
"fmt"
"math/rand"
"testing"
"time"
)

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randStr(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}

func TestApproach1(t *testing.T) {
rand.Seed(time.Now().UnixNano())
fmt.Println(randStr(10))
}

func BenchmarkApproach1(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

2. Bytes

如果随机挑选的字符只包含英文字母,我们可以直接使用bytes,因为在UTF-8编码模式下,英文字符和Bytes是一对一的(Go正是使用UTF-8模式编码)

所以可以把

1
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

用这个替代

1
var letters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

或者更好

1
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

现在我们有很大的进展了,我们把它变为了一个常数,在go里面,只有string常数,可并没有slice常数。额外的收获,表达式len(letters)
也变为了一个常数(如果s为常数,那么len(s)也将是常数)

我们没有付出什么代码,现在letters可以通过下标访问其中的bytes了,这正是我们需要的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package approach2

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func randStr(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}

func TestApproach2(t *testing.T) {
rand.Seed(time.Now().UnixNano())

fmt.Println(randStr(10))
}

func BenchmarkApproach2(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

3. Remainder

上面的解决方法通过rand.Intn()来获得一个随机字母,这个方法底层调用了Rand.Intn(),然后调用了Rand.Int31n()

相比于生成63个随机bits的函数rand.Int63()来说,Rand.Int31n()很慢。

我们可以简单地调用rand.Int63()然后除以len(letterBytes),使用它的余数来生成字母

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package approach3

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func randStr(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Int63() % int64(len(letters))]
}
return string(b)
}

func TestApproach3(t *testing.T) {
rand.Seed(time.Now().UnixNano())

fmt.Println(randStr(10))
}

func BenchmarkApproach3(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

这个算法能正常工作并且非常快,不过它牺牲了部分精确性,字母出现的概率并不是精确一样的(假设rand.Int63()
生成63比特的数字是等概率的)。由于字母总共才52个,远小于 1<<63 - 1,因此失真非常小,因此实际上这完全没问题。

解释: 假设你想要05的随机数,如果使用3位的bit,3位的bit等概率出现07,所以出现0和1的概率是出现2、3、4概率的两倍。使用5位的
bit,0和1出现的概率是6/32,2、3、4出现的概率是5/32。现在接近了一些了,是吧?不断地增加比特位,这个差距就会变得越小,当你有63位地时候,这差别已经可忽略不计。

4. Masking

在上一个方案的基础上,我们通过仅使用随机数的最低n位保持均匀分布,n为表示所有字符的数量。比如我们有52个字母,我们需要6位(52 =
110100b)。所以我们仅仅使用了rand.Int63()的最后6位。并且,为了保持所有字符的均匀分布,我们决定只接受在
0..len(letterBytes)-1的数字即0~51。(译者注:这里已经没有第三个方案的不准确问题了)

最低几位大于等于len(letterBytes)的概率一般小于0.5
(平均值为0.25),这意味着出现这种情况,只要重试就好。重试n次之后,我们仍然需要丢弃这个数字的概率远小于0.5的n次方(这是上界了,实际会低于这个值)。以本文的52个字母为例,最低6位需要丢弃的概率只有
(64-52)/64=0.19。这意味着,重复10次,仍然没有数字的概率是1*10^-8。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package approach4

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

const (
// 6 bits to represent a letters index
letterIdBits = 6
// All 1-bits as many as letterIdBits
letterIdMask = 1 <<letterIdBits - 1
)

func randStr(n int) string {
b := make([]byte, n)
for i := range b {
if idx := int(rand.Int63() & letterIdMask); idx < len(letters) {
b[i] = letters[idx]
i++
}
}
return string(b)
}

func TestApproach4(t *testing.T) {
rand.Seed(time.Now().UnixNano())

fmt.Println(randStr(10))
}

func BenchmarkApproach4(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

5. Masking Improved

第4节的方案只使用了rand.Int63()方法返回的64个随机字节的后6位。这实在是太浪费了,因为rand.Int63()是我们算法中最耗时的部分了。

如果我们有52个字母,6位就能生成一个随机字符串。所以63个随机字节,可以利用63/6=10次。

译者注:使用了缓存,缓存了rand.Int63()方法返回的内容,使用10次,不过已经并不是协程安全的了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package approach5

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

const (
// 6 bits to represent a letter index
letterIdBits = 6
// All 1-bits as many as letterIdBits
letterIdMask = 1<<letterIdBits - 1
letterIdMax = 63 / letterIdBits
)

func randStr(n int) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdMax letters!
for i, cache, remain := n-1, rand.Int63(), letterIdMax; i >= 0; {
if remain == 0 {
cache, remain = rand.Int63(), letterIdMax
}
if idx := int(cache & letterIdMask); idx < len(letters) {
b[i] = letters[idx]
i--
}
cache >>= letterIdBits
remain--
}
return string(b)
}

func TestApproach5(t *testing.T) {
rand.Seed(time.Now().UnixNano())

fmt.Println(randStr(10))
}

func BenchmarkApproach5(b *testing.B) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

6. Source

第5个方案非常好,能改进的点并不多。我们可以但不值得搞得很复杂。

让我们来找可以改进的点:随机数的生成源

crypto/rand的包提供了Read(b []byte)的函数,可以通过这个函数获得需要的随机比特数,只需要一次调用。不过并不能提升性能,因为
crypto/rand实现了一个密码学上的安全伪随机数,所以速度比较慢。

所以让我们坚持使用math/rand包,rand.Rand使用rand.Source作为随机位的来源,rand.Source是一个声明了Int63() int64
的接口:正是我们在最新解决方案中需要和使用的唯一方法。

所以我们不是真的需要rand.Randrand.Source包对于我们来说已经足够了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package approach6

import (
"fmt"
"math/rand"
"testing"
"time"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

var src = rand.NewSource(time.Now().UnixNano())

const (
// 6 bits to represent a letter index
letterIdBits = 6
// All 1-bits as many as letterIdBits
letterIdMask = 1<<letterIdBits - 1
letterIdMax = 63 / letterIdBits
)

func randStr(n int) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdMax letters!
for i, cache, remain := n-1, src.Int63(), letterIdMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdMax
}
if idx := int(cache & letterIdMask); idx < len(letters) {
b[i] = letters[idx]
i--
}
cache >>= letterIdBits
remain--
}
return string(b)
}

func TestApproach6(t *testing.T) {
fmt.Println(randStr(10))
}

func BenchmarkApproach6(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = randStr(10)
}
}

注意到这里我们没有使用种子初始化rand了,取而代之的是初始化了rand.Source

还有一件需要注意的事,math/rand的文档指出

1
默认的Source是协程安全的

所以默认的Source比通过rand.NewSource()创建出来的Source要慢。不用处理协程并发场景,当然慢啦。

7. 使用 strings.Builder

之前的解决方案都返回了通过slice构造的字符串。最后的一次转换进行了一次拷贝,因为字符串是不可变的,如果转换的时候不进行拷贝,就无法保证转换完成之后,byte
slice再被修改后,字符串仍能保持不变。

Go1.10引入了strings.Builder,这是一个新的类型,和bytes.Buffer类似,用来构造字符串。底层使用[]byte
来构造内容,正是我们现在在做的,最后可以通过Builder.String()方法来获得最终的字符串值。但它很酷的地方在于,它无需执行刚才谈到的复制即可完成此操作。它敢这么做是因为它底层构造的
[]byte从未暴露出来,所以仍然可以保证没有人可以无意地、恶意地来修改已经生成的不可变字符串。

所以我们的下一个想法不是在slice中构建随机字符串,而是在 strings.Builder 的帮助下,一旦我们完成,我们就可以获取并返回结果,而无需复制。
这可能在速度方面有所帮助,并且在内存使用和分配方面肯定会有所帮助(译者注:等会在benchmark中会清晰地看到)。

对于物联网平台来说,规则引擎是其中一个很重要的功能,也叫消息流转功能,将消息流转到各类中间件、云产品中。在华为、AWS、Azure、阿里这四个物联网平台中,阿里不支持流转到S3/类S3存储中。本文对比一下华为云、AWS、Azure把设备消息流转到S3/类S3存储的功能

参考资料

华为云

规则粒度和限制

  • 规则配置粒度到OBS

  • 限制单用户配置100条规则,每个规则10个Action

功能实现

针对华为云,我测试了设备的消息上报转发到华为云OBS的功能。

流转规则需要指定obs桶,随后运行之后,华为云OBS体现为

  • 设备的每条消息都会在obs中存储为一个文件
  • 名称采用deviceId+毫秒级时间戳+后面4位数字

关键路径截图

配置规则时指定到obs桶

image-20210614184348066

单条消息单个文件

image-20210614184522958

优势

可以非常轻易地查询出单个设备的消息,因为文件名携带有毫秒级时间戳,还可以指定具体

劣势

用来做MapReduce的话,文件数目太多,由于S3云厂商往往通过API调用次数收费,不仅是速度,成本也会很高。

AWS

规则粒度和限制

  • 规则配置粒度到桶及Key,相当于华为云OBS桶+文件名
  • 限制规则每秒进行20k次运算
  • 限制最多拥有1000条规则
  • 限制每个规则最多10个action

功能实现

再次上报数据触发规则会把obs中的数据替换。(通过版本控制可以获取到老的数据)

关键路径截图

配置规则指定粒度到Key

image-20210614165708645

仅有一个Key,新值覆盖旧值

image-20210614170447237

优劣势

AWS的这个模式很适合存储每个设备的最新数据。不过由于规则数量上的限制,最多只能在S3上存储1000个键值对。可用性较低。可用于数量小于1000的设备,存储、查询最新数据。

Azure转发

规则粒度限制

  • 规则配置粒度到存储容器
  • Azure可配置存储入存储容器的批量频率和大小限制
  • 编码支持Avro和Json两种格式
  • 最多100条路由

功能实现

自上报事件,到存储中出现数据,azure是最慢的,azure做了批量的缓冲,达到batch的大小和时间要求后才会写入存储。

关键路径截图

配置路由规则

image-20210614181514437

存储中批量数据

因为选择了avro格式,所以vim打开是乱码,不过明显可以看到是多条数据

image-20210614181738380

优势

Azure的这种方式,比较适合做MapReduce类操作,相对华为云来说,Azure的文件数量大大减少,如果用于做MapReduce这类操作,因为文件碎片小,作业速度会比华为云快,而且由于云厂商对存储,通常以api调用次数收费,价格也会比华为云低。

劣势

不易针对单个设备进行查询。

需要了解的概念

  • DNAT映射:网络报文,目的地址转换。
  • 胖客户端:像Cassandra 客户端、Kafka客户端,客户端感知中间件拓扑,即客户端感知中间件部署、均衡,把请求发给指定的中间件服务器。
  • VPC:用户的私有网络环境。
  • VPC Peering:多个网段不冲突的VPC之间网络打通的方式,可跨用户。
  • 三方厂家:非公有云厂商的PaaS供应商。
  • 中间件用户:需要在公有云上运行中间件的公有云用户。

前言

现在,在公有云上买rediskafka这类组件已经变得非常普遍,由公有云提供的中间件往往能给你带来良好的体验,现在中间件在公有云上的交付模式大致分为三种。我对
Pulsar较为熟悉一些,大部分图例以消息中间件Pulsar举例。本文中成本指用在云服务或三方厂商上的成本,指人力成本时,会注明人力成本。

image-20210424145257826

公有云模式

公有云提供的中间件有得天独厚的优势,主要优势在以下几点

低廉的成本

  • 和三方厂家相比,对中间件用户来说,云厂商自运营的价格要低于三方厂家。
  • 和租户自运维相比,对中间件用户来说,云厂商价格略高一点点,但省去了大量的运维人力。

使用方便

云厂商可以把中间件的ip地址申请在你的vpc内,对任何应用程序来说,连接都是最方便的。无论是容器化部署、虚拟机部署、还是本vpc和其他vpc
peering打通的场景,都可以通信。

监控、运维系统对接

得力于云厂商的积累,能够方便地和云厂商的告警、统计系统对接,接收告警通知和报表等。甚至可以在手机app上查看云中间件的监控指标。

安全

  • 公有云有专门的团队时刻关注中间件安全漏洞,及时打上补丁
  • 有些白帽组织会在发现漏洞并公开发布前通知一些大的使用者,大的厂家可以在漏洞公布前提前预防

主要的劣势有

  • 云厂商提供的中间件有限,不是所有中间件的所有版本都支持
  • 即使业务有强烈诉求,也很难在中间件上做定制修改

租户自运维模式

租户自运维,就是用户自己部署中间件维护,仅利用云上的基础设施。主要的优势有

  • 可以在中间件上做定制修改
  • 可按任意规则进行部署,节约成本
  • 可部署任意版本中间件,比如代码一定要Kafka 0.11版本,公有云上该版本已停止售卖,代码不能改,那就只好自己部署Kafka的该版本了。

主要的劣势有

  • 和运维系统对接,需要人力成本。因为业务本身也需要对接运维系统,这部分人力成本较低,常见的困难点在于中间件的技术栈和团队不符(如Java团队尝试在go语言开发的中间件上对接运维系统)。
  • 中间件的网上运维、问题处理需要很大的人力成本,且对人员技术水平要求高。
  • 安全问题、漏洞,可能处理不及时,存在安全隐患。

三方厂家模式

三方厂家的主要优势有

  • 安全性方面,如果是社区主导者,有些白帽组织可能会提前告知发现的漏洞
  • 如果是社区的主导者,可以将您的新的合理需求优先级提高开发。

主要劣势

  • 运维上无法和公有云打通,三方厂家需要自己构建一套运维系统并通知到客户,对客户来说,也要适配三方厂家的运维方案。
  • 成本比公有云模式高
  • 易用性上面,网络是三方厂家的痛点,这点值得单独在下一小节详细描述

三方厂家网络模式

三方厂家最难做的就是,如何在厂家用户中间件用户隔离的情况下,提供中间件的接入点,可行的方式有以下几种

VPC Peering打通模式

Peering可以说是最简单的网络打通模式,可以给用户不错的私网体验(如果走公网,数据至少要是tls加密的,不然会有数据在公网上被人监听的风险)。只需要跟用户规划一个不冲突的网段。

image-20210424110409903

这个网段不需要很大,一般26、27就可以满足要求,27的网段已经可以部署30台虚拟机了,值得一提的是,这种打通模式下,一般不会选择容器化部署,都是虚拟机部署,原因有二

  • 如果客户也使用k8s容器化部署,两个容器化集群互通,对网设的要求很高,限制很大。
  • 仅仅部署一个中间件的话,加上k8s集群的部署,成本占比太高

VPC Peering的主要挑战有

  • 自动化:自动化较为困难,需要考虑如何获取尽量少的客户权限,自动化完成整个流程。
  • 成本:Peering的模式下,三方厂商几乎只能针对每个用户的集群单独Peering,很多地方无法均摊成本。想要客户购买,需要对客户有很强的吸引力。
  • 网络规划受限,路由规则配置复杂:如果客户不仅仅和三方厂家建立了Peering关系,又和其他VPC建立了Peering关系,这三方网络不能冲突,还要互相配置路由规则。这需要进行细致的网络规划,毕竟网络一旦冲突,解决的代价很大。

LB接入模式

LB接入模式,在三方厂家模式下,只能通过公网的方式接入,一般给用户提供域名或ip地址接入,域名使用的较多,像华为云设备接入服务、Azure
物联网服务等都是提供域名,主要的原因

在EIP资源有限的情况下,将部分用户的域名解析成一个地址,既能节约EIP资源,也能在一些基础设施上共用资源,节约成本(如共k8s、共数据库、共Bookkeeper等)

image-20210507194200489

LB接入的限制是,中间件需要支持可负载均衡访问,如果中间件的客户端是有状态的胖客户端(如Pulsar、Cassandra),需要中间有一层无状态的代理。

华为云VPCEP模式

VPCEP,VPC终端节点,是华为云的云服务,可以将一个VPC的地址,映射到另一个VPC的地址,在客户端的视角来看,本质上是做了一次DNAT地址映射。

image-20210424230158506

VPCEP的主要限制是连接中间件经过了DNAT映射。需要对应中间件支持DNAT的方式访问。另一方面其他云上不一定有该能力。三方厂商一般也希望做一个统一的方案来适配所有云厂商,所以目前使用的并不是很多。

三方厂商模式的劣势主要是

  • 公网的方式下,时延高,难以满足有低时延要求的客户,且带宽费用也占比很大
  • Peering模式,成本高,费用贵

并且如果公有云厂商没有提供新的用于用户和用户之间打通网络的方案,那么对三方厂商来说,在接入方面,恐怕很难再有所突破了。

注:这里的网络打通三种方式不限于三方场景和用户之间,还可以扩展到用户和用户之间网络打通进行通信,这里不再扩展了。

建议选型流程

中型和长尾用户可按照下图流程进行选型。

对于大型用户,可能会维护自己的核心中间件,紧密联系自己的业务,在上面做定制开发,不但降低成本,并且提升业务竞争力。非核心中间件,也可参照下图流程进行选型。

image-20210507195340204

总结

公有云 三方厂商 自建
机器成本
处理测试、网上问题工作量
运维系统对接开发量
接入灵活性
安全

大型系统中的证书管理

随着安全的要求,现在我们在越来越多的通信中使用TLS加密。下图是一个微服务架构下数据流向的例子

cert-manager1

  • 蓝色部分,即和三方交互时需要TLS加密认证
  • 红色部分,各个微服务、消息中间件等通信需要TLS加密认证
  • 绿色部分,各个微服务和存储层通信也需要TLS加密认证

安全上对我们的要求逐步变化为,仅蓝色使用TLS=》蓝色和红色使用TLS=》全部使用TLS加密

证书管理的必要性

从安全的角度上来说,我们最好能支持证书的更换和热加载。如果您的业务当前使用加密的场景不多,可能暂时看不到证书管理的意义。但是当你在各个方面使用TLS更加频繁之后,会发现证书管理可带来如下好处:

  • 可以通过抽象出场景,通过场景和证书的关联联系,在各个地方通信使用的证书,可以统一更换。
  • 统一提供证书过期告警等功能
  • 统一提供证书的变更通知,通知到各个实例

以我在工作中接触到的两个基础PAAS平台,都有证书管理的功能,可见证书管理的必要性。

PS: 开源组件大多都拥有证书配置能力,没有可对接证书管理的能力,但这个能力很难贡献给社区,需要自己开发。

证书管理概念

在TLS会话中,从依赖的证书文件角度来看,可以分为加密流程和验证流程。

加密证书

TLS加密流程的证书,包含证书链文件和密钥

验证证书

TLS验证流程的证书,仅包含证书链文件

拆分为加密流程和验证流程的合理性

这使得加密流程证书和验证流程证书可以互相独立的替换,更方便在大型场景下复用证书。

让我们来假设如下的场景:

cert-manager2

客户A、客户B、客户C、客户D的验证流程证书自然不相同,但服务跟客户交互的时候,使用的加密流程证书确实同一份。如果将两个阶段的证书合一,那么在更换证书的时候,就需要更新4份数据,当你有1000名用户的时候,这个数字将会是1000,这对于存储和应用程序来说都是不小的冲击。

Scene

Scene是在一个会话中,代表会话和请求证书、验证证书的绑定关系。Scene和请求证书、验证证书都是1:1的关系。这使得我们不仅仅可以修改证书文件,也可以对TLS会话中使用的证书进行修改。在证书无法复用,且证书绑定了多个场景的时候,针对单个场景修改其绑定的证书。

以上图作为例子,假设客户D有特殊的要求,要求加密流程使用特定的证书或密钥,我们就可以将客户D的场景绑定到客户D独有的加密证书

多集群管理

如果证书管理需要管理多个集群,那么证书和Scene前面可以加上层级来隔离,如环境、集群等。

对小型系统的建议

如果规模不大,且TLS场景有限,需要考虑一下有无拆分加密证书和验证证书的必要,可以合一,应用程序直接以合一的证书id来关联,而非场景id。虽不方便复用,但大大降低了复杂性。

证书管理的功能

cert-manager3

证书管理场景

设定一个TLS会话

cert-manager4

使用TLS会话

这要求应用程序持久化场景信息

cert-manager5

组织架构相关

大型系统下,证书管理是一个必须的组件,且一定是由团队最底层的组织架构承接。如若不然,那么由底层组织架构维护的组件,因为依赖关系,无法基于证书管理来统一实现证书的更换和过期告警。除非不基于证书管理自己构筑一套能力。

TL;DR

随着组件和使用加密场景的不断扩大,证书管理是一个必须的组件,通过抽象出场景的概念来复用证书,通过变更通知在微服务模式下快速更换所有微服务实例上的证书,并提供统一的证书过期告警功能来提醒管理员更换证书。

0%