谷动谷力

 找回密码
 立即注册
查看: 1317|回复: 0
打印 上一主题 下一主题
收起左侧

【网络开发】网络收包讲解02

[复制链接]
跳转到指定楼层
楼主
发表于 2023-8-7 17:55:51 | 只看该作者 |只看大图 回帖奖励 |倒序浏览 |阅读模式
本帖最后由 sunsili 于 2024-4-9 19:57 编辑

【网络开发】网络收包讲解02


基于 Linux 内核 6.0、64 位系统和 Intel 网卡驱动 igb。由于篇幅过长切分多篇,所有参考内容在最后一篇。

整体流程图
三、网络接口层

3.1 概述
数据包在本层主要处理流程有五:
  • 网卡收到数据包,DMA 方式写入Ring Buffer,发出硬中断;
  • 内核收到硬中断,NAPI 加入本 CPU 的轮询列表,发出软中断;
  • 内核收到软中断,轮询 NAPI 并执行poll函数从Ring Buffer取数据;
  • GRO 操作(默认开启),合并多个数据包为一个数据包,如果 RPS 关闭,则把数据包递交到协议栈;
  • RPS 操作(默认关闭),如果开启,使数据包通过别的(也可能是当前的) CPU 递交到协议栈;

图4 L1流程图
图5 L1调用链

3.1.1 Ring Buffer
这一层里有一个大名鼎鼎的数据结构Ring Buffer,后面统称它的中文名字叫环形缓冲区。
接收Ring Buffer实际上有两个环形队列,一个是 CPU 使用的rx_buffer_info数组,数组元素是rx_buffer,另一个是网卡硬件使用的desc数组,元素是rx_desc,它们都存储在主内存上的内核空间。igb网卡驱动中rx_buffer是 igb_rx_buffer,rx_desc是e1000_adv_rx_desc。
igb_rx_buffer 结构体代码如下,结构图如图2:
  1. struct igb_rx_buffer {
  2.     /* DMA 地址 */
  3.     dma_addr_t dma;
  4.     /* 物理页,与 dma 指向同一个内存区域 */
  5.     struct page *page;
  6. #if (BITS_PER_LONG > 32) || (PAGE_SIZE >= 65536)
  7.     __u32 page_offset;
  8. #else
  9.     __u16 page_offset;
  10. #endif
  11.     __u16 pagecnt_bias;
  12. };
复制代码

图6
e1000_adv_rx_desc 结构体代码如下,结构图如图3:
  1. union e1000_adv_rx_desc {
  2.     struct {
  3.         __le64 pkt_addr; /* Packet buffer address */
  4.         __le64 hdr_addr; /* Header buffer address */
  5.     } read;
  6.     struct {
  7.         struct {
  8.             struct {
  9.                 __le16 pkt_info; /* RSS type, Packet type */
  10.                 __le16 hdr_info; /* Split Head, buf len */
  11.             } lo_dword;
  12.             union {
  13.                 __le32 rss; /* RSS Hash */
  14.                 struct {
  15.                     __le16 ip_id; /* IP id */
  16.                     __le16 csum;  /* Packet Checksum */
  17.                 } csum_ip;
  18.             } hi_dword;
  19.         } lower;
  20.         struct {
  21.             __le32 status_error; /* ext status/error */
  22.             __le16 length;       /* Packet length */
  23.             __le16 vlan;         /* VLAN tag */
  24.         } upper;
  25.     } wb; /* writeback */
  26. };
复制代码

图7
3.2 网卡收到数据包
网卡收到数据包后,通过 DMA 写入Ring Buffer(rx_ring)内rx_buffer_info数组的下一个可用元素(igb_rx_buffer)的 dma 指向的内核内存,dma实际是网卡可以使用的总线地址,一个网络帧可能占用多个igb_rx_buffer。
这是第一次复制,从网卡到 Ring Buffer 的复制。

3.3 内核收到硬中断

复制完后,如果硬中断没有被关闭,则网卡发出硬中断。假设启动时硬中断类型选择的是MSI-X,那么硬中断注册的函数就 igb_msix_ring,从下面的代码可以看出,硬中断处理函数逻辑非常简单,仅仅调用了 igb_write_itr 和 napi_schedule 两个函数,igb_msix_ring 函数代码如下:
  1. static irqreturn_t igb_msix_ring(int irq, void *data)
  2. {
  3.     struct igb_q_vector *q_vector = data;
  4.     /* Write the ITR value calculated from the previous interrupt. */
  5.     igb_write_itr(q_vector);
  6.     napi_schedule(&q_vector->napi);

  7.     return IRQ_HANDLED;
  8. }
复制代码

igb_write_itr 负责更新特定的硬件中的寄存器,而 napi_schedule 才是重点工作,负责调度 NAPI,napi_schedule 调用__napi_schedule 再调用 ____napi_schedule 函数,后者部分代码如下:
  1. static inline void ____napi_schedule(struct softnet_data *sd, struct napi_struct *napi)
  2. {
  3.     list_add_tail(&napi->poll_list, &sd->poll_list);
  4.     __raise_softirq_irqoff(NET_RX_SOFTIRQ);
  5. }
复制代码

其主要逻辑有二:
  • 把napi_struct结构体的poll_list添加到当前 CPU 所关联的softnet_data结构体的poll_list链表尾部;
  • 然后调用 __raise_softirq_irqoff 函数触发NET_RX_SOFTIRQ软中断,从而内核执行网络子系统初始化时注册的 net_rx_action 软中断处理函数。

3.4 内核收到软中断

处理硬中断的 CPU 同样也会执行该硬中断触发的软中断注册的处理函数,软中断函数 net_rx_action 在ksoftirqd内核进程执行。net_rx_action 遍历当前 CPU 队列中的 NAPI 列表,依次取出列表中的 NAPI 结构对其进行 napi_poll 操作。net_rx_action 函数非常重要。

