kubernetes中容器获取IP地址是一个常见的需求,常见的有两种获取IP地址的方式

kubernetes环境变量注入

通过在部署时,container下的env中配置如下yaml

1
2
3
4
5
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP

进入容器就可以根据环境变量获取到容器IP

1
2
# echo $POD_IP
172.17.0.2

通过shell脚本获取

通过ip命令(推荐)

1
2
# ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1
172.17.0.2

注意这里一定要用inet\b,不要用inet。使用inet的话,在Ipv6双栈场景下会因为匹配到inet6获取到错误的结果, Ipv6双栈场景下ip命令的部分输出结果如下图所示

1
2
inet 172.17.0.2/16 brd 172.17.255.255 scope global eth0
inet6 fe80::ffff prefixlen 64 scopeid 0x20<lin>

通过ifconfig命令(不推荐)

不推荐使用ifconfig命令的原因是,这个命令已经废弃,将会逐步删除

1
ifconfig eth0 | grep 'inet\b' | awk '{print $2}' | cut -d/ -f1

同样需要使用inet\b,不要使用inet

TLDR

优先配置如下yaml进行环境变量注入,其次使用ip addr show eth0 | grep “inet\b” | awk ‘{print $2}’ | cut -d/ -f1命令获取

1
2
3
4
5
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP

在一些场景下,您的kubernetes集群已经搭建完成了,但是还需要修改一些核心组件的参数,如etcd、kube-apiserver、kube-scheduler、kube-controller-manager等。

通过kubectl get pod -owide -n kube-system 可以查看到这些核心容器。

1
2
3
4
5
6
NAME                               READY   STATUS    RESTARTS       AGE
coredns-78fcd69978-rdmjm 1/1 Running 11 (23s ago) 281d
etcd-$NODE1 1/1 Running 13 (23s ago) 281d
etcd-$NODE2 1/1 Running 13 (23s ago) 281d
etcd-$NODE3 1/1 Running 13 (23s ago) 281d
.....

以etcd为例,etcd的参数就在pod中的commands参数里。可以通过kubectl describe pod etcd-$NODENAME -n kube-system来查看(省略部分参数)

1
2
3
4
5
6
7
Name: etcd-$NODENAME
Namespace: kube-system
Containers:
etcd:
Command:
--client-cert-auth=true
--trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt

然而,如果您尝试编辑pod中的参数,会发现它们是不可修改的。

不过,如果您需要修改参数,还有另一个办法,通过修改/etc/kubernetes/manifests/下的yaml文件来修改运行中kubernetes集群中”系统”Pod的参数。原理是,当您把yaml文件修改后,kubelet会自动监听yaml文件的变更,并重新拉起本机器上的pod。

举个例子,如果您希望关闭etcd集群对客户端的认证,那么您可以修改/etc/kubernetes/mainfiest/etcd.yaml,将client-cert-auth设置为false,把trusted-ca-file去掉。注意:三台master机器节点都需要执行此操作

偶尔也回首一下处理的棘手问题吧。问题的现象是,通过kubernetes get node输出的ip不是期望的ip地址。大概如下所示

1
2
3
4
ip addr

eth0 ip1
eth0:xxx ip2

最终输出的不是预期的ip1地址,而是ip2地址。

按藤摸瓜,kubernetes把节点信息保存在/registry/minions/$node-name中的InternalIp 字段。

InternalIp是如何确定的呢,这段代码位于pkg/kubelet/nodestatus/setters.go

1
2
3
4
5
6
7
// 1) Use nodeIP if set (and not "0.0.0.0"/"::")
// 2) If the user has specified an IP to HostnameOverride, use it
// 3) Lookup the IP from node name by DNS
// 4) Try to get the IP from the network interface used as default gateway
//
// For steps 3 and 4, IPv4 addresses are preferred to IPv6 addresses
// unless nodeIP is "::", in which case it is reversed.

我们的场景下没有手动设置nodeIp,如需设置通过kubelet命令行即可设置 –node-ip=localhost,最终通过如下的go函数获取ip地址

1
addrs, _ = net.LookupIP(node.Name)

对这行go函数进行strace追溯,最终调用了c函数,getaddrinfo函数。getaddrinfo底层是发起了netlink请求,开启netlink的抓包

1
2
3
4
5
modprobe nlmon
ip link add nlmon0 type nlmon
ip link set dev nlmon0 up
tcpdump -i nlmon0 -w netlinik.pcap
# 使用nlmon 驱动模块,这个nlmon 驱动模块会注册一个 netlink tap 口,用户态向内核发送 netlink 消息、内核向用户态发送 netlink 消息,报文都会经过这个 tap 口。

