在本篇文章中,我们将探讨如何在容器内指定特定域名解析结果的几种方式。为了方便演示,首先我们创建一个演示用的Deployment配置文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
apiVersion: apps/v1
kind: Deployment
metadata:
name: busybox-deployment
labels:
app: busybox
spec:
replicas: 1
selector:
matchLabels:
app: busybox
template:
metadata:
labels:
app: busybox
spec:
containers:
- name: busybox
image: busybox
args:
- /bin/sh
- -c
- "while true; do echo Hello, Kubernetes!; sleep 10;done"

这个deployment会创建1个busybox的pod,容器每隔10s会打印“Hello, Kubernetes!”到控制台

TL;DR

方案 修改级别 是否推荐 备注
修改/etc/hosts pod
添加HostAliases记录 pod/deploy/statefulset
修改Coredns配置 整个kubernetes集群
自定义DNS策略 pod/deploy/statefulset 视情况而定 如需对接三方的DNS服务器,推荐采用
使用三方DNS插件 整个kubernetes集群 不推荐,Coredns为业内主流

修改/etc/hosts

修改/etc/hosts是最传统的方式,直接在容器内修改相应的文件来实现域名解析,在Pod级别生效。由于其可维护性较差(每次pod发生重启都需要手动修改),不推荐在生产环境使用。

例如,我们可以在/etc/hosts里面添加这样一条记录

1
250.250.250.250 four-250
1
2
/ # ping four-250
PING four-250 (250.250.250.250): 56 data bytes

添加HostAliases记录

HostAliases是kubernetes中Pod配置的一个字段,它提供了Pod内容器的/etc/hosts文件的附加记录。这在某些情况下非常有用,特别是当你想要覆盖某个主机名的解析结果,或者提供网络中没有的主机名解析时。

这个可以在Pod、Replica、Deployment、StatefulSet的级别修改,维护性稍强。举个🌰,我们将上面的yaml修改为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
apiVersion: apps/v1
kind: Deployment
metadata:
name: busybox-deployment
labels:
app: busybox
spec:
replicas: 3
selector:
matchLabels:
app: busybox
template:
metadata:
labels:
app: busybox
spec:
hostAliases:
- ip: "250.250.250.250"
hostnames:
- "four-250"
containers:
- name: busybox
image: busybox
args:
- /bin/sh
- -c
- "while true; do echo Hello, Kubernetes!; sleep 10;done"

这个时候我们查看容器的/etc/hosts,发现它被kubernetes自动插入了一条记录**Entries add by HostAliases。**这就是hostAliases的实现原理

kubelet_pods代码中进行了这样的写入动作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func hostsEntriesFromHostAliases(hostAliases []v1.HostAlias) []byte {
if len(hostAliases) == 0 {
return []byte{}
}

var buffer bytes.Buffer
buffer.WriteString("\n")
buffer.WriteString("# Entries added by HostAliases.\n")
// for each IP, write all aliases onto single line in hosts file
for _, hostAlias := range hostAliases {
buffer.WriteString(fmt.Sprintf("%s\t%s\n", hostAlias.IP, strings.Join(hostAlias.Hostnames, "\t")))
}
return buffer.Bytes()
}

Coredns配置

我们可以通过修改ConfigMap来实现让容器解析特定域名的目的。

更改Coredns配置

我们可以通过以下命令修改Coredns的配置:

1
kubectl edit cm coredns -n kube-system

原有的configmap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Corefile: |
.:53 {
log
errors
health {
lameduck 5s
}
ready
kubernetes cluster.local in-addr.arpa ip6.arpa {
pods insecure
fallthrough in-addr.arpa ip6.arpa
ttl 30
}
prometheus :9153
hosts {
192.168.65.2 host.minikube.internal
fallthrough
}
forward . /etc/resolv.conf {
max_concurrent 1000
}
cache 30
loop
reload
loadbalance
}

在hosts里面加上特定的记录

1
250.250.250.250 four-250

如果您没有配置reload插件,则需要重启Coredns才能生效,默认的reload时间是30s,在plugin/reload/setup.go的defaultInterval中定义