3.4.1 net_rx_action 函数
/
  1. / 部分代码
  2. static __latent_entropy void net_rx_action(struct softirq_action *h)
  3. {
  4.     struct softnet_data *sd = this_cpu_ptr(&softnet_data);
  5.     unsigned long time_limit = jiffies + usecs_to_jiffies(READ_ONCE(netdev_budget_usecs));
  6.     int budget = READ_ONCE(netdev_budget);
  7.     LIST_HEAD(list);
  8.     LIST_HEAD(repoll);

  9.     local_irq_disable();
  10.     list_splice_init(&sd->poll_list, &list);
  11.     local_irq_enable();

  12.     for (;;) {
  13.         struct napi_struct *n;

  14.         skb_defer_free_flush(sd);

  15.         if (list_empty(&list)) {
  16.             if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
  17.                 goto end;
  18.             break;
  19.         }

  20.         n = list_first_entry(&list, struct napi_struct, poll_list);
  21.         budget -= napi_poll(n, &repoll);

  22.        /* If softirq window is exhausted then punt.
  23.         * Allow this to run for 2 jiffies since which will allow
  24.         * an average latency of 1.5/HZ.
  25.         */
  26.         if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit))) {
  27.             sd->time_squeeze++;
  28.             break;
  29.         }
  30.     }

  31.     local_irq_disable();

  32.     list_splice_tail_init(&sd->poll_list, &list);
  33.     list_splice_tail(&repoll, &list);
  34.     list_splice(&list, &sd->poll_list);
  35.     if (!list_empty(&sd->poll_list))
  36.         __raise_softirq_irqoff(NET_RX_SOFTIRQ);
  37.     /* 通过 smp_call_function_single_async 远程激活 sd->rps_ipi_list 中的其他 CPU 的软中断,
  38.      * 使其他 CPU 执行初始化时注册的软中断函数 csd = rps_trigger_softirq 来处理数据包 */
  39.     net_rps_action_and_irq_enable(sd);
  40. end:;
  41. }
复制代码

napi_poll 函数调用 __napi_poll 函数对napi_struct进行操作,然后判断napi_struct是否加回poll_list列表尾部,budget是控制消费rx_buffer的数量,避免 CPU 一直被软中断占用。

3.4.2 __napi_poll 函数
  1. // 部分代码
  2. static int __napi_poll(struct napi_struct *n, bool *repoll)
  3. {
  4.     int work, weight;
  5.     weight = n->weight;

  6.     work = 0;
  7.     if (test_bit(NAPI_STATE_SCHED, &n->state)) {
  8.         work = n->poll(n, weight);
  9.         trace_napi_poll(n, work, weight);
  10.     }

  11.     if (likely(work < weight))
  12.         return work;

  13.     if (unlikely(napi_disable_pending(n))) {
  14.         napi_complete(n);
  15.         return work;
  16.     }

  17.     *repoll = true;

  18.     return work;
  19. }
复制代码

上面代码中weight代表 RX 队列的处理优先级(网卡驱动对应权重是固定的 64),napi_struct里的poll函数被调用,igb驱动对应的是先前注册的 igb_poll 函数。

3.4.3 igb_poll 函数

  1. / 部分代码
  2. static int igb_poll(struct napi_struct *napi, int budget)
  3. {
  4.     struct igb_q_vector *q_vector = container_of(napi, struct igb_q_vector, napi);
  5.     bool clean_complete = true;
  6.     int work_done = 0;

  7. #ifdef CONFIG_IGB_DCA
  8.     if (q_vector->adapter->flags & IGB_FLAG_DCA_ENABLED)
  9.         igb_update_dca(q_vector);
  10. #endif
  11.     if (q_vector->tx.ring)
  12.         clean_complete = igb_clean_tx_irq(q_vector, budget);
  13.     if (q_vector->rx.ring) {
  14.         int cleaned = igb_clean_rx_irq(q_vector, budget);
  15.         work_done += cleaned;
  16.         if (cleaned >= budget)
  17.             clean_complete = false;
  18.     }
  19.     /* If all work not completed, return budget and keep polling */
  20.     if (!clean_complete)
  21.         return budget;
  22.     /* Exit the polling mode, but don't re-enable interrupts if stack might
  23.      * poll us due to busy-polling
  24.      */
  25.     if (likely(napi_complete_done(napi, work_done)))
  26.         igb_ring_irq_enable(q_vector);

  27.     return work_done;
  28. }
复制代码

其主要逻辑如下:
  • 如果内核支持 DCA(Direct Cache Access),CPU 缓存命中率将会提升;
  • 调用 igb_clean_rx_irq 循环处理数据包,直到处理完毕或者budget耗尽,下面详细解读;
  • 检查clean_complete判断是否所有的工作已经完成;
  • 如果不是,返回剩下的budget值;
  • 否则调用 napi_complete_done 函数继续处理。
  • 调用 gro_normal_list 函数,因为数据包处理完了,及时把 igb_clean_rx_irq 处理完的多个包一次性送到协议栈;
  • 然后检查 NAPI 的poll_list是否都处理完,如果是则关闭 NAPI,并通过 igb_ring_irq_enable 重新打开硬中断,以保证下次中断会重新打开 NAPI。