通过抓包我看到通过netlink报文请求返回的ip地址顺序都是合乎预期的,只能是getaddrinfo函数修改了返回的顺序

Google了一下发现是getaddrinfo支持了rfc3484导致了ip的重新排序,代码地址glibc/sysdeps/posix/getaddrinfo.c

RFC3484 总共有十个规则,比较关键的有

Rule9

1
2
3
4
5
6
Rule 9:  Use longest matching prefix.
When DA and DB belong to the same address family (both are IPv6 or
both are IPv4): If CommonPrefixLen(DA, Source(DA)) >
CommonPrefixLen(DB, Source(DB)), then prefer DA. Similarly, if
CommonPrefixLen(DA, Source(DA)) < CommonPrefixLen(DB, Source(DB)),
then prefer DB.

举个例子,假如机器的ip地址是 172.18.45.2/24,它会更青睐于172.18.45.6而不是172.31.80.8。这个RFC存在较大的争议,它与dns轮询策略不兼容,如:dns服务器轮询返回多个ip地址,客户端总是选择第一个ip连接。与这个策略存在很大的冲突。并且社区内也有投票试图停止对RFC3484 rule9的适配, 但是最终被拒绝了。

根据分析,认为是ip2的地址小于ip1的地址,最终glibc排序的时候把ip2放在了前面。最终我们给kubelet配置了eth0地址的–node-ip,解决了这个问题。

什么是umask, umask即user file-creation mask. 用来控制最终创建文件的权限。

umask是进程级属性,通常是由login shell设置,可以通过系统调用umask()或者命令umask permission来修改,通过umask命令来查询,linux内核版本4.7之后,还可以通过cat /proc/self/status|grep -i umask 查询,示例如下

1
2
3
4
5
6
7
8
9
hezhangjian:~/masktest $ umask
0022
hezhangjian:~/masktest $ umask 0077
hezhangjian:~/masktest $ umask
0077
hezhangjian:~/masktest $ umask 0022
hezhangjian:~/masktest $ umask
0022
hezhangjian:~/masktest $

一般来说,umask的系统默认值在**/etc/login.defs** 中设置

1
2
3
4
5
6
7
hezhangjian:~ $cat /etc/login.defs|grep -i umask
# UMASK Default "umask" value.
# UMASK is the default umask value for pam_umask and is used by
# 022 is the "historical" value in Debian for UMASK
# If USERGROUPS_ENAB is set to "yes", that will modify this UMASK default value
UMASK 022
# Other former uses of this variable such as setting the umask when
  • 最常见的默认的umask值是022,目录权限755,文件权限644
  • 077 的 umask 适用于私有的系统,则其他用户无法读取或写入您的数据。

针对标准函数open来说,最终写入磁盘的权限位是由mode参数和用户的文件创建掩码(umask)执行按位与操作而得到。

假设当umask为0022时,创建一个具有0666权限的文件,就会进行运算决定文件的最终权限,先对掩码取非,再和指定的权限进行binary-And操作,如图所示

linux-umask-analyze

示例代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
int fd;
if (argc != 2) {
fprintf(stderr, "usage: %s <file>", argv[0]);
exit(1);
}
fprintf(stdout, "create file %s", argv[1]);
fd = open(argv[1], O_WRONLY | O_CREAT | O_TRUNC, 0666);
if (fd == -1) {
perror("open");
exit(1);
}
close(fd);
}

结果如下,权限644,符合预期

1
2
3
4
5
ll
total 8
drwxr-xr-x 2 hezhangjian hezhangjian 4096 Nov 8 06:25 .
drwxr-xr-x 15 hezhangjian hezhangjian 4096 Nov 8 06:18 ..
-rw-r--r-- 1 hezhangjian hezhangjian 0 Nov 8 06:25 my.txt

本文代码均已上传到gitee
calcite的parser代码生成分为如下两个步骤

calcite-parser-code-generate-process

生成Parse.jj

文件目录如下

1
2
3
4
5
6
7
8
9
10
├── pom.xml
└── src
├── main
│   ├── codegen
│   │   ├── config.fmpp
│   │   ├── includes
│   │   │   ├── compoundIdentifier.ftl
│   │   │   └── parserImpls.ftl
│   │   └── templates
│   │   └── Parser.jj

添加calcite dependency