自定义DNS策略

通过修改DNS策略。使得对于单个Pod/Deploy/StatefulSet将特定的域名解析发给特定的服务器来达到效果,如下,可以对pod添加dns的服务器以及search域

1
2
3
4
5
6
7
8
9
10
11
12
13
spec:
dnsConfig:
nameservers:
- 1.2.3.4
searches:
- search.prefix
containers:
- name: busybox
image: busybox
args:
- /bin/sh
- -c
- "while true; do echo Hello, Kubernetes!; sleep 10;done"

使用第三方DNS插件

不推荐,使用其他的DNS插件,来做一些炫酷的自定义操作。而且目前Coredns也是业内的主流,没有很好的替代

问题背景

有一个环境的kafka client发送数据有部分超时,拓扑图也非常简单

Untitled

定位历程

我们先对客户端的环境及JVM情况进行了排查,从JVM所在的虚拟机到kafka server的网络正常,垃圾回收(GC)时间也在预期范围内,没有出现异常。

紧接着,我们把目光转向了kafka 服务器,进行了一些基础的检查,同时也查看了kafka处理请求的超时日志,其中我们关心的metadata和produce请求都没有超时。

问题就此陷入了僵局,虽然也搜到了一些kafka server会对连上来的client反解导致超时的问题( https://github.com/apache/kafka/pull/10059),但通过一些简单的分析,我们确定这并非是问题所在。

同时,我们在环境上也发现一些异常情况,当时觉得不是核心问题/解释不通,没有深入去看

  • 问题JVM线程数较高,已经超过10000,这个线程数量虽然确实较高,但并不会对1个4U的容器产生什么实质性的影响。
  • 负责指标上报的线程CPU较高,大约占用了1/4 ~ 1/2 的CPU核,这个对于4U的容器来看问题也不大

当排查陷入僵局,我们开始考虑其他可能的调查手段。我们尝试抓包来找线索,这里的抓包是SASL鉴权+SSL加密的,非常难读,只能靠长度和响应时间勉强来推断报文的内容。

在这个过程中,我们发现了一个非常重要的线索,客户端竟然发起了超时断链,并且超时的那条消息,实际服务端是有响应回复的。

随后我们将kafka client的trace级别日志打开,这里不禁感叹kafka client日志打的相对较少,发现的确有log.debug(“Disconnecting from node {} due to request timeout.”, nodeId);的日志打印。

与网络相关的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
try {
// 这里发出了请求
client.send(request, time.milliseconds());
while (client.active()) {
List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
for (ClientResponse response : responses) {
if (response.requestHeader().correlationId() == request.correlationId()) {
if (response.wasDisconnected()) {
throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");
}
if (response.versionMismatch() != null) {
throw response.versionMismatch();
}
return response;
}
}
}
throw new IOException("Client was shutdown before response was read");
} catch (DisconnectException e) {
if (client.active())
throw e;
else
throw new IOException("Client was shutdown before response was read");

}

这个poll方法,不是简单的poll方法,而在poll方法中会进行超时判断,查看poll方法中调用的handleTimedOutRequests方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();

if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}

long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}

// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
// 关键的超时判断
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);

return responses;
}

由此我们推断,问题可能在于客户端hang住了一段时间,从而导致超时断链。我们通过工具Arthas深入跟踪了Kafka的相关代码,甚至发现一些简单的操作(如A.field)也需要数秒的时间。这进一步确认了我们的猜想:问题可能出在JVM。JVM可能在某个时刻出现问题,导致系统hang住,但这并非由GC引起。

Untitled

为了解决这个问题,我们又检查了监控线程CPU较高的问题。我们发现线程的执行热点是从”sun.management.ThreadImpl”中的”getThreadInfo”方法。

1
2
3
4
5
"metrics-1@746" prio=5 tid=0xf nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.management.ThreadImpl.getThreadInfo(Native Method)
at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:185)
at sun.management.ThreadImpl.getThreadInfo(ThreadImpl.java:149)

进一步发现,在某些版本的JDK8中,读取线程信息是需要加锁的。