3.4.4 igb_clean_rx_irq 函数
  1. //部分代码
  2. static int igb_clean_rx_irq(struct igb_q_vector *q_vector, const int budget)
  3. {
  4.     struct igb_adapter *adapter = q_vector->adapter;
  5.     struct igb_ring *rx_ring = q_vector->rx.ring;
  6.     struct sk_buff *skb = rx_ring->skb;
  7.     unsigned int total_bytes = 0, total_packets = 0;
  8.     u16 cleaned_count = igb_desc_unused(rx_ring);
  9.     int rx_buf_pgcnt;

  10.     while (likely(total_packets < budget)) {
  11.         union e1000_adv_rx_desc *rx_desc;
  12.         struct igb_rx_buffer *rx_buffer;
  13.         ktime_t timestamp = 0;
  14.         unsigned int size;
  15.         /* return some buffers to hardware, one at a time is too slow */
  16.         if (cleaned_count >= IGB_RX_BUFFER_WRITE) {
  17.             /* 1 */
  18.             igb_alloc_rx_buffers(rx_ring, cleaned_count);
  19.             cleaned_count = 0;
  20.         }
  21.         /* 2 */
  22.         rx_desc = IGB_RX_DESC(rx_ring, rx_ring->next_to_clean);
  23.         size = le16_to_cpu(rx_desc->wb.upper.length);
  24.         /* 3 */
  25.         rx_buffer = igb_get_rx_buffer(rx_ring, size, &rx_buf_pgcnt);
  26.         /* 4 retrieve a buffer from the ring */
  27.         if (!skb) {
  28.             unsigned char *hard_start = pktbuf - igb_rx_offset(rx_ring);
  29.             unsigned int offset = pkt_offset + igb_rx_offset(rx_ring);

  30.             xdp_prepare_buff(&xdp, hard_start, offset, size, true);
  31.             xdp_buff_clear_frags_flag(&xdp);
  32. #if (PAGE_SIZE > 4096)
  33.             /* At larger PAGE_SIZE, frame_sz depend on len size */
  34.             xdp.frame_sz = igb_rx_frame_truesize(rx_ring, size);
  35. #endif
  36.             skb = igb_run_xdp(adapter, rx_ring, &xdp);
  37.         }
  38.         /* 5 retrieve a buffer from the ring */
  39.         if (skb)
  40.             igb_add_rx_frag(rx_ring, rx_buffer, skb, size);
  41.         else if (ring_uses_build_skb(rx_ring))
  42.             skb = igb_build_skb(rx_ring, rx_buffer, &xdp, timestamp);
  43.         else
  44.             skb = igb_construct_skb(rx_ring, rx_buffer, &xdp, timestamp);
  45.         /* 6 */
  46.         igb_put_rx_buffer(rx_ring, rx_buffer, rx_buf_pgcnt);
  47.         cleaned_count++;
  48.         /* 7 fetch next buffer in frame if non-eop */
  49.         if (igb_is_non_eop(rx_ring, rx_desc))
  50.             continue;
  51.         /* 8 verify the packet layout is correct */
  52.         if (igb_cleanup_headers(rx_ring, rx_desc, skb)) {
  53.             skb = NULL;
  54.             continue;
  55.         }
  56.         /* 9 probably a little skewed due to removing CRC */
  57.         total_bytes += skb->len;
  58.         /* 10 populate checksum, timestamp, VLAN, and protocol */
  59.         igb_process_skb_fields(rx_ring, rx_desc, skb);
  60.         /* 11 GRO,合并数据包 */
  61.         napi_gro_receive(&q_vector->napi, skb);
  62.         /* reset skb pointer */
  63.         skb = NULL;
  64.         /* 12 update budget accounting */
  65.         total_packets++;
  66.     }
  67.     /* place incomplete frames back on ring for completion */
  68.     rx_ring->skb = skb;
  69.     if (cleaned_count)
  70.         igb_alloc_rx_buffers(rx_ring, cleaned_count);

  71.     return total_packets;
  72. }
复制代码

igb_clean_rx_irq 函数中的 while 循环完成下面操作:
  • 首先申请一批 rx_buffer 和 rx_desc,通常 IGB_RX_BUFFER_WRITE(16)个,避免一个个申请,效率低,操作由 igb_alloc_rx_buffers 函数完成:使用 dev_alloc_pages 申请新的物理页保存到 rx_buffer->page,然后通过 dma_map_page_attrs 将 page 映射结果保存到 rx_buffer->dma ;修改 rx_desc->read.pkt_addr(rx_buffer->dma + rx_buffer->page_offset),rx_desc->wb.upper.length = 0,方便网卡将收到的数据包 DMA 到 rx_desc->read.pkt_addr 地址,这是第一次复制,从网卡到 Ring Buffer 的复制;
  • 从 Ring Buffer 中取出下一个可读位置(next_to_clean)的 rx_desc,检查它状态是否正常,然后从 rx_desc 获取接收的数据 buffer 大小(wb.upper.length);
  • 通过 igb_get_rx_buffer 函数将下一个可读位置(next_to_clean)的 rx_buffer 获取到;
  • 计算数据包开始地址,page_address(rx_buffer->page) + rx_buffer->page_offset,转换成 xdp_buff 地址,然后交给 BPF 的 xdp 处理;
  • 内核把 rx_buffer 的 page(物理页)对应的 buffer 数据拷贝到 Ring Buffer 的 skb(sk_buff)中,然后把 skb 直接传给协议栈,这是第二次复制,从 Ring Buffer 到网络协议栈的复制。为了减少复制次数,skb 直到上层处理完以后才会被 __kfree_skb 释放;
  • 通过 igb_put_rx_buffer 函数将 rx_buffer->page=NULL,如果可以重用,将 page、dma 等数据移动到rx_ring->next_to_alloc 位置的 rx_buffer;反之,解除 DMA 映射,回收内存;
  • 通过 igb_is_non_eop 函数检查 rx_desc 是不是包含 eop(End of Packet),如果包含,说明 skb 中已经收录一个完整的网络包(帧);反之,需要获取下一个 rx_buffer 里的数据继续复制到 skb 中直到 rx_desc 包含 eop;也就是说一个网络包(存储在 skb 中)可能包含 1 个或多个 rx_buffer 中的 buffer 数据,也可以说 1 个 skb 对应 1 个或多个 Ring Buffer 队列里连续的元素;
  • 通过 igb_cleanup_headers 检查网络包(skb)的头部等信息是否正确;
  • 把 skb 的长度累计到 total_bytes,用于统计数据;
  • 调用 igb_process_skb_fields 设置skb 的 checksum、timestamp、VLAN 和 protocol 等信息,这些信息由硬件提供;
  • 将构建好的 skb 通过 napi_gro_receive 函数上交到网络协议栈,具体细节移步 2.4 章节;
  • 累加处理数据包个数 total_packets,用于消耗 budget;
  • 如果没数据或者 budget 耗尽就退出循环,否则回到 1;