1
2
3
4
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</dependency>

配置drill-fmpp-maven-plugin插件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<plugin>
<groupId>org.apache.drill.tools</groupId>
<artifactId>drill-fmpp-maven-plugin</artifactId>
<executions>
<execution>
<configuration>
<config>src/main/codegen/config.fmpp</config>
<output>${project.build.directory}/generated-sources/fmpp</output>
<templates>src/main/codegen/templates</templates>
</configuration>
<id>generate-fmpp-sources</id>
<phase>validate</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>

codegen 模块的文件都拷贝自对应版本的calclite core/src/main/codegen路径 https://github.com/apache/calcite/tree/main/core/src/main/codegen

然后把https://github.com/apache/calcite/blob/main/core/src/main/codegen/default_config.fmpp 中的parser属性与config.fmpp中的parser属性合并。就可以通过mvn package命令生成Parser.jj了。当然,如果有定制化修改的需求,也可以在这个阶段修改config.fmpp

calcite-parser-code-generator-fmpp

Parser.jj生成java代码

文件目录如下

1
2
3
4
5
├── pom.xml
├── src
│   ├── main
│   │   ├── codegen
│   │   │   └── Parser.jj

Parser.jj就是我们上一步生成的Parser.jj,如果有什么想要的定制化修改,也可以在这个步骤改入到Parser.jj中。

添加calcite dependency

1
2
3
4
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</dependency>

配置javacc-maven-plugin如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<executions>
<execution>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/codegen</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>

生成代码

calcite-parser-code-generator-javacc

无Parser.jj定制化修改,一步生成

如果不需要对Parser.jj进行定制化修改,那么可以通过连续运行两个插件来生成代码,这里给出pom文件样例,不再赘述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<plugin>
<groupId>org.apache.drill.tools</groupId>
<artifactId>drill-fmpp-maven-plugin</artifactId>
<executions>
<execution>
<configuration>
<config>src/main/codegen/config.fmpp</config>
<output>${project.build.directory}/generated-sources/fmpp</output>
<templates>src/main/codegen/templates</templates>
</configuration>
<id>generate-fmpp-sources</id>
<phase>validate</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<executions>
<execution>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/fmpp</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<lookAhead>2</lookAhead>
<isStatic>false</isStatic>
</configuration>
</execution>
<execution>
<id>javacc-test</id>
<phase>generate-test-sources</phase>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-test-sources/fmpp</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-test-sources/javacc</outputDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<isStatic>false</isStatic>
<ignoreCase>true</ignoreCase>
<unicodeInput>true</unicodeInput>
</configuration>
</execution>
</executions>
</plugin>

lvs是做什么的

lvs通常用做tcp/udp协议的四层负载均衡

lvs-brief

相比也可以用于四层负载的Nginx组件,Lvs因为运行在内核态,性能高是它的主要优势,同样,因为运行在内核态中,无法像Nginx那样,对四层的tls做卸载等动作。

lvs性能相关指标(用户视角)

客户端的连接数

  • UDP模式下,按连接超时时间计算(根据业务需求决定)。可通过ipvsadm -l --timeout来查看udp超时时间
  • TCP模式下,即为tcp连接数

客户端请求流量

即client与lvs、lvs与RS之间交互的流量

客户端请求平均包大小

即client与lvs、lvs与RS之间的平均包大小

lvs性能相关参数

会话超时时间

查看

1
ipvsadm -l --timeout

修改

1
ipvsadm --set ${tcptimeout} ${tcpfintimeout} ${udptimeout}

vm conntrack最大个数

查看

1
sysctl -a |grep net.netfilter.nf_conntrack_max

查看当前nf_conntrack个数

1
2
3
4
# 方式一
conntrack -C
# 方式二
cat /proc/net/nf_conntrack | wc -l

修改

1
sysctl -w net.netfilter.nf_conntrack_max=1024

hashsize

什么是hashsize

hashsize也就是nf_conntrack_buckets,如果不手动指定。linux会根据机器的内存计算。如果要支持海量的nf_conntrack,则可以适当调大。

1
2
3
4
5
6
7
8
9
10
11
12
  // nf_conntrack_core.c
nf_conntrack_htable_size
= (((nr_pages << PAGE_SHIFT) / 16384)
/ sizeof(struct hlist_head));
if (BITS_PER_LONG >= 64 &&
nr_pages > (4 * (1024 * 1024 * 1024 / PAGE_SIZE)))
nf_conntrack_htable_size = 262144;
else if (nr_pages > (1024 * 1024 * 1024 / PAGE_SIZE))
nf_conntrack_htable_size = 65536;