至此,问题的根源已经清晰明了:过高的线程数以及线程监控时JVM全局锁的存在导致了这个问题。您可以使用如下的demo来复现这个问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadLockSimple {
public static void main(String[] args) {
for (int i = 0; i < 15_000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(200_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("take " + " " + System.currentTimeMillis());
}
}, 1, 1, TimeUnit.SECONDS);
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ScheduledExecutorService metricsService = Executors.newSingleThreadScheduledExecutor();
metricsService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
ThreadInfo[] threadInfoList = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds());
System.out.println("threads count " + threadInfoList.length + " cost :" + (System.currentTimeMillis() - start));
}
}, 1, 1, TimeUnit.SECONDS);
}
}

为了解决这个问题,我们有以下几个可能的方案:

  • 将不合理的线程数往下降,可能存在线程泄露的场景
  • 升级jdk到jdk11或者jdk17(推荐)
  • 将Thread相关的监控临时关闭

这个问题的解决方案应根据实际情况进行选择,希望对你有所帮助。

我写这篇文章来论证“超时之后要不要重启客户端”、“如何重启客户端”。简而言之,重启客户端还是为了让系统能够达到自愈,是比较高的可靠性要求。如果你的软件没有这么高的可靠性要求,像是人机交互程序等对可靠性要求较低的场景,可以选择不考虑这个功能。毕竟实现这个功能的时间至少够300倍你重新点击按钮/重启的时间了。

如果是一些串口协议,通过传输的间隙来判断报文的间隔,比如modbus协议,3.5个时间内不发送,就计算做一个协议报文的开始,那么故障时的报文毫无疑问会处理失败(因为存在CRC校验,奇偶校验等)。等待故障结束,又3.5个时间后,就会恢复正常。

如果能确保网络通信报文不会遭到篡改、也没有宇宙射线/太阳黑子修改你的比特位的场景下,笔者认为没有特别大的必要对客户端进行重启操作,因为不见得重启后就比之前更好,这种超时通常是由服务端处理时间长导致的。没做到建链限制的情况下,贸然重启,还可能会引起建链的波峰。

但是,在实际复杂的网络环境下,如网络报文遭到篡改部分字节丢失等的情况下,一切就大不一样了,不重启客户端就无法自愈。这其中的关键在于,切分报文是否正确。

比如基于TCP的网络协议,这也是本文重点讨论的场景,假设应用协议采用最常见的LengthBasedFrame分包方式,这种协议,通常根据前0~4个字节来判断协议的总长度,比如前面的字节是00000014,那这个报文长度就是1*16 + 4 = 20长度。这种时候,一旦发生了报文篡改/丢包,会导致通信端计算报文长度出错,一直在傻等,无法自愈。

比如上面的例子一旦发生篡改,将4篡改5,那么就会导致客户端/服务器一直在等待不存在的第21个字节,这种情况下,如果不做超时重建,那么这条链路就会一直处于等待状态,无法自愈。

综上所述,实际复杂的网络环境下出现通信超时,这条链路可能会无法自愈。这种情况下,笔者推荐对针对tcp链路做超时重建,业内的一些例子像是:bookkeeper client没有做,kafka client做了。至于重建的触发条件,比如一次超时就重建、多次超时之后才重建、仅当心跳报文超时才重建,这些就交给读者自己把握了。如果区别不大,笔者倾向于一次超时就重建,逻辑简单清晰。

在Java程序运行时,一些非受检异常可能会导致程序崩溃,比如NullPointerException、ArrayIndexOutOfBoundsException等等,这些异常都是由JVM抛出的,如果不对这些异常进行处理,小则线程运行中突然退出,大则整个程序崩溃。理想的场景下,每一个非受检异常都应该被捕获并进行处理,但是在实际开发中,我们往往会忽略一些异常,这些异常可能是由于程序员的疏忽导致的,也可能是由于程序员无法预知的原因导致的,比如第三方库抛出的异常。

为了避免这些异常导致程序崩溃,Java提供了一个全局的异常处理器,即DefaultUncaughtExceptionHandler,它可以捕获所有未被捕获的异常,从而避免程序崩溃。