上面第 5 步中,skb 的创建有两种情况,当网卡配置了 legacy 模式,使用 igb_build_skb(napi_build_skb)创建 skb 并复制 rx_buffer->page 数据;否则使用 igb_construct_skb(napi_alloc_skb) 创建 skb 并复制 rx_buffer->page 数据。当 skb 不为空时,就是前一个包被 GRO 合并了,使用 igb_add_rx_frag 复制数据。
budget 的大小会影响到 CPU 的利用率,当数据包特别多的情况下,budget 越大可以减少数据包的延时,但是会影响 CPU 处理其他任务。budget 默认 300,可以调整使用下面命令修改:
$ sysctl -w net.core.netdev_budget=500
前面收包过程都是内核跟网卡硬件和驱动配合来完成的,也就是说不同网卡收包的具体实现可能不同(同一家厂商的网卡的实现基本相同),但是大体实现思路上是一样的,都是用到了 Ring Buffer、DMA、硬中断和软中断等操作。
后面就是由内核和用户程序来完成了,与网卡没有关系了。

3.5 GRO

3.5.1 概述

GRO(Generic Receive Offloading)是 LGO(Large Receive Offload,多数是在 NIC 上实现的一种硬件优化机制)的一种软件实现,从而能让所有 NIC 都支持这个功能。网络上大部分 MTU 都是 1500 字节,开启 Jumbo Frame 后能到 9000 字节,如果发送的数据超过 MTU 就需要切割成多个数据包。通过合并「足够类似」的包来减少传送给网络协议栈的包数,有助于减少 CPU 的使用量。GRO 使协议层只需处理一个 header,而将包含大量数据的整个大包送到用户程序。如果用tcpdump抓包看到机器收到了不现实的、非常大的包,这很可能是系统开启了 GRO。
GRO 和硬中断合并的思想类似,不过阶段不同。硬中断合并是在中断发起之前,而 GRO 已经在软中断处理中了。
查看 GRO 是否开启命令:
  1. $ ethtool -k eth0 | grep generic-receive-offload
  2. generic-receive-offload: on
复制代码

开启 GRO 命令:
  1. $ ethtool -K eth0 gro on
复制代码

napi_gro_receive 就是实现 GRO 机制的入口函数之一。

3.5.2 napi_gro_receive 函数
  1. // 部分代码
  2. gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
  3. {
  4.     gro_result_t ret;

  5.     skb_mark_napi_id(skb, napi);
  6.     trace_napi_gro_receive_entry(skb);

  7.     skb_gro_reset_offset(skb, 0);

  8.     ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));
  9.     trace_napi_gro_receive_exit(ret);

  10.     return ret;
  11. }
复制代码

其主要逻辑有二:
  • 调用 dev_gro_receive 函数具体完成多个数据包的合并,即把skb加入到 NAPI 中,这个操作调用链很长,根据包类型 TCP/UDP 分别判断数据包的完整性和判断需不需要合并;
  • 把上步的返回结果传入 napi_skb_finish 函数继续处理。

3.5.3 napi_skb_finish 函数
  1. static gro_result_t napi_skb_finish(struct napi_struct *napi, struct sk_buff *skb, gro_result_t ret) {
  2.     switch (ret) {
  3.     case GRO_NORMAL:
  4.         gro_normal_one(napi, skb, 1);
  5.         break;

  6.     case GRO_MERGED_FREE:
  7.         if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
  8.             napi_skb_free_stolen_head(skb);
  9.         else if (skb->fclone != SKB_FCLONE_UNAVAILABLE)
  10.             __kfree_skb(skb);
  11.         else
  12.             __kfree_skb_defer(skb);
  13.         break;

  14.     case GRO_HELD:
  15.     case GRO_MERGED:
  16.     case GRO_CONSUMED:
  17.         break;
  18.     }

  19.     return ret;
  20. }
复制代码

  • 如果是 ret 是 GRO_MERGED_FREE,说明 skb 已经被合并,释放 skb;
  • 如果是 ret 是 GRO_NORMAL,会调用 gro_normal_one,它会更新当前 napi->rx_count 计数, 当数量足够多时,将调用 gro_normal_list 函数,将多个包一次性送到协议栈。


3.5.4 gro_normal_one 函数
  1. static inline void gro_normal_one(struct napi_struct *napi, struct sk_buff *skb, int segs) {
  2.     list_add_tail(&skb->list, &napi->rx_list);
  3.     napi->rx_count += segs;
  4.     if (napi->rx_count >= READ_ONCE(gro_normal_batch))
  5.         gro_normal_list(napi);
  6. }
复制代码

这里的阈值gro_normal_batch默认是 8,即攒够 8 个数据包一起送到协议栈,可以通过sysctl修改,命令如下:
  1. $ sysctl net.core.gro_normal_batch
  2. net.core.gro_normal_batch = 8
复制代码