if (nf_conntrack_htable_size < 1024)
nf_conntrack_htable_size = 1024;

hlist_head的大小在64位的机器下大小为16

查看

1
cat /sys/module/nf_conntrack/parameters/hashsize

修改 (方式一)

1
echo 65536 > /sys/module/nf_conntrack/parameters/hashsize

修改(方式二)永久生效

1
2
3
4
5
6
# exmaple file, you can modify this config if exists. File name doesn't matter.
# 样例文件,你可以修改已存在的这个文件。文件名称并不重要。
touch /etc/modprobe.d/lvs.conf
echo "options nf_conntrack hashsize=65536" >> /etc/modprobe.d/lvs.conf
# then you need reboot
# 需要重试来使配置生效

文件句柄数

查看

1
ulimit -n

修改

不同的linux发行版,修改方式不太一样,以RedHat为例

1
2
num=`ulimit -n`
sed -i "s|$num|65536|g" /etc/security/limits.d/*-nofile.conf

lvs性能瓶颈

虚拟机内存

contnrack使用slab分配内存,可以通过slabtop命令查看nf_conntrack模块占用的内存。当连接数较高时,Lvs的内存瓶颈在于会话管理。

conntrack最大理论内存占用为

1
max_mem_used = conntrack * max * sizeof (struct nf_conntrack) + conntrack_buckets * sizeof (struct list_head)

使用如下python代码计算

1
2
3
4
5
6
7
8
9
10
import ctypes

# 这个是nf_conntrack的动态库所在路径
# libnetfilter git地址 git://git.netfilter.org/libnetfilter_conntrack
LIBNETFILTER_CONNTRACK = '/usr/lib/aarch64-linux-gnu/libnetfilter_conntrack.so.3.7.0'
nfct = ctypes.CDLL(LIBNETFILTER_CONNTRACK)
print("max size of struct nf_conntrack:")
print(nfct.nfct_maxsize())
print("sizeof(struct list_head):")
print(ctypes.sizeof(ctypes.c_void_p) * 2)

其中nfct_maxsize出自于git://git.netfilter.org/libnetfilter_conntrack中的src/conntrack/api.c

1
2
3
/**
* nfct_maxsize - return the maximum size in bytes of a conntrack object
*/

在如下操作系统下

1
2
uname -a
> Linux primary 5.4.0-122-generic #138-Ubuntu SMP Wed Jun 22 15:05:39 UTC 2022 aarch64 aarch64 aarch64 GNU/Linux

以100万conntrack_max,65536buckets为例,占用的内存为

1_000_000 * 392 + 65536 * 16 约等于 373.84 + 1 为374M内存

网卡流量

最大进出带宽。在云上,通常由云厂商限制。如果你将lvs上面的浮动Ip通过EIP的方式暴露出去(这很常见),还需要考虑EIP自身的带宽

网卡进出包个数(PPS)

最大进出包个数

虚拟机能支持的最大网络连接数

ECS上可以支持的最大网络连接数。在云上,通常由云厂商限制

Lvs监控&扩容

cpu使用率

可在超过百分之80的时候告警。处理方式:

  • 如果内存还没有到达瓶颈,可以通过扩大hashsize的方式,降低hash链上元素的个数,减少匹配消耗的cpu
  • 如果内存水位也较高。对CPU进行扩容

内存使用率

可在超过内存容量百分之80的时候告警。处理方式:扩容内存

conntrack个数

通过conntrack -Ccat /proc/net/nf_conntrack | wc -l, 定期进行统计,使用sysctl -w net.netfilter.nf_conntrack_max进行扩容

网卡流量、网卡进出包个数

可以利用云厂商的监控或nicstat命令查看。处理方式:扩容网卡

最大网络连接数

可以利用云厂商的监控或netstat -an|egrep "tcp|udp"|grep -v "LISTEN"|wc -lss -tun state all | grep -v LISTEN | wc -l查看。处理方式:扩容ECS规格

EIP带宽

通过云厂商的指标来监控。处理方式,扩容EIP的BGP带宽

prometheus 磁盘布局

采集到的数据每两个小时形成一个block。每个block由一个目录组成,并存放在data路径下。该目录包含一个包含该时间窗口的所有时间序列样本的块子目录、一个元数据文件和一个索引文件(将metric_name和label索引到目录下的时间序列)。 chunks 目录中的样本默认组合成一个或多个段文件,每个段文件最大为 512MB。 当通过 API 删除系列时,删除记录存储在单独的 tombstone 文件中(而不是立即从块段中删除数据)。