DefaultUncaught的使用示例如下:

1
2
3
4
5
public class UncaughtExceptionHandle {
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception: ", e));
}
}

上述的代码会将未捕获的异常打印到日志中,如果你希望打印至标准输出或标准输出,可以将log替换为:

1
2
3
4
// 标准输出
System.out.println("Uncaught exception: " + e);
// 错误输出
System.err.println("Uncaught exception: " + e);

在主函数中捕获未处理的异常

在主函数中捕获未处理的异常,防止程序崩溃,同时记录日志,方便排查问题。

1
2
3
4
5
public class UncaughtExceptionHandle {
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception: ", e));
}
}

记一次中文指标乱码问题,问题也很简单,如下图所示:

Untitled

从metricbeat开始找原因,发现其实只要是UTF-8的编码格式就都可以解析,最终发现是webServer返回的数据非UTF-8格式,修改方案也很简单。将servlet中的content-type里面的text/plain修改成text/plain; charset=utf-8就可以了,如下面代码所示:

1
2
3
4
5
6
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws IOException {
response.setContentType("text/plain");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().write("<h1>哈哈</h1>");
}

我们可以轻易使用一个demo来复现这个问题,在maven中添加如下依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.35.v20201120</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.35.v20201120</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.hezhangjian.jetty;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

public class SimpleJettyServer {

public static void main(String[] args) throws Exception {
Server server = new Server(8080);

ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
server.setHandler(context);

context.addServlet(new ServletHolder(new HelloDefaultServlet()), "/hello-default");
context.addServlet(new ServletHolder(new HelloUTF8Servlet()), "/hello-utf8");

server.start();
server.join();
}

public static class HelloDefaultServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws IOException {
response.setContentType("text/plain");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().write("<h1>哈哈</h1>");
}
}

public static class HelloUTF8Servlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws IOException {
response.setContentType("text/plain; charset=UTF-8");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().write("<h1>哈哈</h1>");
}
}
}

通过curl命令来复现这个问题

1
2
3
4
curl localhost:8080/hello-default
<h1>??</h1>%
curl localhost:8080/hello-utf8
<h1>哈哈</h1>%

那么servlet里面的数据如何编码,我们可以dive一下,首先servlet里面有一个函数叫**response.setCharacterEncoding();**这个函数可以指定编码格式。其次,servlet还会通过上面的setContentType函数来做一定的推断,比如content-type中携带了charset,就使用content-type中的charset。还有些特定的content-type,比如text/json,在没有设置的情况下,servlet容器会假设它使用utf-8编码。在推断不出来,也没有手动设置的情况下,jetty默认的编码是iso-8859-1,这就解释了乱码的问题。

微服务广播模式,指的是在微服务多实例部署的场景下,将消息广播到多个微服务实例的一种模式。

Untitled

广播模式,一般用来维护微服务的内存数据,根据数据类型的不同,有助于解决两类问题。通常广播模式会使用支持发布订阅的消息中间件实现(如Redis、Kafka、Pulsar等),本文也基于消息中间件进行讨论。

利用广播模式维护一致的缓存

这应该是广播模式利用最多的一种场景,假想一个拥有海量用户的电商网站、或是一个亿级设备连接的IoT平台。势必会存在一些缓存数据,像是用户的购物车信息,或是设备的密钥缓存。如果没有广播模式,可能会存在这样的问题

Untitled

当用户更新了它的购物车之后,微服务实例1的数据发生了更新,数据库的数据也成功更新。但是微服务实例2中的缓存数据未能更新,那么如果用户的请求均衡到了实例2,就会发生意想不到的后果。

这种情况下我们可以让微服务1在广播通道中发送一个缓存的invalidate消息,将微服务实例2中该用户的缓存清零,使得微服务实例2在下一次处理该用户的请求时,从数据库中读取最新的消息。

使用该模式需要注意的点:

  • 每个微服务实例应该使用不同的消费组,可以通过微服务的IP、主机名、UUID等拼装成订阅组名称,这才称得上广播之名
  • 微服务消费消息的时候,应从Latest开始消费,避免从Earliest开始消费无用的缓存清理消息
  • 由于每一次微服务重启都会产生一个新的消费组,需要注意消费组的老化,可以通过消息中间件自带的不活跃消费组老化能力兜底,建议通过gracefulExit、监听kill信号等机制来主动删除消费组信息