3.5.5 gro_normal_list 函数
  1. /* Pass the currently batched GRO_NORMAL SKBs up to the stack. */
  2. static inline void gro_normal_list(struct napi_struct *napi)
  3. {
  4.     if (!napi->rx_count)    // 没有包直接返回
  5.         return;
  6.     netif_receive_skb_list_internal(&napi->rx_list);
  7.     INIT_LIST_HEAD(&napi->rx_list); // 初始化 napi->rx_list
  8.     napi->rx_count = 0;     // 计数清零
  9. }
复制代码

到这里 GRO 的工作就完成了,然后经过 netif_receive_skb_list_internal 函数多层调用,最终调用 __netif_receive_skb_core 函数把数据包递交网络协议栈。

3.5.6 napi_complete_done 函数
  1. // 部分代码
  2. bool napi_complete_done(struct napi_struct *n, int work_done)
  3. {
  4.     unsigned long flags, val, new, timeout = 0;
  5.     bool ret = true;

  6.     if (unlikely(n->state & (NAPIF_STATE_NPSVC | NAPIF_STATE_IN_BUSY_POLL)))
  7.         return false;

  8.     if (work_done) {
  9.         if (n->gro_bitmask)
  10.             timeout = READ_ONCE(n->dev->gro_flush_timeout);
  11.         n->defer_hard_irqs_count = READ_ONCE(n->dev->napi_defer_hard_irqs);
  12.     }
  13.     if (n->defer_hard_irqs_count > 0) {
  14.         n->defer_hard_irqs_count--;
  15.         timeout = READ_ONCE(n->dev->gro_flush_timeout);
  16.         if (timeout)
  17.             ret = false;
  18.     }
  19.     if (n->gro_bitmask) {
  20.         napi_gro_flush(n, !!timeout);
  21.     }

  22.     gro_normal_list(n);
  23.     ...
  24. }
复制代码

3.4.4 中提到过,poll 函数(igb_poll)在检查是否已经将现有的所有数据包合并完成,如果完成了,则调用 napi_complete_done 函数直接调用 gro_normal_list 函数,及时把 dev_gro_receive 处理完的多个包一次性送到协议栈;

3.6 RPS

3.6.1 概述

RPS(Receive Packet Steering)是 RSS 的一种软件实现。
  • 因为是软件实现的,所以任何网卡都可以使用 RPS,单队列和多队列网卡都可以使用;
  • RPS 在数据包从 Ring Buffer 中取出来后开始工作,将 Packet hash 到对应 CPU 的 backlog 中,并触发 IPI(Inter-processorInterrupt,进程间中断)告知目标 CPU 来处理 backlog。该 Packet 将被目标 CPU 交到协议栈。从而实现将负载分散到多个 CPU 的目的;
  • 单队列网卡使用 RPS 可以提升传输效率,多队列网卡在硬中断不均匀时同样可以使用来提升效率;

IPI 既像软件中断又像硬件中断,它的产生像软件中断,是在程序中用代码发送的,而它的处理像硬件中断

3.6.2 GRO 后执行 RPS

GRO 机制的最后一个函数 gro_normal_list 调用了 netif_receive_skb_list_internal 函数,后者和 netif_receive_skb_internal 函数均有下面类似的代码:
  1. void netif_receive_skb_list_internal(struct list_head *head)
  2. {
  3. #ifdef CONFIG_RPS
  4.     if (static_branch_unlikely(&rps_needed)) {
  5.         list_for_each_entry_safe(skb, next, head, list) {
  6.             struct rps_dev_flow voidflow, *rflow = &voidflow;
  7.             /* 目标 CPU 的 id */
  8.             int cpu = get_rps_cpu(skb->dev, skb, &rflow);
  9.             if (cpu >= 0) {
  10.                 /* Will be handled, remove from list */
  11.                 skb_list_del_init(skb);
  12.                 enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
  13.             }
  14.         }
  15.     }
  16. #endif
  17. }
复制代码

上面代码判断是否设置了 RPS 对数据包进行不同的处理:
  • 如果没有配置 RPS,netif_receive_skb* 将数据包交到网络协议栈;
  • 如果配置了 RPS,netif_receive_skb* 调用 get_rps_cpu 来计算网络包的 hash 并决定压入哪个 CPU 的 backlog,具体压入操作由 enqueue_to_backlog 函数完成。


3.6.3 压入 backlog 队列

enqueue_to_backlog 函数如下:
  1. // 部分代码
  2. static int enqueue_to_backlog(struct sk_buff *skb, int cpu, unsigned int *qtail) {
  3.     enum skb_drop_reason reason;
  4.     struct softnet_data *sd;
  5.     unsigned long flags;
  6.     unsigned int qlen;

  7.     reason = SKB_DROP_REASON_NOT_SPECIFIED;
  8.     sd = &per_cpu(softnet_data, cpu);

  9.     rps_lock_irqsave(sd, &flags);
  10.     if (!netif_running(skb->dev))
  11.         goto drop;
  12.     qlen = skb_queue_len(&sd->input_pkt_queue);
  13.     if (qlen <= READ_ONCE(netdev_max_backlog) && !skb_flow_limit(skb, qlen)) {
  14.         if (qlen) {
  15. enqueue:
  16.             __skb_queue_tail(&sd->input_pkt_queue, skb);
  17.             input_queue_tail_incr_save(sd, qtail);
  18.             rps_unlock_irq_restore(sd, &flags);
  19.             return NET_RX_SUCCESS;
  20.         }

  21.         if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state))
  22.             // 将目标 CPU 的 sd 挂到当前 CPU 的 sd 的 rps_ipi_list 便于后续向目标 CPU 发送 IPI 信号。
  23.             napi_schedule_rps(sd);
  24.         goto enqueue;
  25.     }
  26.     reason = SKB_DROP_REASON_CPU_BACKLOG;

  27. drop:
  28.     sd->dropped++;
  29.     rps_unlock_irq_restore(sd, &flags);

  30.     dev_core_stats_rx_dropped_inc(skb->dev);
  31.     kfree_skb_reason(skb, reason);
  32.     return NET_RX_DROP;
  33. }
