以下为个人学习笔记整理。项目源码来自 kcp github 开源库
# kcp 源码阅读
# 收发包流程
Kcp 握手 | 挥手 规则其实是上层应用自己定义的,因此实现上五花八门,这里简单介绍一下:
Kcp 握手:
- 客户端请求连接,服务器创建 kcp 对象,生成 conv 编号,通知客户端
- 客户端通过 conv 初始化 kcp,双端 conv 一致,开始通信
- 特殊情况:
- 客户端连接请求丢失:客户端未收到 conv 触发重传
- 服务器同步 conv 丢失:同理,客户端触发重传再次建立连接,服务器再次生成 conv 并同步
- 客户端发起请求后断开(未通知服务器):服务器 kcp 对象过保活期,直接销毁
Kcp 挥手。挥手有两种,一种是非正常挥手,一种是正常挥手:
非正常挥手。即自己断开后未通知对端
- 客户端网络断开:服务器收不到断开请求,心跳结束后主动销毁 kcp,可以考虑额外主推一次断开请求给客户端,避免临时断网导致客户端无法重连
- 客户端应用被杀:同上
- 服务器宕机:客户端心跳失活,一般考虑重连 or 退出
正常挥手。自身断开前先通知对端
- 客户端主动请求断开:服务器收到主动断开后,清理缓存数据并推送断开数据包后销毁 kcp 对象,客户端收到主动断开回包后处理断开逻辑,正常关闭。
- 服务器强制踢客户端:客户端收到断开请求后,强制销毁 kcp,正常关闭。
特殊情况:
- 客户端发送断开请求丢失:走超时重传或者直接强杀都行,反正服务器保活到了也会清理 kcp
- 服务器推送断开包丢失:同理,心跳包失活,客户端主动断开或者重连
Kcp 本身的细节其实挺多,例如重连需不需要单独一种包类型用来区分,还是默认发送数据的情况下,如果双端 kcp 都保活(但心跳异常)情况下直接重连;如果断开连接后依旧收到了对端的数据包如何处理;如果由于超时发起了多次连接请求,收到了多个 conv 编号情况下的处理,这些问题都是挺值得思考的。🤔
数据包处理双端都基本一致,通过 update 机制一段时间处理一次发包,更新 buf
和 ack
,判断快传和重传更新窗口大小。
收包走 boost asio
的 socket
回调 async_receive_from
,从 socket
内读取数据到 kcp buffer
并执行用于定义的回调函数进行处理:
- 更新
ack_list
,用于下次flush
的时候回包。 - 并尝试读取
kcp buffer
数据,如果数据不完整就等下一个包数据,并继续监听socket
- 如果数据能够读取出来,则读取、解包后,送到上层逻辑处理,并继续监听
socket
# 相关数据结构
# Kcp 包头
// 解包代码,压包同理 | |
data = ikcp_decode32u(data, &conv); | |
data = ikcp_decode8u(data, &cmd); | |
data = ikcp_decode8u(data, &frg); | |
data = ikcp_decode16u(data, &wnd); | |
data = ikcp_decode32u(data, &ts); | |
data = ikcp_decode32u(data, &sn); | |
data = ikcp_decode32u(data, &una); | |
data = ikcp_decode32u(data, &len); | |
// data copy | |
memcpy(seg->data, data, len); |
# 本地 Kcp Segment 结构
包体在内存中的组织形式:
struct IKCPSEG | |
{ | |
struct IQUEUEHEAD node; | |
IUINT32 conv; | |
IUINT32 cmd; | |
IUINT32 frg; | |
IUINT32 wnd; | |
IUINT32 ts; | |
IUINT32 sn; | |
IUINT32 una; | |
IUINT32 len; | |
IUINT32 resendts; | |
IUINT32 rto; | |
IUINT32 fastack; | |
IUINT32 xmit; | |
IUINT8 reliable; // 定制字段,实现了部分数据可以非可靠机制 | |
char data[1]; | |
}; | |
struct IQUEUEHEAD { | |
struct IQUEUEHEAD *next, *prev; | |
}; |
# Kcp 对象数据结构
struct IKCPCB | |
{ | |
IUINT32 conn_id; // 外部设置的唯一标识 | |
IUINT32 conv, mtu, mss, state; | |
IUINT32 snd_una, snd_nxt, rcv_nxt; | |
IUINT32 ts_recent, ts_lastack, ssthresh; | |
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto, rx_rttval_ratio; | |
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe; | |
IUINT32 current, interval, ts_flush, xmit; | |
IUINT32 nrcv_buf, nsnd_buf; | |
IUINT32 nrcv_que, nsnd_que; | |
IUINT32 nodelay, updated; | |
IUINT32 ts_probe, probe_wait; | |
IUINT32 dead_link, incr; | |
struct IQUEUEHEAD snd_queue; | |
struct IQUEUEHEAD rcv_queue; | |
struct IQUEUEHEAD snd_buf; | |
struct IQUEUEHEAD rcv_buf; | |
IUINT32 *acklist; | |
IUINT32 ackcount; | |
IUINT32 ackblock; | |
void *user; | |
char *buffer; | |
char *unrel_buffer; | |
int fastresend; | |
int nocwnd, stream; | |
int logmask; | |
void (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); | |
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user); | |
}; |
# 核心接口
# ikcp_create
创建一个 kcp 链接,发起端会构建一个随机数 conv
用来唯一标识,双端的 conv
一致才可以正常的处理消息。
ikcpcb* ikcp_create(IUINT32 conv, void *user, IUINT32 conn_id, IUINT32 sys_current) | |
{ | |
ikcpcb *kcp = (ikcpcb*)ikcp_malloc(sizeof(struct IKCPCB)); | |
// init kcp args | |
// init buffer | |
kcp->buffer = (char*)ikcp_malloc((kcp->mtu + IKCP_OVERHEAD) * 3); | |
if (kcp->buffer == NULL) { | |
ikcp_free(kcp); | |
return NULL; | |
} | |
// init kcp args | |
return kcp; | |
} |
# ikcp_setoutput | kcp->output | ikcp_output
kcp 的数据发送接口,调用之后默认视作数据包已经发送。
可以通过 ikcp_setoutput
来自定义 kcp->output
操作。
// output segment | |
static void ikcp_output(ikcpcb *kcp, const void *data, int size) | |
{ | |
assert(kcp); | |
assert(kcp->output); | |
if (ikcp_canlog(kcp, IKCP_LOG_OUTPUT)) { | |
ikcp_log(kcp, IKCP_LOG_OUTPUT, "[RO] %ld bytes", (long)size); | |
} | |
if (size == 0) return; | |
kcp->output((const char*)data, size, kcp, kcp->user); | |
} |
# ikcp_update
定时进行调用,核心逻辑在 ikcp_flush
。
//--------------------------------------------------------------------- | |
// update state (call it repeatedly, every 10ms-100ms), or you can ask | |
// ikcp_check when to call it again (without ikcp_input/_send calling). | |
// 'current' - current timestamp in millisec. | |
//--------------------------------------------------------------------- | |
void ikcp_update(ikcpcb *kcp, IUINT32 current, IUINT32 sys_current) | |
{ | |
IINT32 slap; | |
kcp->current = current; | |
if (kcp->updated == 0) { | |
kcp->updated = 1; | |
kcp->ts_flush = kcp->current; | |
} | |
slap = _itimediff(kcp->current, kcp->ts_flush); | |
if (slap >= 10000 || slap < -10000) { | |
kcp->ts_flush = kcp->current; | |
slap = 0; | |
} | |
if (slap >= 0) { | |
kcp->ts_flush += kcp->interval; | |
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) { | |
kcp->ts_flush = kcp->current + kcp->interval; | |
} | |
ikcp_flush(kcp, sys_current); | |
} | |
} |
# ikcp_flush
发送之前打包好待发送的数据包:
- 同步之前收到包的
ack
信息 - 【可选】尝试请求对端窗口大小
- 【可选】尝试回复对端窗口请求
- 计算所需发送的包体数据并发包:
- 发送非可靠包
- 发送正常包
- 发送超时包
- 发送重传包
- 根据发送的包体种类更新各项参数(拥塞控制、慢启动)
//--------------------------------------------------------------------- | |
// ikcp_flush | |
//--------------------------------------------------------------------- | |
void ikcp_flush(ikcpcb *kcp, IUINT32 sys_current) | |
{ | |
// 'ikcp_update' haven't been called. | |
if (kcp->updated == 0) return; | |
// 【step.1】通知上一次 flush 至今收到的所有包的 ack 信息 | |
for (i = 0; i < kcp->ackcount; i++) { | |
// 尝试和包并发包 | |
} | |
// 【step.2】设置下一次探查对端的窗口大小的时间点 | |
if (kcp->rmt_wnd == 0) { | |
if (kcp->probe_wait != 0) { | |
if (_itimediff(kcp->current, kcp->ts_probe) >= 0) { | |
//... | |
kcp->probe |= IKCP_ASK_SEND; | |
} | |
} | |
} | |
// 【step.2.1】满足探查条件会捎上一个探查包,通知对端发送窗口大小 | |
if (kcp->probe & IKCP_ASK_SEND) { | |
//... | |
} | |
// 【step.2.2】发送探查回复包,通知对端自身窗口大小 | |
// 在 ikcp_recv 中设置该标记,用于快速通知对端自身窗口有空余,可以开始发包了 | |
if (kcp->probe & IKCP_ASK_TELL) { | |
//... | |
} | |
// 【step.3】尝试发送数据包 | |
// 【step.3.1】计算拥塞窗口大小,表示对端还能接收到的最大包数量 | |
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd); | |
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd); | |
// 【step.3.2】把需要发送的数据包加入发送队列 | |
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { | |
// 1 ~ 10 ~ 15 16 | |
// --------------- | |
// | ack | unack | snd_nxt | |
// --------------- | |
// | cwnd = 20 | | |
// | send cnt = 15 | | |
// 核心思想:拥塞窗口大小(cwnd) = 本端已发送对端未收到(unack) + 本次发送() | |
} | |
// 【step.3.3】计算快速重传阈值,超过该值触发重传 | |
// 例如 resent = 3,像对端发送 1,2,3,4,5 包 | |
// 收到了 2 号包的 ack,那么 1 号包就算作 fastack + 1 | |
//fastack >= resent 尝试触发「快速重传」不再等超时了 | |
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff; | |
// 【step.3.4】设置超时重传的计算规则 | |
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0; | |
// 【step.3.5】开始读取缓存队列的数据进行发包 | |
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { | |
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node); | |
int needsend = 0; | |
// 【step.3.5.1】非可靠包特殊处理,不放入 ack_list 也不计算重传时间 | |
if (segment->cmd == IKCP_CMD_UNRELIABLE) { | |
// ... | |
continue; | |
} | |
// 【step.3.5.2】计算需要重传的包体 | |
// 非重传包初始化各项参数,重传时间,重传次数... | |
if (segment->xmit == 0) { | |
//... | |
needsend = 1; | |
} | |
// 「超时重传」更新各项参数,重传时间,重传次数... | |
else if (_itimediff(current, segment->resendts) >= 0) { | |
needsend = 2; | |
lost = 1; // 记录出现了报文丢失 | |
//... | |
// 根据 nodelay 重新计算下次重传时间 | |
if (kcp->nodelay == 0) | |
segment->rto += kcp->rx_rto; //tcp 规则 2 倍增长 | |
else | |
segment->rto += kcp->rx_rto / 2; // 快传方式 1.5 倍增长 | |
segment->resendts = current + segment->rto; | |
} | |
// 「快速重传」根据后置丢包数,快速进行重传 | |
else if (segment->fastack >= resent) { | |
needsend = 3; | |
change++; // 标识快重传发生 | |
//... | |
} | |
if (needsend) { | |
// 合包并发包 | |
// 记录本次发送的 Segment 数量 | |
// 超过最大重传次数,表示网络可能出现故障,直接断开连接 | |
if (segment->xmit >= kcp->dead_link) kcp->state = -1; | |
} | |
} | |
//【step.3.5.3】把剩余残留未凑成满包的数据发送(合包后剩下的残渣) | |
//【step.3.6】清理掉缓存数据内的「非可靠包」,因为不需要记录 ack 了 | |
// 【step.3.7】触发快速重传,需要调整拥塞窗口大小 | |
if (change) { | |
// ... | |
} | |
// 【step.3.8】触发了超时重传,需要调整拥塞窗口大小 | |
if (lost) { | |
// ... | |
} | |
} |
# ikcp_input
输入数据是通过系统层的 UDP
传输机制发回的 KCP
包。
通过 ikcp_input
接口处理不同类型的包体来实现 KCP
逻辑:
- 保证可靠性逻辑(重传机制)
- 窗口大小请求等规则
- 把收到的数据规范化,带有
Segment
头信息,并加入rev_buf
(无序) - 根据
sn
再对rev_buf
内的Segment
进行排序并放入到rev_queue
(有序) - 为后续
ikcp_recv
操作做准备
如下是一个 PUSH
包的部分流程:
//--------------------------------------------------------------------- | |
// input data | |
//--------------------------------------------------------------------- | |
int ikcp_input(ikcpcb *kcp, const char *data, long size) | |
{ | |
//【step.1】解 kcp 包 | |
while (1) { | |
//【step.1.1】检查 conv 值 | |
if (conv != kcp->conv) return -1; | |
//【step.1.2】解 kcp 包头,获取包体类型 | |
//【step.1.3】遇到非法包类型直接退出 | |
if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && | |
cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS && | |
cmd != IKCP_CMD_UNRELIABLE) | |
return -3; | |
//【step.1.4】设置远端窗口大小 | |
kcp->rmt_wnd = wnd; | |
//【step.1.5】根据远端发来的第一个未收包编号 | |
// 说明未收包之前的包体都收到了,清理本地包缓存 | |
ikcp_parse_una(kcp, una); | |
//【step.1.6】更新本地的未收包编号 | |
// 如果缓存内没有包体,那么编号为发送包 + 1,否则为最早的缓存包 | |
ikcp_shrink_buf(kcp); | |
//【step.1.7】处理 ACK 包 | |
if (cmd == IKCP_CMD_ACK) { | |
//【step.1.7.1】通过发包到 ack 时间差评估网络状况调整相应的参数 | |
ikcp_update_ack(kcp, _itimediff(kcp->current, ts)); | |
//【step.1.7.2】清理本地已发包缓存和未收包编号 | |
ikcp_parse_ack(kcp, sn); | |
ikcp_shrink_buf(kcp); | |
//【step.1.7.3】记录最大的 ack 包编号 | |
} | |
//【step.1.8】处理对端数据包 | |
else if (cmd == IKCP_CMD_PUSH) { | |
//【step.1.8.1】本地窗口还能容纳包数据情况下,解包,否则直接丢弃 | |
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) { | |
//【step.1.8.2】更新 ack_list 内的信息 | |
ikcp_ack_push(kcp, sn, ts); | |
//【step.1.8.3】如果是之前没收到的包 sn > kcp->rcv_nxt | |
// 创建包结构,并丢到 rev_buf 里面(无序的) | |
if (_itimediff(sn, kcp->rcv_nxt) >= 0) { | |
// 初始化 segment 包头 | |
ikcp_parse_data(kcp, seg); | |
} | |
} | |
} | |
//【step.1.9】处理对端请求窗口大小包 | |
else if (cmd == IKCP_CMD_WASK) { | |
// 标记一下,下次 flush 的时候需要回复本端窗口大小 | |
kcp->probe |= IKCP_ASK_TELL; | |
} | |
//【step.1.10】对端收到了本地通知的窗口大小后的回包,不做处理 | |
else if (cmd == IKCP_CMD_WINS) { | |
// do nothing... | |
} | |
//【step.1.11】收到了非可靠包,不需要记录在 ack_list 也不用清理缓存直接解完丢 rev_buff 里 | |
else if (cmd == IKCP_CMD_UNRELIABLE) { | |
// ... | |
} else { | |
return -3; | |
} | |
} | |
//【step.2】根据最大未收包编号,更新之前的未收包的丢包次数,用于快速重传 | |
if (flag != 0) { | |
ikcp_parse_fastack(kcp, maxack); | |
} | |
//【step.3】拥塞控制 | |
// 之前的未收包,在本次 ack 后,有些收到了,说明数据正常送达了 | |
if (_itimediff(kcp->snd_una, una) > 0) { | |
// 【step.3.1】如果拥塞窗口小于对端可接收大小,进入流量控制和拥塞控制 | |
if (kcp->cwnd < kcp->rmt_wnd) { | |
// ... | |
//【step.3.1.1】当前拥塞窗口小于阈值,准备慢启动扩张拥塞窗口大小 | |
if (kcp->cwnd < kcp->ssthresh) { | |
kcp->cwnd++; | |
kcp->incr += mss; | |
//【step.3.1.2】当前可发送数据小于一个分片大小,扩张发包大小和拥塞窗口大小 | |
} else { | |
if (kcp->incr < mss) kcp->incr = mss; | |
kcp->incr += (mss * mss) / kcp->incr + (mss / 16); | |
if ((kcp->cwnd + 1) * mss <= kcp->incr) { | |
kcp->cwnd++; | |
} | |
} | |
//【step.3.1.3】如果拥塞窗口大于阈值,限制窗口大小和发包大小 | |
if (kcp->cwnd > kcp->rmt_wnd) { | |
kcp->cwnd = kcp->rmt_wnd; | |
kcp->incr = kcp->rmt_wnd * mss; | |
} | |
} | |
} | |
return 0; | |
} |
# ikcp_recv
批量读取 rev_queue
内按照 sn
顺序编排的包:
- 先对同一组
frg
的包进行读取,读到frg = 0
的情况下停止(合包处理) - 把所有数据合并到
buff
内,并在做一次rev_buf -> rev_queue
的排序和移动 - 判断
recover
,打上标记用于下次ikcp_flush
通知对端窗口大小,减少阻塞时间
//--------------------------------------------------------------------- | |
// user/upper level recv: returns size, returns below zero for EAGAIN | |
//--------------------------------------------------------------------- | |
int ikcp_recv(ikcpcb *kcp, char *buffer, int len) | |
{ | |
// 【step.1】计算一下读取数据大小 | |
// 【step.1.1】读到下一个 frg = 0 包内所有 data 大小的总和(不包括包头) | |
peeksize = ikcp_peeksize(kcp); | |
// 【step.1.2】判断一堆边界情况 | |
// 【step.1.3】如果之前收包队列满了,表示对端不会再发数据了。 | |
// 需要标记一下,后面通知对端新的窗口,让对端能够继续发送数据 | |
if (kcp->nrcv_que >= kcp->rcv_wnd) | |
recover = 1; | |
// 【step.2】读取数据 | |
//merge fragment(合包),按照 data block 为单位进行读取,读到 frg = 0 停止 | |
for (p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) { | |
// 读取数据 | |
if (fragment == 0) | |
break; | |
} | |
// 【step.3】对 rev_buf 排序后加入到 rev_queue | |
// 由于读取了一些 rev_queue 内容,因此可以再从 rev_buf 挪一部分过来 | |
while (! iqueue_is_empty(&kcp->rcv_buf)) { | |
// move available data from rcv_buf -> rcv_queue | |
} | |
// 【step.4】打上通知对端窗口大小的标记 | |
if (kcp->nrcv_que < kcp->rcv_wnd && recover) { | |
// ready to send back IKCP_CMD_WINS in ikcp_flush | |
// tell remote my window size | |
kcp->probe |= IKCP_ASK_TELL; | |
} | |
return sz; | |
} |
# 规则汇总
Kcp 的发包有很多细节的规则,这里做一个梳理,并进一步的解释
# 「Stream」模式和「非 Stream」模式
kcp 支持两种模式的发包:
# Stream
假设图中每个 data block
都来自不同应用层的数据包
# 非 Stream
- 「Stream」模式情况下,
data block
的数据是可以分段读取每个data
- 「非 Stream」模式的解包必须一次性解出整个
data block
- 「Stream」模式情况下,多个
data block
可以拼接在一起进行发送,因此data block 2
内的部分数据可以被拆分到第一个传输单元内,但是「非 Stream」不支持该操作,会对单个data block
进行装包。
# 如何解出拼接过的数据?
例如我们需要读取蓝色方块的数据
- 假设第一个包数据来了,但是数据有点少,还不足一个包头,那么先存着,等下一个包。
- 下个包来了,发现可以解出包头了,但是后面的数据还少了,再等下一个包。
- 下个包来了,数据也够了,才开始进行解包。
这里的
data head
和data len
已经是业务层面的规则了,并非kcp
部分,实现上可以按照实际情况来
# 快速重传 | 超时重传
- 快速重传:后续包收到了,但是前面的包没收到,就会触发快速重传
- 超时重传:数据包发送后一段时间内没有收到
ack
回复,就会触发超时重传
# 合包规则
合包其实本质上是减少发包次数,尽可能利用带宽
# 滑动窗口
# 慢启动 | 拥塞窗口
慢启动本质是为了在严重丢包网络环境下,如果网络发生好转后,一点点放开发送窗口大小,增加每次发送的数据量
慢启动的实现一般是通过控制拥塞窗口大小,进而控制发包数量。
每次执行 ikcp_input
操作获取接受包时,如果发现之前发送的包被对端 ack
后( snd_una
变了),会尝试扩大拥塞窗口大小:
- 如果没有超过阈值的情况下,每次
+1
- 如果超过阈值后,每次增加
1/ikcp_incr + 1/16
,如果变化后下取整能够多出一个单位,则窗口+1
if (_itimediff(kcp->snd_una, una) > 0) { | |
if (kcp->cwnd < kcp->rmt_wnd) { | |
IUINT32 mss = kcp->mss; | |
if (kcp->cwnd < kcp->ssthresh) { | |
kcp->cwnd++; | |
kcp->incr += mss; | |
} else { | |
if (kcp->incr < mss) kcp->incr = mss; | |
kcp->incr += (mss * mss) / kcp->incr + (mss / 16); | |
if ((kcp->cwnd + 1) * mss <= kcp->incr) { | |
kcp->cwnd++; | |
} | |
} | |
if (kcp->cwnd > kcp->rmt_wnd) { | |
kcp->cwnd = kcp->rmt_wnd; | |
kcp->incr = kcp->rmt_wnd * mss; | |
} | |
} | |
} |
当然,如果网络环境突然异常 —— 出现丢包(超时重传) | 链路切换导致乱序(快速重传),也会通过拥塞窗口减少发包数量避免无谓的超时重传:
- 出现丢包:这种情况下,可能网络直接就不好用了,发都发不过去,直接把拥塞窗口设置为
1
,强行限流。
if (lost) { | |
kcp->ssthresh = cwnd / 2; | |
if (kcp->ssthresh < IKCP_THRESH_MIN) | |
kcp->ssthresh = IKCP_THRESH_MIN; | |
kcp->cwnd = 1; | |
kcp->incr = kcp->mss; | |
} |
- 出现乱序:网络波动比较大,频繁链路切换或者其他因素导致乱序,适当的降低窗口大小:
- 拥塞窗口阈值
ssthresh
减半,拥塞窗口调整为ssthresh + resent(表示收到的乱序包数量,一般情况下等于 fastack)
- 拥塞窗口阈值
if (change) { | |
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una; | |
kcp->ssthresh = inflight / 2; | |
if (kcp->ssthresh < IKCP_THRESH_MIN) | |
kcp->ssthresh = IKCP_THRESH_MIN; | |
kcp->cwnd = kcp->ssthresh + resent; | |
kcp->incr = kcp->cwnd * kcp->mss; | |
} |