为什么说消费组老化比较重要呢,因为很多监控系统都会根据消费组的积压来做告警,很容易产生误告警。

利用广播模式维护内存中的数据

这种模式相对比较少见,常见于key的基数不是很大,能够将数据完整地存储在内存中,比如电商平台的企业卖家个数、物联网平台的用户个数等,并且对数据的一致性要求不是很高(因为广播模式情况下,对于两个微服务实例来说没有一致性保障)。像Apache Pulsar设计的TableView,在我看来,就是做这个事的一个最佳实践。Pulsar内部大量使用了topic存储数据,就是采用这个方式。

使用该模式需要注意的点:

  • 同上,需要使用不同的消费组名称
  • 微服务消费消息的时候,应该从Earliest开始消费,保证所有微服务内存中的消息视图一致
  • 同上,需要注意消费组的老化

为什么需要消费组老化作为保底手段

因为在极端场景下,无论是graceful的代码,还是监听kill信号的代码,都不能保证代码百分百地被执行。需要兜底。

Kafka消费组老化

Kafka通过offsets.retention.minutes参数控制消费组中offsets保留时间,在此时间内如果没有提交offset,offsets将会被删除。Kafka判定消息组中没有在线的消费者(如empty状态),且没有offsets时,将会删除此消费组。

Pulsar消费组老化

pulsar的消费组老化策略更加灵活,可以配置到namespace级别。

1
2
3
4
5
6
7
8
bin/pulsar-admin namespaces | grep expiration
get-subscription-expiration-time Get subscription expiration time for
Usage: get-subscription-expiration-time [options] tenant/namespace
set-subscription-expiration-time Set subscription expiration time for
Usage: set-subscription-expiration-time [options] tenant/namespace
Subscription expiration time in minutes
remove-subscription-expiration-time Remove subscription expiration
Usage: remove-subscription-expiration-time [options] tenant/namespace

这里注意要合理地配置消费组的老化时间,在pulsar的当前版本(2.11版本)下,catch up读,也就是说消费组平时积压量不大。如果将消费组的老化时间配置大于等于消息的老化时间,会出现消费组老化不了的现象。

当然,由于消费组和消息老化都是定时任务,预估时间时,要考虑一定的buffer。

这里让我们稍稍dive一下原理,消费组的老化是通过判断Cursor游标的LastActive time来判断能否老化的。如果该消费组的游标位置到达了消息老化区域,被老化掉了,消费组的游标位置就会强制更新到一个可用的位置,这个时候会更新游标的LastActive time到当前时间,周而复始,导致消费组无法老化。举个🌰

假设消费组的老化时间为4h,消息的老化时间为3h,就可能会发生这样的事情

Untitled

总结

广播模式在微服务架构中起到了重要的角色,尤其是在需要在微服务实例之间同步数据的场景中,它具有显著的优势。它能够帮助维护内存数据的缓存一致性。希望本篇文章能提供您全面的广播模式的知识。

近期,我再次涉足于协议服务器相关的工作领域,致力于定位并解决各种问题。简单总结一些心得给大家。如果想要定位出协议服务器的问题,那么这些能力可能至关重要。

注:我这里比较偏向协议本身的问题,不涉及一些通用的网络问题(如网络吞吐量上不去、响应时间长等等)

对CPU和内存的通用分析能力

首先,网络协议服务器本质上也是一个应用程序。因此,需要具备一些关于CPU和内存的通用分析能力。PU/内存火焰图,内存dump分析,锁分析,以及远程调试(研发态手段)这些手段都要具备

日志和网络连接的关联

为了有效地定位网络问题,日志需要精确到毫秒级别。没有毫秒级别的精度,定位网络问题就会变得极其困难。所以golang的logrus默认只有秒级别,我觉得不太好,用rfc3339就很好。