复制代码

  • 当目标 CPU 的sd(softnet_data )中input_pkt_queue队列长度同时不超过netdev_max_backlog和flow limit的值,将skb数据包压入input_pkt_queue,否则将会被丢弃。
  • 调用 napi_schedule_rps,将目标 CPU 的sd挂到本 CPU 的sd的rps_ipi_list便于后续向目标 CPU 发送 IPI 信号;
  • 当返回到 net_rx_action 函数中,最后一步经过调用链 net_rps_action_and_irq_enable -> net_rps_send_ipi -> smp_call_function_single_async 远程激活sd->rps_ipi_list中的其他 CPU 的软中断,使其他 CPU 执行初始化时注册的软中断函数 csd = rps_trigger_softirq 来处理数据包;
  • rps_trigger_softirq 函数将 backlog(napi)加入 poll_list 里,然后发出软中断信号 NET_RX_SOFTIRQ;
  • 当处理软中断函数 net_rx_action 处理poll_list时,backlog 的 poll 是 process_backlog 函数,process_backlog 函数消费 CPU 的input_pkt_queue队列数据包,经过 __netif_receive_skb 函数多层调用,最终也调用 __netif_receive_skb_core 函数把数据包递交网络协议栈。

伪文件 /proc/net/softnet_stat 的第 10 列记录了每个 CPU 收到了多少次 IPI。
图8
上面的命令最终会调用下面的 softnet_seq_show 函数:
  1. static int softnet_seq_show(struct seq_file *seq, void *v)
  2. {
  3.     struct softnet_data *sd = v;
  4.     unsigned int flow_limit_count = 0;

  5. #ifdef CONFIG_NET_FLOW_LIMIT
  6.     struct sd_flow_limit *fl;

  7.     rcu_read_lock();
  8.     fl = rcu_dereference(sd->flow_limit);
  9.     if (fl)
  10.         flow_limit_count = fl->count;
  11.     rcu_read_unlock();
  12. #endif

  13.     seq_printf(seq, "%08x %08x %08x %08x %08x %08x %08x %08x %08x %08x %08x %08x %08x\n",
  14.         sd->processed, sd->dropped, sd->time_squeeze, 0, 0, 0, 0,
  15.         0, /* was fastroute */
  16.         0, /* was cpu_collision */
  17.         sd->received_rps, flow_limit_count, softnet_backlog_len(sd),
  18.         (int)seq->index);
  19.     return 0;
  20. }
复制代码

从代码可以看出 IPI 的次数记录在了 softnet_data 的 received_rps 里,除了 IPI 还有第 12 列的 backlog 队列长度。

3.7 递交协议栈

如图4 调用链所示,netif_receive_skb_list_internal 函数经过多层调用,最后会执行 __netif_receive_skb_core 函数,这是网络数据包接收的核心函数,负责处理接收到的数据包并决定如何传递给上层协议处理。这里面做的事情非常多, 按顺序包括:
  • 准备工作;
  • XDP 处理;
  • VLAN 标记;
  • TAP 处理;
  • TC 处理;
  • Netfilter 处理;
  • 递交协议栈。