当前正在写入的块保存在内存中,没有完全持久化。通过WAL日志来防止崩溃丢失数据。预写日志分为数节(segments)保存在wal文件夹中。这些文件包含尚未压缩的原始数据; 因此它们比常规块文件大得多。 Prometheus 将至少保留三个预写日志文件。在高流量下,会保留三个以上的 WAL 文件,以便保留至少两个小时的原始数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
./data
├── 01BKGV7JBM69T2G1BGBGM6KB12
│ └── meta.json
├── 01BKGTZQ1SYQJTR4PB43C8PD98
│ ├── chunks
│ │ └── 000001
│ ├── tombstones
│ ├── index
│ └── meta.json
├── 01BKGTZQ1HHWHV8FBJXW1Y3W0K
│ └── meta.json
├── 01BKGV7JC0RY8A6MACW02A2PJD
│ ├── chunks
│ │ └── 000001
│ ├── tombstones
│ ├── index
│ └── meta.json
├── chunks_head
│ └── 000001
└── wal
├── 000000002
└── checkpoint.00000001
└── 00000000

prometheus概念

  • Label: 标签,string格式的kv组合
  • series: 时间序列,label的组合
  • chunk: 时间,value的数据

prometheus索引格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌────────────────────────────┬─────────────────────┐
│ magic(0xBAAAD700) <4b> │ version(1) <1 byte> │
├────────────────────────────┴─────────────────────┤
│ ┌──────────────────────────────────────────────┐ │
│ │ Symbol Table │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Series │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings 1 │ │
│ ├──────────────────────────────────────────────┤ │
│ │ ... │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings N │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings Offset Table │ │
│ ├──────────────────────────────────────────────┤ │
│ │ TOC │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘

写入索引时,可以在上面列出的主要部分之间添加任意数量的0字节作为填充。顺序扫描文件时,必须跳过部分间的任意0字节。

下面描述的大部分部分都以 len 字段开头。 它总是指定就在尾随 CRC32 校验和之前的字节数。 校验和就计算这些字节的校验和(不包含len字段)

符号表

符号表包含已存储序列的标签对中出现的重复数据删除字符串的排序列表。 它们可以从后续部分中引用,并显着减少总索引大小。

该部分包含一系列字符串entry,每个entry都以字符串的原始字节长度为前缀。 所有字符串均采用 utf-8 编码。 字符串由顺序索引引用。 字符串按字典顺序升序排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
┌────────────────────┬─────────────────────┐
│ len <4b> │ #symbols <4b> │
├────────────────────┴─────────────────────┤
│ ┌──────────────────────┬───────────────┐ │
│ │ len(str_1) <uvarint> │ str_1 <bytes> │ │
│ ├──────────────────────┴───────────────┤ │
│ │ . . . │ │
│ ├──────────────────────┬───────────────┤ │
│ │ len(str_n) <uvarint> │ str_n <bytes> │ │
│ └──────────────────────┴───────────────┘ │
├──────────────────────────────────────────┤
│ CRC32 <4b> │
└──────────────────────────────────────────┘

序列 series

保存一个具体的时间序列,其中包含系列的label集合和block中的chunks。

每个series都是16字节对齐。series的id为偏移量除以16。series ID 的排序列表也就是series label的字典排序列表。

1
2
3
4
5
6
7
8
9
┌───────────────────────────────────────┐
│ ┌───────────────────────────────────┐ │
│ │ series_1 │ │
│ ├───────────────────────────────────┤ │
│ │ . . . │ │
│ ├───────────────────────────────────┤ │
│ │ series_n │ │
│ └───────────────────────────────────┘ │
└───────────────────────────────────────┘

每一个series先保存label的数量,然后是包含label键值对的引用。 标签对按字典顺序排序。然后是series涉及的索引块的个数,然后是一系列元数据条目,其中包含块的最小 (mint) 和最大 (maxt) 时间戳以及对其在块文件中位置的引用。mint 是第一个样本的时间,maxt 是块中最后一个样本的时间。 在索引中保存时间范围数据, 允许按照时间范围删除数据时,如果时间范围匹配,不需要直接访问时间数据。

