以下为个人学习笔记整理。项目源码来自 kcp github 开源库

# kcp 源码阅读

# 收发包流程

Kcp 握手 | 挥手 规则其实是上层应用自己定义的,因此实现上五花八门,这里简单介绍一下:

  • Kcp 握手:

    • 客户端请求连接,服务器创建 kcp 对象,生成 conv 编号,通知客户端
    • 客户端通过 conv 初始化 kcp,双端 conv 一致,开始通信
    • 特殊情况:
      • 客户端连接请求丢失:客户端未收到 conv 触发重传
      • 服务器同步 conv 丢失:同理,客户端触发重传再次建立连接,服务器再次生成 conv 并同步
      • 客户端发起请求后断开(未通知服务器):服务器 kcp 对象过保活期,直接销毁
  • Kcp 挥手。挥手有两种,一种是非正常挥手,一种是正常挥手:

    • 非正常挥手。即自己断开后未通知对端

      • 客户端网络断开:服务器收不到断开请求,心跳结束后主动销毁 kcp,可以考虑额外主推一次断开请求给客户端,避免临时断网导致客户端无法重连
      • 客户端应用被杀:同上
      • 服务器宕机:客户端心跳失活,一般考虑重连 or 退出
    • 正常挥手。自身断开前先通知对端

      • 客户端主动请求断开:服务器收到主动断开后,清理缓存数据并推送断开数据包后销毁 kcp 对象,客户端收到主动断开回包后处理断开逻辑,正常关闭。
      • 服务器强制踢客户端:客户端收到断开请求后,强制销毁 kcp,正常关闭。
    • 特殊情况:

      • 客户端发送断开请求丢失:走超时重传或者直接强杀都行,反正服务器保活到了也会清理 kcp
      • 服务器推送断开包丢失:同理,心跳包失活,客户端主动断开或者重连

Kcp 本身的细节其实挺多,例如重连需不需要单独一种包类型用来区分,还是默认发送数据的情况下,如果双端 kcp 都保活(但心跳异常)情况下直接重连;如果断开连接后依旧收到了对端的数据包如何处理;如果由于超时发起了多次连接请求,收到了多个 conv 编号情况下的处理,这些问题都是挺值得思考的。🤔

数据包处理双端都基本一致,通过 update 机制一段时间处理一次发包,更新 bufack ,判断快传和重传更新窗口大小。

收包走 boost asiosocket 回调 async_receive_from ,从 socket 内读取数据到 kcp buffer 并执行用于定义的回调函数进行处理:

  • 更新 ack_list ,用于下次 flush 的时候回包。
  • 并尝试读取 kcp buffer 数据,如果数据不完整就等下一个包数据,并继续监听 socket
  • 如果数据能够读取出来,则读取、解包后,送到上层逻辑处理,并继续监听 socket

image-20220223153932659

# 相关数据结构

# Kcp 包头

image-20220223145332492

// 解包代码,压包同理
data = ikcp_decode32u(data, &conv);
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
// data copy
memcpy(seg->data, data, len);

# 本地 Kcp Segment 结构

包体在内存中的组织形式:

image-20220221150023054

struct IKCPSEG
{
	struct IQUEUEHEAD node;
	IUINT32 conv;
	IUINT32 cmd;
	IUINT32 frg;
	IUINT32 wnd;
	IUINT32 ts;
	IUINT32 sn;
	IUINT32 una;
	IUINT32 len;
	IUINT32 resendts;
	IUINT32 rto;
	IUINT32 fastack;
	IUINT32 xmit;
	IUINT8  reliable;   // 定制字段,实现了部分数据可以非可靠机制
	char data[1];
};
struct IQUEUEHEAD {
	struct IQUEUEHEAD *next, *prev;
};

# Kcp 对象数据结构

image-20220221172232658