在打印日志时,我们不能太过随意。例如,“connection lost”这样的日志,在调试阶段可能看似无大碍,但当真正的业务量和连接数大幅增加时,这种模糊的日志信息就会让人束手无策。

理想的日志至少应包含网络地址信息,这样我们可以根据网络地址和时间点来查阅日志。如果有抓包的话,那就更好了,可以从中获取大量信息。

当然,我们并不需要在所有的日志中都包含网络地址信息。例如,一旦完成了用户身份的鉴定,我们就可以打印用户的身份信息,这样更方便与后续的业务流程进行整合。如果需要查询网络地址信息,可以回溯到建立连接时的日志。举个🌰

1
2
3
2023-05-30 23:59:01.000 [INFO] 127.0.0.1:62345 connected
2023-05-30 23:59:02.000 [INFO] 127.0.0.1:62345 authed, username is Wolverine
2023-05-30 23:59:03.000 [INFO] Wolverine killed magneto

假设一条数据链上有大量的消息呢?在现代的网络环境中,一条TCP链接可以轻易达到5M bit/s以上的数据流。即使我们提供了时间点信息,仍然很难找到具有问题的报文(在同一秒内可能有上千条报文)。在这种情况下,就需要引入会话的ID信息。许多TCP协议会携带这种信息,换句话说,支持IO复用的协议都会有这种信息(比如MQTT的messageId,Kafka的correlationId等)。此类信息应该被正确地打印在日志中。

针对特征值的跟踪能力

你可能已经在调试日志中包含了非常详尽的信息,然而在实际环境中,这可能并没有太大用处。

原因是一旦全面开启debug日志,性能消耗会大幅增加。除非你的系统性能冗余极大,否则根本无法正常运行。

为此,我们可以提升debug的能力,针对特定的特征值开启debug,例如网络地址、mqtt的clientId、消息中间件的topic等。应用程序仅针对这些特征值打印详细的日志,这样的开销就相对较小,而且这种方法已经在生产环境中被我多次验证。

将网络报文与业务trace关联起来

在网络协议服务器中,我们需要将网络报文与业务trace关联起来。这种关联能力的实现可以大大提高我们定位业务端到端问题的效率和准确性。 理想情况下,我们应该能够根据网络报文来查找相关的业务trace,反之亦然,根据业务trace来查找对应的网络报文。但这些手段都需要业务端的配合,比如在报文中携带traceId,或者在业务trace中携带网络地址信息。

以mqtt协议为例,可以在payload中带上

1
2
3
4
{
"traceId": "xxxx",
"data": "xxxx"
}

在这个例子中,traceId就是我们为业务trace设定的唯一标识符,而data则是实际的业务数据。通过在网络报文中携带这些信息,我们就可以轻松地将网络报文与其对应的业务trace关联起来。

然而,这种方法在研发和测试环境中实现相对容易,但在生产环境中可能会遇到更多的困难。首先,对于在网络报文中携带traceId这一做法,业界并未形成统一的规范和实践。这导致在生产环境,极难做到。

更具挑战性的是,如果你面对的是一个端到端的复杂系统,将traceId从系统的入口传递到出口可能会遇到许多难以预见的问题。例如系统不支持这类数据的专递,这就封死了这条路。

查看原始报文的能力

查看原始报文的能力极其重要,特别是在协议栈的实现尚不成熟的情况下。如果无法查看原始报文,定位问题就会变得非常困难。我曾说过:“如果拿到了原始报文,还是无法复现问题,那我们的研发能力在哪里?”虽然这句话可能有些极端,但它准确地强调了抓包的重要性。

我们可以从抓包看出网络的连通性、网络的延迟、网络的吞吐量、报文的格式、报文的正确性等等。如果途径了多个网元,那么是谁的错?(一般来说,看抓包,谁先发RST,就从谁身上找原因)

虽然抓包的命令比较简单tcpdump port 8080 -i eth0 -s0 -w my.pcap就抓了,但实际想做成,最大的阻力是这两个,TLS和复杂的现网环境