空间大小优化: 第一个块的 mint 被存储,它的 maxt 被存储为一个增量,并且 mintmaxt 被编码为后续块的前一个时间的增量。 类似的,第一个chunk的引用被存储,下一个引用被存储为前一个chunk的增量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
┌──────────────────────────────────────────────────────────────────────────┐
│ len <uvarint> │
├──────────────────────────────────────────────────────────────────────────┤
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ labels count <uvarint64> │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ ref(l_i.name) <uvarint32> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(l_i.value) <uvarint32> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ... │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ chunks count <uvarint64> │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ c_0.mint <varint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ c_0.maxt - c_0.mint <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(c_0.data) <uvarint64> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ c_i.mint - c_i-1.maxt <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ c_i.maxt - c_i.mint <uvarint64> │ │ │
│ │ ├────────────────────────────────────────────┤ │ │
│ │ │ ref(c_i.data) - ref(c_i-1.data) <varint64> │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ ... │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────────────────────┤
│ CRC32 <4b> │
└──────────────────────────────────────────────────────────────────────────┘

Posting

Posting这一节存放着关于series引用的单调递增列表,简单来说就是存放id和时间序列的对应关系

1
2
3
4
5
6
7
8
9
10
11
12
13
┌────────────────────┬────────────────────┐
│ len <4b> │ #entries <4b> │
├────────────────────┴────────────────────┤
│ ┌─────────────────────────────────────┐ │
│ │ ref(series_1) <4b> │ │
│ ├─────────────────────────────────────┤ │
│ │ ... │ │
│ ├─────────────────────────────────────┤ │
│ │ ref(series_n) <4b> │ │
│ └─────────────────────────────────────┘ │
├─────────────────────────────────────────┤
│ CRC32 <4b> │
└─────────────────────────────────────────┘

Posting sections的顺序由postings offset table决定。

Posting Offset Table

postings offset table包含着一系列posting offset entry,根据label的名称和值排序。每一个posting offset entry存放着label的键值对以及在posting sections中其series列表的偏移量。用来跟踪posting sections。当index文件加载时,它们将部分加载到内存中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────┬──────────────────────┐
│ len <4b> │ #entries <4b> │
├─────────────────────┴──────────────────────┤
│ ┌────────────────────────────────────────┐ │
│ │ n = 2 <1b> │ │
│ ├──────────────────────┬─────────────────┤ │
│ │ len(name) <uvarint> │ name <bytes> │ │
│ ├──────────────────────┼─────────────────┤ │
│ │ len(value) <uvarint> │ value <bytes> │ │
│ ├──────────────────────┴─────────────────┤ │
│ │ offset <uvarint64> │ │
│ └────────────────────────────────────────┘ │
│ . . . │
├────────────────────────────────────────────┤
│ CRC32 <4b> │
└────────────────────────────────────────────┘

TOC

table of contents是整个索引的入口点,并指向文件中的各个部分。 如果引用为零,则表示相应的部分不存在,查找时应返回空结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────┐
│ ref(symbols) <8b> │
├─────────────────────────────────────────┤
│ ref(series) <8b> │
├─────────────────────────────────────────┤
│ ref(label indices start) <8b> │
├─────────────────────────────────────────┤
│ ref(label offset table) <8b> │
├─────────────────────────────────────────┤
│ ref(postings start) <8b> │
├─────────────────────────────────────────┤
│ ref(postings offset table) <8b> │
├─────────────────────────────────────────┤
│ CRC32 <4b> │
└─────────────────────────────────────────┘

chunks 磁盘格式

chunks文件创建在block中的chunks/目录中。 每个段文件的最大大小为 512MB。
文件中的chunk由uint64的索引组织,索引低四位为文件内偏移,高四位为段序列号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌──────────────────────────────┐
│ magic(0x85BD40DD) <4 byte> │
├──────────────────────────────┤
│ version(1) <1 byte> │
├──────────────────────────────┤
│ padding(0) <3 byte> │
├──────────────────────────────┤
│ ┌──────────────────────────┐ │
│ │ Chunk 1 │ │
│ ├──────────────────────────┤ │
│ │ ... │ │
│ ├──────────────────────────┤ │
│ │ Chunk N │ │
│ └──────────────────────────┘ │
└──────────────────────────────┘

chunks中的Chunk格式

1
2
3
┌───────────────┬───────────────────┬──────────────┬────────────────┐
│ len <uvarint> │ encoding <1 byte> │ data <bytes> │ CRC32 <4 byte> │
└───────────────┴───────────────────┴──────────────┴────────────────┘

查询数据

code