struct IKCPCB
{
	IUINT32 conn_id; // 外部设置的唯一标识
	IUINT32 conv, mtu, mss, state;
	IUINT32 snd_una, snd_nxt, rcv_nxt;
	IUINT32 ts_recent, ts_lastack, ssthresh;
	IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto, rx_rttval_ratio;
	IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
	IUINT32 current, interval, ts_flush, xmit;
	IUINT32 nrcv_buf, nsnd_buf;
	IUINT32 nrcv_que, nsnd_que;
	IUINT32 nodelay, updated;
	IUINT32 ts_probe, probe_wait;
	IUINT32 dead_link, incr;
	struct IQUEUEHEAD snd_queue;
	struct IQUEUEHEAD rcv_queue;
	struct IQUEUEHEAD snd_buf;
	struct IQUEUEHEAD rcv_buf;
	IUINT32 *acklist;
	IUINT32 ackcount;
	IUINT32 ackblock;
	void *user;
	char *buffer;
	char *unrel_buffer;
	int fastresend;
	int nocwnd, stream;
	int logmask;
	void (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
	void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

# 核心接口

# ikcp_create

创建一个 kcp 链接,发起端会构建一个随机数 conv 用来唯一标识,双端的 conv 一致才可以正常的处理消息。

ikcpcb* ikcp_create(IUINT32 conv, void *user, IUINT32 conn_id, IUINT32 sys_current)
{
	ikcpcb *kcp = (ikcpcb*)ikcp_malloc(sizeof(struct IKCPCB));
	// init kcp args
    // init buffer
	kcp->buffer = (char*)ikcp_malloc((kcp->mtu + IKCP_OVERHEAD) * 3);
	if (kcp->buffer == NULL) {
		ikcp_free(kcp);
		return NULL;
	}
    
	// init kcp args
	return kcp;
}

# ikcp_setoutput | kcp->output | ikcp_output

kcp 的数据发送接口,调用之后默认视作数据包已经发送。

可以通过 ikcp_setoutput 来自定义 kcp->output 操作。

// output segment
static void ikcp_output(ikcpcb *kcp, const void *data, int size)
{
	assert(kcp);
	assert(kcp->output);
	if (ikcp_canlog(kcp, IKCP_LOG_OUTPUT)) {
		ikcp_log(kcp, IKCP_LOG_OUTPUT, "[RO] %ld bytes", (long)size);
	}
	if (size == 0) return;
	kcp->output((const char*)data, size, kcp, kcp->user);
}

# ikcp_update

定时进行调用,核心逻辑在 ikcp_flush

//---------------------------------------------------------------------
// update state (call it repeatedly, every 10ms-100ms), or you can ask 
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec. 
//---------------------------------------------------------------------
void ikcp_update(ikcpcb *kcp, IUINT32 current, IUINT32 sys_current)
{
	IINT32 slap;
	kcp->current = current;
	if (kcp->updated == 0) {
		kcp->updated = 1;
		kcp->ts_flush = kcp->current;
	}
	slap = _itimediff(kcp->current, kcp->ts_flush);
	if (slap >= 10000 || slap < -10000) {
		kcp->ts_flush = kcp->current;
		slap = 0;
	}
	if (slap >= 0) {
		kcp->ts_flush += kcp->interval;
		if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
			kcp->ts_flush = kcp->current + kcp->interval;
		}
		ikcp_flush(kcp, sys_current);
	}
}

# ikcp_flush

发送之前打包好待发送的数据包:

  • 同步之前收到包的 ack 信息
  • 【可选】尝试请求对端窗口大小
  • 【可选】尝试回复对端窗口请求
  • 计算所需发送的包体数据并发包:
    • 发送非可靠包
    • 发送正常包
    • 发送超时包
    • 发送重传包
  • 根据发送的包体种类更新各项参数(拥塞控制、慢启动)
//---------------------------------------------------------------------
// ikcp_flush
//---------------------------------------------------------------------
void ikcp_flush(ikcpcb *kcp, IUINT32 sys_current)
{
	// 'ikcp_update' haven't been called. 
	if (kcp->updated == 0) return;
	// 【step.1】通知上一次 flush 至今收到的所有包的 ack 信息
	for (i = 0; i < kcp->ackcount; i++) {
		// 尝试和包并发包
	}
    
	// 【step.2】设置下一次探查对端的窗口大小的时间点
    if (kcp->rmt_wnd == 0) {
        if (kcp->probe_wait != 0) {
            if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
				//...
                kcp->probe |= IKCP_ASK_SEND;
            }
        }
    }
	// 【step.2.1】满足探查条件会捎上一个探查包,通知对端发送窗口大小
	if (kcp->probe & IKCP_ASK_SEND) {
		//...
	}
	// 【step.2.2】发送探查回复包,通知对端自身窗口大小
    // 在 ikcp_recv 中设置该标记,用于快速通知对端自身窗口有空余,可以开始发包了
	if (kcp->probe & IKCP_ASK_TELL) {
		//...
	}
    // 【step.3】尝试发送数据包
	// 【step.3.1】计算拥塞窗口大小,表示对端还能接收到的最大包数量
	cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
	if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
	// 【step.3.2】把需要发送的数据包加入发送队列
	while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
		// 1 ~ 10   ~    15    16
        // ---------------
        // | ack | unack |   snd_nxt
        // ---------------
        //       |         cwnd = 20       |
        //               |  send cnt = 15  |
        // 核心思想:拥塞窗口大小(cwnd) = 本端已发送对端未收到(unack) + 本次发送()
	}
	// 【step.3.3】计算快速重传阈值,超过该值触发重传
    // 例如 resent = 3,像对端发送 1,2,3,4,5 包
    // 收到了 2 号包的 ack,那么 1 号包就算作 fastack + 1
    //fastack >= resent 尝试触发「快速重传」不再等超时了
	resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
	// 【step.3.4】设置超时重传的计算规则
    rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
	// 【step.3.5】开始读取缓存队列的数据进行发包
	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
		IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
		int needsend = 0;
        // 【step.3.5.1】非可靠包特殊处理,不放入 ack_list 也不计算重传时间
		if (segment->cmd == IKCP_CMD_UNRELIABLE) {
			// ...
			continue;
		}
        // 【step.3.5.2】计算需要重传的包体
        // 非重传包初始化各项参数,重传时间,重传次数...
		if (segment->xmit == 0) {
			//...
            needsend = 1;
		}
        // 「超时重传」更新各项参数,重传时间,重传次数...
		else if (_itimediff(current, segment->resendts) >= 0) {
			needsend = 2;
            lost = 1; // 记录出现了报文丢失
            //...
			// 根据 nodelay 重新计算下次重传时间
			if (kcp->nodelay == 0)
                segment->rto += kcp->rx_rto; //tcp 规则 2 倍增长
			else 
                segment->rto += kcp->rx_rto / 2; // 快传方式 1.5 倍增长
			segment->resendts = current + segment->rto;
		}
        // 「快速重传」根据后置丢包数,快速进行重传
		else if (segment->fastack >= resent) {
			needsend = 3;
            change++;  // 标识快重传发生
			//...
		}
		if (needsend) {
			// 合包并发包
	
            // 记录本次发送的 Segment 数量
            
            // 超过最大重传次数,表示网络可能出现故障,直接断开连接
			if (segment->xmit >= kcp->dead_link) kcp->state = -1;
		}
	}
	//【step.3.5.3】把剩余残留未凑成满包的数据发送(合包后剩下的残渣)
	//【step.3.6】清理掉缓存数据内的「非可靠包」,因为不需要记录 ack 了
	// 【step.3.7】触发快速重传,需要调整拥塞窗口大小
	if (change) {
        // ...
	}
    // 【step.3.8】触发了超时重传,需要调整拥塞窗口大小
	if (lost) {
		// ...
	}
}

# ikcp_input

输入数据是通过系统层的 UDP 传输机制发回的 KCP 包。

通过 ikcp_input 接口处理不同类型的包体来实现 KCP 逻辑:

  • 保证可靠性逻辑(重传机制)
  • 窗口大小请求等规则
  • 把收到的数据规范化,带有 Segment 头信息,并加入 rev_buf (无序)
  • 根据 sn 再对 rev_buf 内的 Segment 进行排序并放入到 rev_queue (有序)
  • 为后续 ikcp_recv 操作做准备

如下是一个 PUSH 包的部分流程:

image-20220222152800500