在旧版本的TLS密钥交换算法下,只要有私钥和密码,就可以顺利解包,但现在的tls,都支持前向加密,什么叫前向加密呢?简单地来说,就是给你私钥和密码,你也解不出来。有tls debuginfo和ebpf能解决这两个问题,tls debug-info的原理是将密钥交换时的密钥输出持久化到某个地方,然后拿这个去解,实际很少见有人用这个方案。ebpf一需要linux内核高版本,同时还需要开启功能,安装kernel-debug-info,门槛也比较高。

现网环境,像抓包嗅探的这种工具,有时候可能是禁止上传的,或者即使能上传成功,也需要很长的时间。

也许我们可以通过“应用层抓包”来解决上述的问题,在网络层,我们支持受限的抓包能力,比如可以抓针对某个特征值(比如网络地址、messageId)的包,因为我们在应用层,可使用的过滤条件更多,更精细,输出到某个路径,这个报文的组装,完全在应用网络层,虽然看不到物理层的一些信息,但对于应用程序来说,除非我是做nat设备的,一般用不到这些信息。继续用这个报文来分析问题。实现应用层抓包,也要注意对内存的占用等等,不能因为这个功能,把整个进程搞崩溃。

应用层抓包的一些思考

抓包地点的选择

在应用层抓包,第一步就是确定抓包的地点。由于我们是在应用层进行操作,因此抓包地点一般位于应用程序与网络协议栈的交接处。例如,你可以在数据包刚被应用接收,还未被处理之前进行抓包,或者在数据包即将被应用发送出去,还未进入网络协议栈之前进行抓包。

过滤条件的设定

设定过滤条件是抓包的关键,因为在实际环境中,数据流量可能非常大,如果没有过滤条件,抓包的数据量可能会非常庞大,对应用和系统的性能产生影响。在应用层,我们可以设置更多更精细的过滤条件,如网络地址、端口、协议类型、特定的字段等。这些过滤条件可以帮助我们更精确地定位问题,减少无效的数据。

数据存储问题

将抓到的数据存储起来也是很重要的一步。可以选择将数据存储到内存或者硬盘。需要注意的是,如果选择存储到内存,要考虑到内存的大小,避免因为抓包数据过大导致内存溢出。如果选择存储到硬盘,要考虑到硬盘的读写速度和容量,避免因为抓包数据过大导致硬盘满载。

总结

本文首先阐述了网络协议服务器的一些问题定位能力,包括CPU内存分析能力、日志和网络连接的关联能力、针对特征值的跟踪能力,以及查看原始报文的能力,也讨论了将网络报文与业务trace有效关联的重要性和实现挑战。强调了抓包的重要性和对于解密TLS报文的挑战。为了解决网络层抓包遇到的困难,我们可以考虑应用层抓包方案。最后,我们讨论了应用层抓包的一些关键问题,包括抓包地点的选择、过滤条件的设定和数据存储问题。

Pulsar像大多数消息中间件一样,支持按时间和大小对消息积压进行老化。但是默认的策略只能在namespace级别配置。本文将介绍如何在topic级别实现老化策略的两种方案。

方案一:开启 TopicLevelPolicy 来实现

默认的策略配置通过在Zookeeper上配置对应的策略,可以通过./pulsar zookeeper-shell命令来登录zookeeper集群查询。但是如果将这一实现方式扩展到topic级别,将会产生大量的(百万、千万级别)的ZooKeeper节点,这对于ZooKeeper集群来说几乎是不可接受的。因此,Pulsar提供了一种新的实现方式,即通过Topic来存储策略配置,而不是通过ZooKeeper来存储。

Pulsar,从2.7.0版本开始,引入了SystemTopic,用于存储Topic的元数据信息,包括Topic的策略配置。主题级策略使用户可以更灵活地管理主题,并不会给 ZooKeeper 带来额外负担。

您可以通过如下配置来开启TopicLevelPolicy

1
2
systemTopicEnabled=true
topicLevelPoliciesEnabled=true

然后通过set-backlog-quota命令来设置您想要的老化时间和老化大小

PS: 完整的一些功能,如命令行set-backlog-quota,在3.0.0版本中支持

方案二:通过自定义代码来实现