有的网卡会在 poll 函数里调用 netif_receive_skb 将数据包交到上层网络栈继续处理。最后发现同样会调用到 __netif_receive_skb_core 函数。
__netif_receive_skb_core 函数代码如下:
  1. static int __netif_receive_skb_core(struct sk_buff **pskb, bool pfmemalloc, struct packet_type **ppt_prev)
  2. {
  3.     struct packet_type *ptype, *pt_prev;
  4.     rx_handler_func_t *rx_handler;
  5.     struct sk_buff *skb = *pskb;
  6.     struct net_device *orig_dev;
  7.     bool deliver_exact = false;
  8.     int ret = NET_RX_DROP;
  9.     __be16 type;
  10.     // 检查网络包的时间戳。
  11.     net_timestamp_check(!READ_ONCE(netdev_tstamp_prequeue), skb);
  12.     // 跟踪网络数据包的接收过程,用于调试和性能分析。
  13.     trace_netif_receive_skb(skb);
  14.     // 将接收到的数据包的网络设备指针保存到 orig_dev 变量中,以备后续使用。
  15.     orig_dev = skb->dev;
  16.     // 重置网络头部的偏移量,使其指向正确的位置。
  17.     skb_reset_network_header(skb);
  18.     if (!skb_transport_header_was_set(skb))
  19.         //如果传输头部未设置,则重置传输层头部的偏移量,使其指向正确的位置。
  20.         skb_reset_transport_header(skb);
  21.     // 重置数据包的 MAC 长度。
  22.     skb_reset_mac_len(skb);
  23.     // 将 pt_prev 变量初始化为空,用于存储上一个处理函数。
  24.     pt_prev = NULL;

  25. another_round: //这是一个标签,用于在处理过程中跳转到此处重新执行一轮处理。
  26.     // 设置数据包的 skb_iif 字段,表示skb 是从哪个网络设备接收的。
  27.     skb->skb_iif = skb->dev->ifindex;
  28.     // 增加当前 CPU 上的 softnet_data.processed 字段的计数。
  29.     __this_cpu_inc(softnet_data.processed);
  30.     // 如果启用了 Generic XDP(软件实现 XDP 功能),则调用do_xdp_generic()函数执行 XDP 通用程序的处理。
  31.     if (static_branch_unlikely(&generic_xdp_needed_key)) {
  32.         int ret2;
  33.         migrate_disable();
  34.         ret2 = do_xdp_generic(rcu_dereference(skb->dev->xdp_prog), skb);
  35.         migrate_enable();
  36.         // 如果返回结果不是XDP_PASS,则将返回值设置为NET_RX_DROP并跳转到标签out处。
  37.         if (ret2 != XDP_PASS) {
  38.             ret = NET_RX_DROP;
  39.             goto out;
  40.         }
  41.     }
  42.     // 如果数据包是以太网 VLAN 数据包,则调用skb_vlan_untag()函数将 VLAN 标签从数据包中移除。
  43.     if (eth_type_vlan(skb->protocol)) {
  44.         skb = skb_vlan_untag(skb);
  45.         // 如果 skb 为空,则跳转到 out 标签
  46.         if (unlikely(!skb))
  47.             goto out;
  48.     }
  49.     // 如果需要跳过 TC 分类,则直接跳转到 skip_classify 标签。
  50.     if (skb_skip_tc_classify(skb))
  51.         goto skip_classify;
  52.     // 如果 pfmemalloc 为 true,则跳转到 skip_taps 标签。
  53.     if (pfmemalloc)
  54.         goto skip_taps;
  55.     // 这个循环遍历全局的注册的协议处理函数 ptype_all 链表,依次调用 deliver_skb 函数传递数据包给每个注册的协议处理程序。
  56.     list_for_each_entry_rcu(ptype, &ptype_all, list) {
  57.         if (pt_prev)
  58.             ret = deliver_skb(skb, pt_prev, orig_dev); // 抓包:dev_add_pack(&po->prot_hook) 注册的钩子函数
  59.         pt_prev = ptype;
  60.     }
  61.     // 这个循环遍历接收数据包的网络设备的协议处理函数 ptype_all 链表,同样依次调用 deliver_skb 函数传递数据包给每个注册的协议处理程序。
  62.     list_for_each_entry_rcu(ptype, &skb->dev->ptype_all, list) {
  63.         if (pt_prev)
  64.             ret = deliver_skb(skb, pt_prev, orig_dev); // 抓包:dev_add_pack(&po->prot_hook) 注册的钩子函数
  65.         pt_prev = ptype;
  66.     }
复制代码


skip_taps: // 如果是使用 goto 跳转过来的,那跳过了抓包逻辑(libpcap、tcpdump 等)

  1. #ifdef CONFIG_NET_INGRESS // 这部分代码用于处理网络数据包的入口(ingress)功能,即在数据包进入网络协议栈之前进行处理。
  2.     // 如果需要进行 TC ingress 处理
  3.     if (static_branch_unlikely(&ingress_needed_key)) {
  4.         bool another = false;
  5.         // 跳过 egress
  6.         nf_skip_egress(skb, true);
  7.         // 处理 ingress
  8.         skb = sch_handle_ingress(skb, &pt_prev, &ret, orig_dev, &another);
  9.         // 如果还需要进行下一轮处理,则跳转到 another_round 标签
  10.         if (another) //TC BPF 优化,通过 another round 将包从宿主机网卡直接送到容器 netns 内网卡 ?
  11.             goto another_round;
  12.         if (!skb)
  13.             goto out;
  14.         // 跳过 egress
  15.         nf_skip_egress(skb, false);
  16.         // 处理 Netfilter ingress
  17.         if (nf_ingress(skb, &pt_prev, &ret, orig_dev) < 0)
  18.             goto out;
  19.     }
  20. #endif
  21.     // 重置数据包的重定向标志
  22.     skb_reset_redirect(skb);
  23. skip_classify: // 如果是使用 goto 跳转过来的,那跳过了抓包、TC、Netfilter 逻辑
  24.     // 如果 pfmemalloc 为 true,并且 skb 没有设置 pfmemalloc 协议,则跳转到 drop 标签
  25.     if (pfmemalloc && !skb_pfmemalloc_protocol(skb))
  26.         goto drop;
  27.     if (skb_vlan_tag_present(skb)) {
  28.         // 如果数据包中存在 VLAN 标签,则调用 deliver_skb() 函数将数据包传递给之前注册的协议处理函数进行处理
  29.         if (pt_prev) {
  30.             ret = deliver_skb(skb, pt_prev, orig_dev);
  31.             pt_prev = NULL;
  32.         }
  33.         // 调用 vlan_do_receive() 函数处理 VLAN 相关操作
  34.         if (vlan_do_receive(&skb))
  35.             goto another_round;
  36.         else if (unlikely(!skb))
  37.             goto out;
  38.     }
  39.     // 获取接收该数据包的网络设备的接收处理函数(rx_handler)
  40.     rx_handler = rcu_dereference(skb->dev->rx_handler);
  41.     if (rx_handler) {
  42.         // 如果接收处理函数存在,则调用 deliver_skb() 函数将数据包传递给接收处理函数进行处理
  43.         if (pt_prev) {
  44.             ret = deliver_skb(skb, pt_prev, orig_dev);
  45.             pt_prev = NULL;
  46.         }
  47.         // 根据接收处理函数的返回值,有不同的处理逻辑
  48.         switch (rx_handler(&skb)) {
  49.         case RX_HANDLER_CONSUMED:
  50.             ret = NET_RX_SUCCESS;
  51.             goto out;
  52.         case RX_HANDLER_ANOTHER:
  53.             goto another_round;
  54.         case RX_HANDLER_EXACT:
  55.             deliver_exact = true;
  56.             break;
  57.         case RX_HANDLER_PASS:
  58.             break;
  59.         default:
  60.             BUG();
  61.         }
  62.     }
  63.     // 如果存在 VLAN 标签,并且网络设备不使用 DSA(Distributed Switch Architecture)
  64.     if (unlikely(skb_vlan_tag_present(skb)) && !netdev_uses_dsa(skb->dev)) {
  65. check_vlan_id:
  66.         if (skb_vlan_tag_get_id(skb)) {
  67.             // VLAN ID 非 0,并且无法找到 VLAN 设备
  68.             skb->pkt_type = PACKET_OTHERHOST;
  69.         } else if (eth_type_vlan(skb->protocol)) {
  70.             // 外部头部是 802.1P 带有 VLAN 0,内部头部是 802.1Q 或 802.1AD,并且无法找到 VLAN ID 0 对应的 VLAN 设备
  71.             __vlan_hwaccel_clear_tag(skb);
  72.             skb = skb_vlan_untag(skb);
  73.             if (unlikely(!skb))
  74.                 goto out;
  75.             if (vlan_do_receive(&skb))
  76.                 goto another_round;
  77.             else if (unlikely(!skb))
  78.                 goto out;
  79.             else
  80.                 goto check_vlan_id;
  81.         }
  82.         __vlan_hwaccel_clear_tag(skb);
  83.     }
  84.     // 获取数据包的协议类型
  85.     type = skb->protocol;

  86.     if (likely(!deliver_exact))
  87.         // 如果没有设置精确匹配,将调用 deliver_ptype_list_skb() 函数传递数据包给指定的注册的协议处理函数处理。
  88.         deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type, &ptype_base[ntohs(type) & PTYPE_HASH_MASK]);
  89.     // 调用 deliver_ptype_list_skb() 函数传递数据包给指定的协议处理函数处理
  90.     deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type, &orig_dev->ptype_specific);
  91.     if (unlikely(skb->dev != orig_dev))
  92.         // 如果数据包的网络设备与接收时的网络设备不一致,将调用 deliver_ptype_list_skb() 函数传递数据包给指定的协议处理函数处理。
  93.         deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type, &skb->dev->ptype_specific);

  94.     if (pt_prev) {
  95.         // 如果存在上一个协议处理函数,将调用该处理函数来处理数据包。说明数据包有未处理的分片数据,调用 skb_orphan_frags_rx 函数处理剩余的分片数据。
  96.         if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
  97.             goto drop;
  98.         *ppt_prev = pt_prev;
  99.     } else {
  100.         // 如果不存在上一个协议处理函数,表示没有合适的处理函数来处理数据包,将丢弃数据包并增加接收丢弃计数。
  101. drop:
  102.         if (!deliver_exact)
  103.             // 更新网卡的 rx_dropped 统计
  104.             dev_core_stats_rx_dropped_inc(skb->dev);
  105.         else
  106.             // 更新网卡的 rx_nohandler 统计
  107.             dev_core_stats_rx_nohandler_inc(skb->dev);
  108.         kfree_skb_reason(skb, SKB_DROP_REASON_UNHANDLED_PROTO);
  109.         ret = NET_RX_DROP;
  110.     }

  111. out:
  112.     //将处理完的 skb 赋值回 pskb 指针
  113.     *pskb = skb;
  114.     return ret;
  115. }