查询的prometheus方法签名

1
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet

支持从block中,remote等各种地方查询获取数据

prometheus会在内存中维护一个数据结构

1
2
3
// Map of LabelName to a list of some LabelValues's position in the offset table.
// The first and last values for each name are always present.
postings map[string][]postingOffset

在内存中,保留每个label name,并且每n个保存label值,降低内存的占用。但是第一个和最后一个值总是保存在内存中。

查询数据流程

prometheus-tsdb-index

参考资料

物联网是现在比较热门的软件领域,众多云厂商都有自己的物联网平台,而物联网平台其中一个核心的模块就是Mqtt网关。

使用Netty搭建高性能服务器是一个常见的选择,Netty自带Mqtt的编解码,我们很快就可以在Netty服务器中插入Mqtt的编解码handler,由netty已经编写好的模块帮助我们做mqtt的编解码,我们仅需自己处理mqtt协议业务的处理,如登录,订阅分发等。

NettyServer使用MqttHandler编解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.github.hezhangjian.demo.iot.mqtt.broker;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
* @author hezhangjian
*/
@Slf4j
public class MqttServer {

public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new MqttDecoder());
p.addLast(MqttEncoder.INSTANCE);
}
});

// Start the server.
ChannelFuture f = b.bind(1883).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

}

客户端采用eclipse mqtt client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.github.hezhangjian.demo.mqtt.client;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
* @author hezhangjian
*/
@Slf4j
public class MqttClientEx {

public static void main(String[] args) throws Exception {
String topic = "MQTT Examples";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://127.0.0.1:1883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();

try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}

}

然后我们先运行MqttServer,再运行MqttClient,发现MqttClient卡住了

image-20201218170957411

这是为什么呢,我们通过抓包发现仅仅只有客户端发送了Mqtt connect信息,服务端并没有响应

image-20201218171403188

但是根据mqtt标准协议,发送MqttConnect,必须有CONNAck

image-20201218180301759

所以我们要在mqttConn后,业务上返回ConnAck消息,下一节我们在这个基础上自己实现Handler返回Connack消息

参考

我们先创建一个MqttHandler,让他继承ChannelInboundHandlerAdapter, 用来接力MqttDecoder解码完成后的消息,这里要继承其中的channelRead方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.github.hezhangjian.demo.iot.mqtt.broker;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

/**
* 处理Mqtt协议栈
* @author hezhangjian
*/
@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}

}

然后把这个handler加入到netty的职责链中,解码顺序

image-20201218204356576

打印出connectMessage如下

1
[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]

我们先忽略这些信息,无脑返回ack给他

1
final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();

我们再运行起Server和Client,随后可以看到已经走过了Connect阶段,进入了publish message过程,接下来我们再实现更多的其他场景

附上此阶段的MessageHandler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.github.hezhangjian.demo.iot.mqtt.broker;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;

/**
* 处理Mqtt协议栈
*
* @author hezhangjian
*/
@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
if (msg instanceof MqttConnectMessage) {
handleConnect(ctx, (MqttConnectMessage) msg);
} else {
log.error("Unsupported type msg [{}]", msg);
}
}

private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
log.info("connect msg is [{}]", connectMessage);
final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
final MqttConnectPayload connectPayload = connectMessage.payload();
final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
ctx.channel().writeAndFlush(ackMessage);
}

}

什么是sidecar?

kubernetes-sidecar-what-is

sidecar,直译为边车。 如上图所示,边车就是加装在摩托车旁来达到拓展功能的目的,比如行驶更加稳定,可以拉更多的人和货物,坐在边车上的人可以给驾驶员指路等。边车模式通过给应用服务加装一个“边车”来达到控制逻辑的分离的目的。

对于微服务来讲,我们可以用边车模式来做诸如 日志收集、服务注册、服务发现、限流、鉴权等不需要业务服务实现的控制面板能力。通常和边车模式比较的就是像spring-cloud那样的sdk模式,像上面提到的这些能力都通过sdk实现。

kubernetes-sidecar-what-can-do

这两种实现模式各有优劣,sidecar模式会引入额外的性能损耗以及延时,但传统的sdk模式会让代码变得臃肿并且升级复杂,控制面能力和业务面能力不能分开升级。

本文的代码已经上传到gitee

sidecar 实现原理

介绍了sidecar的诸多功能,但是,sidecar是如何做到这些能力的呢?