PulsarTopicLevelPolicy实现需要通过topic存储策略配置,而不是通过ZooKeeper来存储。在实际的极端场景下,Topic中存储的内容可能会丢失(因为未开启Bookkeeper立即落盘或磁盘文件损坏等原因),这将导致策略配置丢失,从而导致策略失效。因此,我们可以通过自定义代码来实现topic级别的策略配置,这样可以避免策略配置丢失的问题。

举个例子,业务可以将策略存放在Mysql中,然后通过PulsarAdmin API来让策略生效

自定义代码实现Backlog时间策略

sequenceDiagram
    participant C as Client
    participant B as Broker
    loop
        C ->> B: expire-messages-all-subscriptions Request
        B -->> C: expire-messages-all-subscriptions Response
    end

自定义代码实现Backlog大小策略

sequenceDiagram
    participant C as Client
    participant B as Broker
    loop
        C ->> B: stats-internal Request
        B -->> C: stats-internal Response
        alt messageBacklogSize < maxMessageBacklogSize
        else messageBacklogSize >= maxMessageBacklogSize
            Note over B,C: estimate the backlog position
            C ->> B: get-message-by-id Request
            B -->> C: get-message-by-id
            Note over B,C: get the timestamp of the message
            C ->> B: expire-messages-all-subscriptions Request
            B -->> C: expire-messages-all-subscriptions Response
        end
    end

你的mysql客户端和服务端之间开启tls了吗?你的回答可能是No,我没有申请证书,也没有开启mysql客户端,服务端的tls配置。

可是当你抓取了3306 mysql的端口之后,你会发现,抓出来的包里居然有Client Hello、Server Hello这样的典型TLS报文。

mysql-tls-pcap

其实,Mysql的通信是否加密,是由客户端和服务端共同协商是否开启的,客户端与服务端都处于默认配置下的话,有些类似于StartTls。

服务端侧

在连接建立时,Mysql服务端会返回一个Server Greeting,其中包含了一些关于服务端的信息,比如协议版本、Mysql版本等等。在其中有一个flag的集合字段,名为
Capabilities Flag,顾名思义,这就是用来做兼容性,或者说特性开关的flag,大小为2个字节,其中的第12位,代表着CLIENT_SSL
,如果设置为1,那代表着如果客户端具备能力,服务端可以在后面的会话中切换到TLS。可以看到里面还有一些其他的flag,事务、长密码等等相关的兼容性开关。

mysql-pcap-capabilities-flag

我们可以测试一下设置为0的行为,只需要在my.cnf中添加

1
echo "ssl=0" >> /etc/my.cnf

重启mysql。再度进行抓包,就发现没有tls的报文了,都是在使用明文进行通信了。

mysql-plain-pcap

客户端侧

这个协商过程也可以在客户端进行控制,客户端对应的参数是sslMode,可以设置为DISABLED、PREFERRED、REQUIRED、VERIFY_CA、VERIFY_IDENTITY,分别代表不使用ssl、优先使用ssl、必须使用ssl、验证CA、验证身份。默认的行为是PREFERRED,example:

比如配置sslMode为DISABLED,那么客户端就不会使用ssl进行通信,而是使用明文。

1
r2dbc:mysql://localhost:3306/test?sslMode=DISABLED

mysql-client-disable-tls

总结

客户端 服务端 结果
DISABLED ssl=0 PLAIN
DISABLED ssl=1 PLAIN
PREFERRED ssl=0 PLAIN
PREFERRED ssl=1 TLS
REQUIRED ssl=0 Fail
REQUIRED ssl=1 TLS
VERIFY_CA ssl=0 Fail
VERIFY_CA ssl=1 + CA配置 TLS,客户端验证证书
VERIFY_IDENTITY ssl=0 Fail
VERIFY_IDENTITY ssl=1 + CA配置 TLS,客户端验证证书和域名

注:

  • VERIFY_CA:确保服务器证书由受信任的CA签发,但不验证证书的主机名或IP地址。
  • VERIFY_IDENTITY:不仅验证证书的CA签发,还额外验证证书的主机名或IP地址与服务器的实际地址是否一致。
0%