复制代码

3.7.1 准备工作

函数开始时,对传入的数据包进行一些准备工作,如:
  • 处理 skb 时间戳;
  • 重置网络头;
  • 重置传输头;
  • 重置 MAC 长度;
  • 设置数据包的接收接口索引。


3.7.2 XDP(eXpress Data Path)处理

这里的 XDP 是软件层面的实现,当硬件网卡不支持 offload 模式的 XDP,可以选择 Generic 模式的 XDP。
  • 前者早在 igb_clean_rx_irq 中执行(前面讲过)避免了后面很多流程所以效率很高;
  • 后者效率低,做了很多无用功,所以主要用来功能验证和测试。


3.7.3 VLAN 处理
如果数据包使用了 VLAN 标记,首先去除 VLAN 标记,并判断是否成功。
  • 如果成功,继续处理去除标记后的数据包;
  • 否则,跳过该数据包。


3.7.4 TAP 处理
根据数据包的 packet_type,按照 ptype_all 链表中的顺序遍历所有的 packet_type,逐个尝试将数据包交给相应的处理函数进行处理。比如交给前面初始化注册的函数,一般通过 libpcap 库埋的探测点(TAP),用于 tcpdump 抓包。
net/packet/af_packet.c:     dev_add_pack(&po->prot_hook);    //用于抓包
net/packet/af_packet.c:     dev_add_pack(&f->prot_hook);     //用于抓包
3.7.5 TC 处理
TC(Traffic Control)是 Linux 的流量控制子系统,通过调用 sch_handle_ingress 函数进入 TC ingress 处理。
  • 以前主要用于限速;
  • 5.10 版本之后,可以使用 TC BPF 编程来做流量的透明拦截和负载均衡。

3.7.6 Netfilter 处理

Netfilter 是 Linux 的包过滤子系统,iptables 是其用户空间的客户端。通过调用 nf_ingress 函数进入 Netfilter ingress 处理。

3.7.7 递交协议栈

根据协议类型packet_type.type在 ptype_base 哈希表中找到对应函数保存在packet_type.func,最终通过 deliver_skb 函数调用packet_type.func把skb交到对应的处理函数处理。
  • 例如 packet_type.func = prot_hook,就会递交到 af_packe,可以被 tcpdump 抓包;
  • 例如 packet_type.func = ip_rcv,就会递交到协议栈入口。

  1. static inline int deliver_skb(struct sk_buff *skb, struct packet_type *pt_prev, struct net_device *orig_dev)
  2. {
  3.     if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
  4.         return -ENOMEM;
  5.     refcount_inc(&skb->users);
  6.     return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
  7. }
复制代码


+10
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|深圳市光明谷科技有限公司|光明谷商城|Sunshine Silicon Corpporation ( 粤ICP备14060730号|Sitemap

GMT+8, 2024-4-29 19:34 , Processed in 0.118336 second(s), 42 queries .

Powered by Discuz! X3.2 Licensed

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表