//---------------------------------------------------------------------
// input data
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    //【step.1】解 kcp 包
	while (1) {
		//【step.1.1】检查 conv 值
		if (conv != kcp->conv) return -1;
        //【step.1.2】解 kcp 包头,获取包体类型
        
        //【step.1.3】遇到非法包类型直接退出
		if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
			cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS &&
			cmd != IKCP_CMD_UNRELIABLE) 
			return -3;
        //【step.1.4】设置远端窗口大小
        kcp->rmt_wnd = wnd;
        //【step.1.5】根据远端发来的第一个未收包编号
        // 说明未收包之前的包体都收到了,清理本地包缓存
        ikcp_parse_una(kcp, una);
        //【step.1.6】更新本地的未收包编号
        // 如果缓存内没有包体,那么编号为发送包 + 1,否则为最早的缓存包
        ikcp_shrink_buf(kcp);
		//【step.1.7】处理 ACK 包
		if (cmd == IKCP_CMD_ACK) {
            //【step.1.7.1】通过发包到 ack 时间差评估网络状况调整相应的参数
			ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
            
            //【step.1.7.2】清理本地已发包缓存和未收包编号
			ikcp_parse_ack(kcp, sn);
			ikcp_shrink_buf(kcp);
            //【step.1.7.3】记录最大的 ack 包编号
		}
        //【step.1.8】处理对端数据包
		else if (cmd == IKCP_CMD_PUSH) {
			//【step.1.8.1】本地窗口还能容纳包数据情况下,解包,否则直接丢弃
			if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
                //【step.1.8.2】更新 ack_list 内的信息
				ikcp_ack_push(kcp, sn, ts);
                
                //【step.1.8.3】如果是之前没收到的包 sn > kcp->rcv_nxt
                // 创建包结构,并丢到 rev_buf 里面(无序的)
				if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
					// 初始化 segment 包头
					ikcp_parse_data(kcp, seg);
				}
			}
		}
        //【step.1.9】处理对端请求窗口大小包
		else if (cmd == IKCP_CMD_WASK) {
			// 标记一下,下次 flush 的时候需要回复本端窗口大小
			kcp->probe |= IKCP_ASK_TELL;
		}
        //【step.1.10】对端收到了本地通知的窗口大小后的回包,不做处理
		else if (cmd == IKCP_CMD_WINS) {
			// do nothing...
		}
        //【step.1.11】收到了非可靠包,不需要记录在 ack_list 也不用清理缓存直接解完丢 rev_buff 里
		else if (cmd == IKCP_CMD_UNRELIABLE) {
			// ...
		} else {
			return -3;
		}
	}
    //【step.2】根据最大未收包编号,更新之前的未收包的丢包次数,用于快速重传
	if (flag != 0) {
		ikcp_parse_fastack(kcp, maxack);
	}
    //【step.3】拥塞控制
    // 之前的未收包,在本次 ack 后,有些收到了,说明数据正常送达了
	if (_itimediff(kcp->snd_una, una) > 0) {
        // 【step.3.1】如果拥塞窗口小于对端可接收大小,进入流量控制和拥塞控制
		if (kcp->cwnd < kcp->rmt_wnd) {
			// ...
            //【step.3.1.1】当前拥塞窗口小于阈值,准备慢启动扩张拥塞窗口大小
			if (kcp->cwnd < kcp->ssthresh) {
				kcp->cwnd++;
				kcp->incr += mss;
			//【step.3.1.2】当前可发送数据小于一个分片大小,扩张发包大小和拥塞窗口大小
            } else {
				if (kcp->incr < mss) kcp->incr = mss;
				kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
				if ((kcp->cwnd + 1) * mss <= kcp->incr) {
					kcp->cwnd++;
				}
			}
            //【step.3.1.3】如果拥塞窗口大于阈值,限制窗口大小和发包大小
			if (kcp->cwnd > kcp->rmt_wnd) {
				kcp->cwnd = kcp->rmt_wnd;
				kcp->incr = kcp->rmt_wnd * mss;
			}
		}
	}
	return 0;
}

# ikcp_recv

批量读取 rev_queue 内按照 sn 顺序编排的包:

  • 先对同一组 frg 的包进行读取,读到 frg = 0 的情况下停止(合包处理)
  • 把所有数据合并到 buff 内,并在做一次 rev_buf -> rev_queue 的排序和移动
  • 判断 recover ,打上标记用于下次 ikcp_flush 通知对端窗口大小,减少阻塞时间
//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
	// 【step.1】计算一下读取数据大小
    // 【step.1.1】读到下一个 frg = 0 包内所有 data 大小的总和(不包括包头)
	peeksize = ikcp_peeksize(kcp);
	// 【step.1.2】判断一堆边界情况
    
    // 【step.1.3】如果之前收包队列满了,表示对端不会再发数据了。
    // 需要标记一下,后面通知对端新的窗口,让对端能够继续发送数据
	if (kcp->nrcv_que >= kcp->rcv_wnd)
		recover = 1;
    // 【step.2】读取数据
	//merge fragment(合包),按照 data block 为单位进行读取,读到 frg = 0 停止
	for (p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
		// 读取数据
        if (fragment == 0) 
			break;
	}
    // 【step.3】对 rev_buf 排序后加入到 rev_queue
    // 由于读取了一些 rev_queue 内容,因此可以再从 rev_buf 挪一部分过来
	while (! iqueue_is_empty(&kcp->rcv_buf)) {
		// move available data from rcv_buf -> rcv_queue
	}
	// 【step.4】打上通知对端窗口大小的标记
	if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
		// ready to send back IKCP_CMD_WINS in ikcp_flush
		// tell remote my window size
		kcp->probe |= IKCP_ASK_TELL;
	}
	return sz;
}