原来,在kubernetes中,一个pod是部署的最小单元,但一个pod里面,允许运行多个container(容器),多个container(容器)之间共享存储卷和网络栈。这样子,我们就可以多container来做sidecar,或者init-container(初始化容器)来调整挂载卷的权限

kubernetes-sidecar-inside

日志收集sidecar

日志收集sidecar的原理是利用多个container间可以共用挂载卷的原理实现的,通过将应用程序的日志路径挂出,用另一个程序访问路径下的日志来实现日志收集,这里用cat来替代了日志收集,部署yaml模板如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
apiVersion: v1
kind: Pod
metadata:
name: webserver
spec:
volumes:
- name: shared-logs
emptyDir: {}

containers:
- name: nginx
image: ttbb/nginx:mate
volumeMounts:
- name: shared-logs
mountPath: /opt/sh/openresty/nginx/logs

- name: sidecar-container
image: ttbb/base
command: ["sh","-c","while true; do cat /opt/sh/openresty/nginx/logs/nginx.pid; sleep 30; done"]
volumeMounts:
- name: shared-logs
mountPath: /opt/sh/openresty/nginx/logs

使用kubectl create -f 创建pod,通过kubectl logs命令就可以看到sidecar-container打印的日志输出

1
kubectl logs webserver sidecar-container

转发请求sidecar

这一节我们来实现,一个给应用程序转发请求的sidecar,应用程序代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use std::io::prelude::*;
use std::net::{TcpListener, TcpStream};

fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

for stream in listener.incoming() {
let stream = stream.unwrap();

handle_connection(stream);
}
println!("Hello, world!");
}

fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];

stream.read(&mut buffer).unwrap();

let contents = "Hello";

let response = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
contents.len(),
contents
);

println!("receive a request!");
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}

我们再来写一个sidecar,它会每15秒向应用程序发出请求

1
2
3
4
5
6
7
8
9
10
use std::thread;
use std::time::Duration;

fn main() {
loop {
thread::sleep(Duration::from_secs(15));
let response = reqwest::blocking::get("http://localhost:7878").unwrap();
println!("{}", response.text().unwrap())
}
}

通过仓库下的intput/build.sh脚本构造镜像,运行yaml如下

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: Pod
metadata:
name: webserver
spec:
containers:
- name: input-server
image: sidecar-examples:input-http-server

- name: input-sidecar
image: sidecar-examples:sidecar-input

通过查看kubectl logs input input-http-server可以看到input-http-server收到了请求

1
2
receive a request!
receive a request!

拦截请求sidecar

应用程序代码,它会每15s向localhost发出请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.hezhangjian.sidecar

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object HttpClient {
def main(args: Array[String]): Unit = {
while (true) {
Thread.sleep(15_000L)
implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "SingleRequest")
// needed for the future flatMap/onComplete in the end
implicit val executionContext: ExecutionContextExecutor = system.executionContext

val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://localhost:7979/hello"))

responseFuture
.onComplete {
case Success(res) => println(res)
case Failure(_) => sys.error("something wrong")
}
}
}
}

我们再来写一个sidecar,它会拦截http请求并打印日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.hezhangjian.sidecar

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._

import scala.concurrent.ExecutionContextExecutor
import scala.io.StdIn

object HttpServer {

def main(args: Array[String]): Unit = {

implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "my-system")
// needed for the future flatMap/onComplete in the end
implicit val executionContext: ExecutionContextExecutor = system.executionContext

val route =
path("hello") {
get {
println("receive a request")
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
}
}

val bindingFuture = Http().newServerAt("localhost", 7979).bind(route)
while (true) {
Thread.sleep(15_000L)
}
}
}

通过仓库下的output/build.sh脚本构造镜像,运行yaml如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
apiVersion: v1
kind: Pod
metadata:
name: output
spec:
volumes:
- name: shared-logs
emptyDir: {}

containers:
- name: output-workload
image: sidecar-examples:output-workload
imagePullPolicy: Never

- name: sidecar-output
image: sidecar-examples:sidecar-output
imagePullPolicy: Never

通过查看kubectl logs output output-workload可以看到output-sidecar收到了请求

1
2
3
4
5
6
7
8
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:15:47 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:16:02 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:16:17 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:16:32 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:16:47 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:17:02 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:17:17 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
HttpResponse(200 OK,List(Server: akka-http/10.2.9, Date: Tue, 29 Mar 2022 00:17:32 GMT),HttpEntity.Strict(text/html; charset=UTF-8,31 bytes total),HttpProtocol(HTTP/1.1))
0%