# 规则汇总

Kcp 的发包有很多细节的规则,这里做一个梳理,并进一步的解释

# 「Stream」模式和「非 Stream」模式

kcp 支持两种模式的发包:

# Stream

假设图中每个 data block 都来自不同应用层的数据包

image-20220222171735081

# 非 Stream

image-20220222171940486

  • 「Stream」模式情况下, data block 的数据是可以分段读取每个 data
  • 「非 Stream」模式的解包必须一次性解出整个 data block
  • 「Stream」模式情况下,多个 data block 可以拼接在一起进行发送,因此 data block 2 内的部分数据可以被拆分到第一个传输单元内,但是「非 Stream」不支持该操作,会对单个 data block 进行装包。

# 如何解出拼接过的数据?

例如我们需要读取蓝色方块的数据

image-20220222172714830

  • 假设第一个包数据来了,但是数据有点少,还不足一个包头,那么先存着,等下一个包。
  • 下个包来了,发现可以解出包头了,但是后面的数据还少了,再等下一个包。
  • 下个包来了,数据也够了,才开始进行解包。

这里的 data headdata len 已经是业务层面的规则了,并非 kcp 部分,实现上可以按照实际情况来

# 快速重传 | 超时重传

  • 快速重传:后续包收到了,但是前面的包没收到,就会触发快速重传
  • 超时重传:数据包发送后一段时间内没有收到 ack 回复,就会触发超时重传

# 合包规则

合包其实本质上是减少发包次数,尽可能利用带宽

image-20220222174042490

# 滑动窗口

image-20220222180212342

# 慢启动 | 拥塞窗口

慢启动本质是为了在严重丢包网络环境下,如果网络发生好转后,一点点放开发送窗口大小,增加每次发送的数据量

慢启动的实现一般是通过控制拥塞窗口大小,进而控制发包数量。

每次执行 ikcp_input 操作获取接受包时,如果发现之前发送的包被对端 ack 后( snd_una 变了),会尝试扩大拥塞窗口大小:

  • 如果没有超过阈值的情况下,每次 +1
  • 如果超过阈值后,每次增加 1/ikcp_incr + 1/16 ,如果变化后下取整能够多出一个单位,则窗口 +1
if (_itimediff(kcp->snd_una, una) > 0) {
    if (kcp->cwnd < kcp->rmt_wnd) {
        IUINT32 mss = kcp->mss;
        if (kcp->cwnd < kcp->ssthresh) {
            kcp->cwnd++;
            kcp->incr += mss;
        }	else {
            if (kcp->incr < mss) kcp->incr = mss;
            kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
            if ((kcp->cwnd + 1) * mss <= kcp->incr) {
                kcp->cwnd++;
            }
        }
        if (kcp->cwnd > kcp->rmt_wnd) {
            kcp->cwnd = kcp->rmt_wnd;
            kcp->incr = kcp->rmt_wnd * mss;
        }
    }
}

当然,如果网络环境突然异常 —— 出现丢包(超时重传) | 链路切换导致乱序(快速重传),也会通过拥塞窗口减少发包数量避免无谓的超时重传:

  • 出现丢包:这种情况下,可能网络直接就不好用了,发都发不过去,直接把拥塞窗口设置为 1 ,强行限流。
if (lost) {
    kcp->ssthresh = cwnd / 2;
    if (kcp->ssthresh < IKCP_THRESH_MIN)
        kcp->ssthresh = IKCP_THRESH_MIN;
    kcp->cwnd = 1;
    kcp->incr = kcp->mss;
}
  • 出现乱序:网络波动比较大,频繁链路切换或者其他因素导致乱序,适当的降低窗口大小:
    • 拥塞窗口阈值 ssthresh 减半,拥塞窗口调整为 ssthresh + resent(表示收到的乱序包数量,一般情况下等于 fastack)
if (change) {
    IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
    kcp->ssthresh = inflight / 2;
    if (kcp->ssthresh < IKCP_THRESH_MIN)
        kcp->ssthresh = IKCP_THRESH_MIN;
    kcp->cwnd = kcp->ssthresh + resent;
    kcp->incr = kcp->cwnd * kcp->mss;
}
更新于 阅读次数

请我[恰饭]~( ̄▽ ̄)~*

鑫酱(●'◡'●) 微信支付

微信支付

鑫酱(●'◡'●) 支付宝

支付宝