2009年5月12日星期二

NAPI 技术在 Linux 网络驱动上的应用和完善

NAPI 是 Linux 上采用的一种提高网络处理效率的技术,它的核心概念就是不采用中断的方式读取数据,而代之以 POLL 的方法来轮询数据,类似于底半方式(bottom-half 的处理模式);但是目前在 Linux 的 NAPI 工作效率比较差,本文在分析 NAPI 的同时,提供了一种高效的改善方式供大家参考。

前言:

NAPI 是 Linux 上采用的一种提高网络处理效率的技术,它的核心概念就是不采用中断的方式读取数据,而代之以首先采用中断唤醒数据接收的服务程序,然后 POLL 的方法来轮询数据,(类似于底半(bottom-half)处理模式);从我们在实验中所得到的数据来看,在随着网络的接收速度的增加,NIC 触发的中断能做到不断减少,目前 NAPI 技术已经在网卡驱动层和网络层得到了广泛的应用,驱动层次上已经有 E1000 系列网卡,RTL8139 系列网卡,3c50X 系列等主流的网络适配器都采用了这个技术,而在网络层次上,NAPI 技术已经完全被应用到了著名的 netif_rx 函数中间,并且提供了专门的 POLL 方法--process_backlog 来处理轮询的方法;根据实验数据表明采用NAPI技术可以大大改善短长度数据包接收的效率,减少中断触发的时间;由于 RTL8139CP 是一种应用比较广泛的网络适配器,所以本文以其为例,说明了NAPI技术在网络适配器上的应用和基本原理。

但是 NAPI 存在一些比较严重的缺陷:而对于上层的应用程序而言,系统不能在每个数据包接收到的时候都可以及时地去处理它,而且随着传输速度增加,累计的数据包将会耗费大量的内存,经过实验表明在 Linux 平台上这个问题会比在 FreeBSD 上要严重一些;另外采用 NAPI 所造成的另外一个问题是对于大的数据包处理比较困难,原因是大的数据包传送到网络层上的时候耗费的时间比短数据包长很多(即使是采用 DMA 方式),所以正如前面所说的那样,NAPI 技术适用于对高速率的短长度数据包的处理,在本文的末尾提出了 NAPI 的改善方法,和实验数据。





回页首


使用 NAPI 先决条件:

驱动可以继续使用老的 2.4 内核的网络驱动程序接口,NAPI 的加入并不会导致向前兼容性的丧失,但是 NAPI 的使用至少要得到下面的保证:

A. 要使用 DMA 的环形输入队列(也就是 ring_dma,这个在 2.4 驱动中关于 Ethernet 的部分有详细的介绍),或者是有足够的内存空间缓存驱动获得的包。

B. 在发送/接收数据包产生中断的时候有能力关断 NIC 中断的事件处理,并且在关断 NIC 以后,并不影响数据包接收到网络设备的环形缓冲区(以下简称 rx-ring)处理队列中。

NAPI 对数据包到达的事件的处理采用轮询方法,在数据包达到的时候,NAPI 就会强制执行dev->poll 方法。而和不象以前的驱动那样为了减少包到达时间的处理延迟,通常采用中断的方法来进行。

应当注意的是,经过测试如果 DEC Tulip 系列(DE21x4x芯片)以及 National Semi 的部分网卡芯片,的测试表明如果把从前中断处理的部分都改换用设备的 POLL 方法去执行,那么会造成轻微的延迟,因此在进行 MII(介质无关)的操作上就需要一些小小的诀窍,详见 mii_check_media的函数处理流程,本文不做详细讨论。

在下面显示的例子表示了在 8139 中如何把处理过程放在 dev 的 poll 方法中,把所有的原来中断应该处理的过程放在了 POLL 方法里面,篇幅起见,我们只介绍接收的 POLL 方法。

在下面的 8139CP 驱动程序介绍中表明了可以把在中断程序中所做的任何事情放在 POLL 方法中去做,当然不同的 NIC 在中断中所要处理的状态和事件是不一样的。

对于所有的 NIC 设备,以下两种类型的 NIC 接收事件寄存器响应机制:

  1. COR 机制:当用户程序读状态/事件寄存器,读完成的时候寄存器和NIC的rx-ring中表示的状态队列将被清零,natsemi 和 sunbmac 的 NIC 会这样做,在这种情况下,必须把 NIC 所有以前的中断响应的处理部分都移动到 POLL 方法中去。
  2. COW 机制:用户程序写状态寄存器的时候,必须对要写的位先写 1 清 0,如下面要介绍的 8139CP 就是这样的类型,大多数的 NIC 都属于这种类型,而且这种类型对 NAPI 响应得最好,它只需要把接收的数据包处理部分放置在 POLL 方法中,而接收事件的状态处理部分放在原先的中断控制程序中,我们等下将要介绍的 8139CP 类型网卡就是属于这种类型。

C. 有防止 NIC 队列中排队的数据包冲突的能力。

当关断发送/接收事件中断的时候,NAPI 将在 POLL 中被调用处理,由于 POLL 方法的时候,NIC 中断已经不能通知包到达,那么这个时候在如果在完成轮询,并且中断打开以后,会马上有一个 NIC 中断产生,从而触发一次 POLL 事件,这种在中断关断时刻到达的包我们称为"rotting";这样就会在 POLL 机制和 NIC 中断之间产生一个竞争,解决的方法就是利用网卡的接收状态位,继续接收环形队列缓冲 rx-ring 中的数据,直到没有数据接收以后,才使能中断。





回页首


锁定和防冲突机制:

- 1.SMP 的保证机制:保证同时只有一个处理器调用网络设备的 POLL 方法,因为我们将在下面看到同时只有一个处理器可以对调用 netif_rx_schedule 挂在 POLL 队列中的 NIC 设备调用POLL 方法。

- 2. 网络核心层(net core)调用设备驱动程序使用循环方式发送数据包,在设备驱动层接收数据包的时候完全无锁的接收,而网络核心层则同样要保证每次只有一个处理器可以使用软中断处理接收队列。

- 3. 在多个处理器对 NIC 的 rx-ring 访问的时刻只能发生在对循环队列调用关闭(close)和挂起(suspend)方法的时候(在这个时刻会试图清除接收循环队列)

- 4. 数据同步的问题(对于接收循环队列来说),驱动程序是不需要考虑的网络层上的程序已经把这些事情做完了。

- 5. 如果没有把全部的部分交给 POLL 方法处理,那么 NIC 中断仍然需要使能,接收链路状态发生变化和发送完成中断仍然和以前的处理步骤一样,这样处理的假设是接收中断是设备负载最大的的情况,当然并不能说这样一定正确。

下面的部分将详细介绍在接收事件中调用设备的 POLL 方法。





回页首


NAPI 提供的重要函数和数据结构和函数:

核心数据结构:

struct softnet_data 结构内的字段就是 NIC 和网络层之间处理队列,这个结构是全局的,它从 NIC中断和 POLL 方法之间传递数据信息。其中包含的字段有:

struct softnet_data
{
int throttle; /*为 1 表示当前队列的数据包被禁止*/
int cng_level; /*表示当前处理器的数据包处理拥塞程度*/
int avg_blog; /*某个处理器的平均拥塞度*/
struct sk_buff_head input_pkt_queue; /*接收缓冲区的sk_buff队列*/
struct list_head poll_list; /*POLL设备队列头*/
struct net_device output_queue; /*网络设备发送队列的队列头*/
struct sk_buff completion_queue; /*完成发送的数据包等待释放的队列*/
struct net_device backlog_dev; /*表示当前参与POLL处理的网络设备*/
};


核心 API:

1. netif_rx_schedule(dev)

这个函数被中断服务程序调用,将设备的 POLL 方法添加到网络层次的 POLL 处理队列中去,排队并且准备接收数据包,在使用之前需要调用 netif_rx_reschedule_prep,并且返回的数为 1,并且触发一个 NET_RX_SOFTIRQ 的软中断通知网络层接收数据包。

2. netif_rx_schedule_prep(dev)

确定设备处于运行,而且设备还没有被添加到网络层的 POLL 处理队列中,在调用 netif_rx_schedule之前会调用这个函数。

3. netif_rx_complete(dev)

把当前指定的设备从 POLL 队列中清除,通常被设备的 POLL 方法调用,注意如果在 POLL 队列处于工作状态的时候是不能把指定设备清除的,否则将会出错。


 




回页首


如何在8139CP使用NAPI:


 

从 POLL 方法的本质意义上来说就在于尽量减少中断的数目,特别在于大量的小长度的数据包的时候,减少中断,以达到不要让整个操作系统花费太多的时间在中断现场的保护和恢复上,以便把赢得的时间用来在我网络层上的处理数据的传输,例如在下面介绍的 8139CP 中断的处理过程中,目的就在于尽快把产生中断的设备挂在 poll_list,并且关闭接收中断,最后直接调用设备的POLL方法来处理数据包的接收,直到收到数据包收无可收,或者是达到一个时间片内的调度完成。

RTL8139C+ 的数据接收环形缓冲队列:

RTL8139C+ 的接收方式是一种全新的缓冲方式,能显著的降低CPU接收数据造成的花费,适合大型的服务器使用,适合 IP,TCP,UDP 等多种方式的数据下载,以及连接 IEEE802.1P,802.1Q,VLAN等网络形式;在 8139CP 中分别有 64 个连续的接收/发送描述符单元,对应三个不同的环形缓冲队列--一个是高优先级传输描述符队列,一个是普通优先级传输符描述队列,一个是接收符描述队列,每个环形缓冲队列右 64 个4个双字的连续描述符组成,每个描述符有 4 个连续的双字组成,每个描述符的开始地址在 256 个字节的位置对齐,接收数据之前,软件需要预先分配一个 DMA 缓冲区,一般对于传输而言,缓冲区最大为 8Kbyte 并且把物理地址链接在描述符的 DMA 地址描述单元,另外还有两个双字的单元表示对应的 DMA 缓冲区的接收状态。

在 /driver/net/8139CP.C 中对于环形缓冲队列描述符的数据单元如下表示:

struct cp_desc { u32 opts1;/*缓冲区状态控制符,包含缓冲区大小,缓冲区传输启动位*/ u32 opts2;/*专门用于VLAN部分*/ u64 addr; /*缓冲区的DMA地址*/ };

8139CP 的 NIC 中断:

static irqreturn_t
cp_interrupt (int irq, void *dev_instance, struct pt_regs *regs)
{
struct net_device *dev = dev_instance;
struct cp_private *cp = dev->priv;
u16 status;
/*检查rx-ring中是否有中断到达*/
status = cpr16(IntrStatus);
if (!status || (status == 0xFFFF))
return IRQ_NONE;
if (netif_msg_intr(cp))
printk(KERN_DEBUG "%s: intr, status %04x cmd %02x cpcmd %04x\n",
dev->name, status, cpr8(Cmd), cpr16(CpCmd));
/*清除NIC中断控制器的内容*/
cpw16(IntrStatus, status & ~cp_rx_intr_mask);
spin_lock(&cp->lock);
/*接收状态寄存器表示有数据包到达*/
if (status & (RxOK | RxErr | RxEmpty | RxFIFOOvr)) {
/*把当前的产生中断的NIC设备挂在softnet_data中的POLL队列上,等待网络上层上的应用程序处理*/
if (netif_rx_schedule_prep(dev)) {
/*关闭接收中断使能*/
cpw16_f(IntrMask, cp_norx_intr_mask);
__netif_rx_schedule(dev);
}
}
/*发送中断的处理过程以及8139C+的专门软中断的处理过程,这里我们不关心*/
if (status & (TxOK | TxErr | TxEmpty | SWInt))
cp_tx(cp);
/*如果发生链路变化的情况,需要检查介质无关接口(MII)的载波状态同样也发生变化,
否则就要准备重新启动MII接口*/
if (status & LinkChg)
mii_check_media(&cp->mii_if, netif_msg_link(cp), FALSE);
/*如果PCI总线发生错误,需要对8139C+的设备重新复位*/
if (status & PciErr) {
u16 pci_status;
pci_read_config_word(cp->pdev, PCI_STATUS, &pci_status);
pci_write_config_word(cp->pdev, PCI_STATUS, pci_status);
printk(KERN_ERR "%s: PCI bus error, status=%04x, PCI status=%04x\n",
dev->name, status, pci_status);
/* TODO: reset hardware */
}
spin_unlock(&cp->lock);
return IRQ_HANDLED;
}


把 NIC 挂在 POLL 队列(poll_list)上

在 8139CP 的中断程序可以看到 __netif_rx_schedule 的调用方式,它把 NIC 设备挂在softnet_data 结构中的 poll_list 队列上,以便及时的返回中断,让专门数据包处理 bottom-half部分来进行处理,我们先来看一下 __netif_rx_schedule 的内部工作流程。

static inline void __netif_rx_schedule(struct net_device *dev)
{
unsigned long flags;
local_irq_save(flags);
dev_hold(dev);
/*把当前NIC设备挂在POLL(poll_list)队列中,等待唤醒软中断以后进行轮询*/
list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list);
/*确定当前该设备所要准备接收的包大小*/
if (dev->quota < 0)
dev->quota += dev->weight;
else
dev->quota = dev->weight;
/*启动软中断,在表示所有中断的状态字irq_cpustat_t中关于软中断字段__softirq_pending中,
把关于网络轮循接收软中断位置1,等待调度时机来临时候运行该中断的句柄net_rx_action。*/
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
local_irq_restore(flags);
}

由 __netif_rx_schedule 启动的软中断的处理过程分析

软中断事件触发前已经在此设备子系统初始化时刻调用 subsys_initcall(net_dev_init) 在软中断控制台上被激活,挂在任务队列 tasklet 上准备在任务调度 schedule 的时刻运行它了,这个里面最主要的部分是调用了 8139C+ 网络设备的 POLL 方法(dev->poll),从网络设备的 rx-ring队列中获得数据,本来它应当放在网络设备中断服务程序中执行的,按照我们前面解释的那样,POLL方法以空间换取时间的机制把它放在软中断部分来执行轮循机制(采用类似老的 Bottom-half 机制也可以达到同样效果,而且更加容易理解一些)在每次进行进程调度的时候就会执行网络设备软中断,轮询 rx-ring 对 NIC 进行数据的接收。

软中断的处理过程:

static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *queue = &__get_cpu_var(softnet_data);
unsigned long start_time = jiffies;
int budget = netdev_max_backlog;/*表示队列的最大长度*/
/*锁定当前线程,多处理器的情况之下不能被其他处理器中断处理*/
preempt_disable();
local_irq_disable();
/*检查POLL队列(poll_list)上是否有设备在准备等待轮询取得数据*/
while (!list_empty(&queue->poll_list)) {
struct net_device *dev;
/*这里保证执行当前的 POLL 过程的时间不超过一个时间片,这样不至于被软中断占用太多的时间,
这样在一次调度的时间内执行完毕当前的 POLL 过程,budget 表示一个时间片内最大数据传输的"块数",
块的意思为每个 POLL 所完成 sk_buff数量,每块中间的 sk_buff 数量为 dev->quota 决定,在 8139CP 驱动中,
budget 为 300,而 quota 为 16 表示每给时间片最多可以接收到 4.8K 的 sk_buff 数量*/
if (budget <= 0 || jiffies - start_time > 1)
goto softnet_break;
local_irq_enable();
/*从公共的 softnet_data 数据结构中的轮循队列上获得等待轮循的设备结构*/
dev = list_entry(queue->poll_list.next,
struct net_device, poll_list);
/*调用设备的POLL方法从NIC上的Ring Buffer中读入数据*/
if (dev->quota <= 0 || dev->poll(dev, &budget)) {
/*完成一次POLL过程的数据的接收,重新定义设备接收数据的"配额"
(事实上就是sk_buff缓冲区的数量,每次调用POLL方法的时候可以创建并且最
多可以向上层提交的sk_buff缓冲区数目,这个参数很重要在高速处理的时候有需要慎重优化这个数值,
在有大量数据接收的情况下,需要增加该数值)*/
local_irq_disable();
list_del(&dev->poll_list);
list_add_tail(&dev->poll_list, &queue->poll_list);
if (dev->quota < 0)
dev->quota += dev->weight;
else
dev->quota = dev->weight;
} else {
/*发生了错误的数据接收状况,或者没有完成"规定"配额的数据接收,并且没有新的数据进来,
这个也可能表示已经完成了传输的过程,调用__netif_rx_complete把网络设备从POLL队列上清除
(介绍POLL过程的时候详细介绍)*/
dev_put(dev);
local_irq_disable();
}
}
out:
local_irq_enable();
preempt_enable();
return;
softnet_break:
__get_cpu_var(netdev_rx_stat).time_squeeze++;
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
goto out;
}


8139CP 驱动中的轮询方法

dev->poll 方法:

这个方法通常被网络层在向驱动的接收循环队列获取新的数据包时刻调用,而驱动的接收循环队列中可以向网络层交付的包数量则在 dev->quota 字段中表示,我们来看 8139cp 中 POLL 的原型:

static int cp_rx_poll (struct net_device *dev, int *budget)

参数 budget 的上层任务所需要底层传递的数据包的数量,这个数值不能超过netdev_max_backlog 的值。

总而言之,POLL 方法被网络层调用,只负责按照网络层的要求值("预算"值)提交对应数量的数据包。8139CP 的 POLL 方法注册通常在设备驱动程序模块初始化(调用 probe)的时候进行,如下:

static int cp_init_one (struct pci_dev *pdev, const struct pci_device_id *ent)
{
… …
dev->poll = cp_rx_poll;
… …
}

设备的 POLL 方法正如前所说的是被网络层上的软中断 net_rx_action 调用,我们现在来看具体的流程:

static int cp_rx_poll (struct net_device *dev, int *budget)
{
struct cp_private *cp = netdev_priv(dev);
unsigned rx_tail = cp->rx_tail;
/*设定每次进行调度的时候从设备发送到网络层次最大的数据包的大小*/
unsigned rx_work = dev->quota;
unsigned rx;
rx_status_loop:
rx = 0;
/*重新打开NIC中断,在 cp_interrupt 中断句柄中中断关闭了,现在 POLl 已经开始处理环行缓冲队列中的数据,
所以中断可以打开,准备接收新的数据包*/
cpw16(IntrStatus, cp_rx_intr_mask);
while (1) {/*POLL循环的开始*/
u32 status, len;
dma_addr_t mapping;
struct sk_buff *skb, *new_skb;
struct cp_desc *desc;
unsigned buflen;
/*从下标为rx_tail的内存中的环行缓冲队列接收队列rx_skb上"摘下"套接字缓冲区*/
skb = cp->rx_skb[rx_tail].skb;
if (!skb)
BUG();
desc = &cp->rx_ring[rx_tail];
/*检查在 NIC 的环形队列(rx_ring)上的最后的数据接收状态,是否有出现接收或者 FIFO 的错误,是否*/
status = le32_to_cpu(desc->opts1);
if (status & DescOwn)
break;
len = (status & 0x1fff) - 4;
mapping = cp->rx_skb[rx_tail].mapping;
if ((status & (FirstFrag | LastFrag)) != (FirstFrag | LastFrag)) {
/* we don't support incoming fragmented frames.
* instead, we attempt to ensure that the
* pre-allocated RX skbs are properly sized such
* that RX fragments are never encountered
*/
cp_rx_err_acct(cp, rx_tail, status, len);
cp->net_stats.rx_dropped++;
cp->cp_stats.rx_frags++;
goto rx_next;
}
if (status & (RxError | RxErrFIFO)) {
cp_rx_err_acct(cp, rx_tail, status, len);
goto rx_next;
}
if (netif_msg_rx_status(cp))
printk(KERN_DEBUG "%s: rx slot %d status 0x%x len %d\n",
cp->dev->name, rx_tail, status, len);
buflen = cp->rx_buf_sz + RX_OFFSET;
/*创建新的套接字缓冲区*/
new_skb = dev_alloc_skb (buflen);
if (!new_skb) {
cp->net_stats.rx_dropped++;
goto rx_next;
}
skb_reserve(new_skb, RX_OFFSET);
new_skb->dev = cp->dev;
/*解除原先映射的环行队列上的映射区域*/
pci_unmap_single(cp->pdev, mapping,
buflen, PCI_DMA_FROMDEVICE);
/*检查套接字缓冲区(sk_buff)上得到的数据校验和是否正确*/
/* Handle checksum offloading for incoming packets. */
if (cp_rx_csum_ok(status))
skb->ip_summed = CHECKSUM_UNNECESSARY;
else
skb->ip_summed = CHECKSUM_NONE;
/*按照数据的实际大小重新定义套接字缓冲区的大小*/
skb_put(skb, len);
mapping =
cp->rx_skb[rx_tail].mapping =
/*DMA影射在前面新创建的套接字缓冲区虚拟地址new_buf->tail到实际的物理地址上,
并且把这个物理地址挂在接收缓冲区的队列中*/
pci_map_single(cp->pdev, new_skb->tail,
buflen, PCI_DMA_FROMDEVICE);
/*把新建立的缓冲区的虚拟地址挂在接收缓冲区的队列中,在下一次访问rx_skb数组的这个结构时候,
POLL方法会从这个虚拟地址读出接收到的数据包*/
cp->rx_skb[rx_tail].skb = new_skb;
/*在cp_rx_skb调用netif_rx_skb,填充接收数据包队列,等待网络层在Bottom half队列中调用ip_rcv接收网络数据,
这个函数替代了以前使用的netif_rx*/
cp_rx_skb(cp, skb, desc);
rx++;
rx_next:
/*把前面映射的物理地址挂在NIC设备的环行队列上(也就是rx_ring上,它是在和NIC中物理存储区进行了DMA映射的,
而不是驱动在内存中动态建立的),准备提交给下层(NIC)进行数据传输*/
cp->rx_ring[rx_tail].opts2 = 0;
cp->rx_ring[rx_tail].addr = cpu_to_le64(mapping);
/*在相应的传输寄存器中写入控制字,把rx_ring的控制权从驱动程序交还给NIC硬件*/
if (rx_tail == (CP_RX_RING_SIZE - 1))
desc->opts1 = cpu_to_le32(DescOwn | RingEnd |
cp->rx_buf_sz);
else
desc->opts1 = cpu_to_le32(DescOwn | cp->rx_buf_sz);
/*步进到下一个接收缓冲队列的下一个单元*/
rx_tail = NEXT_RX(rx_tail);
if (!rx_work--)
break;
}
cp->rx_tail = rx_tail;
/*递减配额值quota,一旦quota递减到0表示这次的POLL传输已经完成了使命,
就等待有数据到来的时候再次唤醒软中断执行POLL方法*/
dev->quota -= rx;
*budget -= rx;
/* if we did not reach work limit, then we're done with
* this round of polling
*/
if (rx_work) {
/*如果仍然有数据达到,那么返回POLL方法循环的开始,继续接收数据*/
if (cpr16(IntrStatus) & cp_rx_intr_mask)
goto rx_status_loop;
/*这里表示数据已经接收完毕,而且没有新的接收中断产生了,这个时候使能NIC的接收中断,
并且调用__netif_rx_complete把已经完成POLL的设备从poll_list上摘除,等待下一次中断产生的时候,
再次把设备挂上poll_list队列中。*/
local_irq_disable();
cpw16_f(IntrMask, cp_intr_mask);
__netif_rx_complete(dev);
local_irq_enable();
return 0; /* done */
}
return 1; /* not done */
}


其他的使用 NAPI 的驱动程序和 8139CP 大同小异,只是使用了网络层专门提供的 POLL 方法--proecess_backlog(/net/dev.c),在 NIC 中断接收到了数据包后,调用网络层上的 netif_rx(/net/dev.c)将硬件中断中接收到数据帧存入 sk_buff 结构, 然后检查硬件帧头,识别帧类型, 放入接收队列(softnet_data 结构中的 input_pkt_queue 队列上), 激活接收软中断作进一步处理. 软中断函数(net_rx_action)提取接收包,而 process_backlog(也就是 POLL 方法)向上层提交数据。





回页首


能让接收速度更快一些吗?

我们现在来思考一下如何提高 NAPI 效率的问题,在说到效率这个问题之前我们先看一下在linux 的文档中 NAPI_HOWTO.txt 中提供一个模型用来构造自己 NIC 的 POLL 方法,不过和 8139 有一些不一样,其中 NIC 设备描述中有一个 dirty_rx 字段是在 8139CP 中没有使用到的。

dirty_rx 就是已经开辟了 sk_buff 缓冲区指针和已经提交到 NIC 的 rx_ring 参与接收的缓冲,但是还没有完成传输的缓冲区和已经完成传输的缓冲区的数量总和,与之相类似的是 cur_rx 这个表示的是下一个参与传输的缓冲区指针,我们在 NAPI_HOWTO.txt 的举例中可以看到这个字段的一些具体使用方式:


 
 /*cur_rx为下一个需要参与传输的缓冲区指针,
如果cur_rx指针大于dirty_rx那么表示已经有在rx-ring中开辟的rx-ring中的每个传输缓冲已经被耗尽了,
这个时候需要调用refill_rx_ring 把一些已经向网络层提交了数据的rx-ring接收单元开辟新的缓冲区,
增加dirty_rx的数值,为下一次数据接收做准备,*/
if (tp->cur_rx - tp->dirty_rx > RX_RING_SIZE/2 ||
tp->rx_buffers[tp->dirty_rx % RX_RING_SIZE].skb == NULL)
refill_rx_ring(dev);
/*如果已经当前的cur_rx和dirty_rx之间相差不超过总的rx_ring接收单元的一半,
而且剩下的一半中间有空的传输单元,那么我们不必担心了,因为还有足够的缓冲区可以使用(凭经验推断的),
就可以退出当前的程序,等待下一次软中断调用POLL来处理在这之间收到的数据,
(NAPI_HOWTO.txt中是重新启动时钟计数,这样做是在没有使能NIC中断处理的情况下)*/
if (tp->rx_buffers[tp->dirty_rx % RX_RING_SIZE].skb == NULL)
restart_timer();
/*如果执行到这里了,那表示有几种情况可能发生,第一当前的cur_rx和dirty_rx之间相差不超过总的rx_ring接收单元的一半,
调用refill_rx_ring后dirty_rx并未增加,(也许在rx-ring中大量的单元收到数据没有得到网络层函数的处理),
结果dirty_rx没有增加,而且也没有空闲的单元来接收新到的数据,这样就要重新调用netif_rx_schedule 来唤醒软中断,
调用设备的POLL方法,采集在rx-ring的数据。*/
else netif_rx_schedule(dev); /* we are back on the poll list */

在 RTL-8169 的驱动程序中就使用了 dirty_rx 这个字段,但是在 8139CP 中并未使用,其实这个并非 8139CP 驱动不成熟的表现,大家阅读 NAPI_HOWTO.txt 中可以知道,现在 8139CP 中并未严格按照 NAPI 所提出的要求去做,如果大家有兴趣的话,可以比较一下 8139CP 和 RTL-8169 这两个驱动之间的不同,大家会发现虽然两者都没有在 NIC 中断处理中去完成数据从驱动层向网络层上的转发,而都放在了软中断之中完成,但是在 8139 中利用了自己的一些独特的硬件特性,使 NIC 在利用关断中断接收数据的同时利用数据包到达位(RxOK)通知到达事件,然后采用 POLL 方法把数据从 NIC 直接转发到上层;而 RTL8169 还需要借助 softnet_data 结构中的 input_pkt_queue(套接字缓冲(sk_buff)输入队列)来完成从 NIC 中断到软中断之间的 sk_buff 数据调度;这样对于 8139CP 来说最大的好处就是不要了 dirty_rx 字段和 cur_rx 字段来让 POLL 方法以及 NIC 中断知道当前的传输单元的状况,还包括不需要不时定期的调用 refill_rx_ring 来刷新 rx-ring 获得空闲的传输单元;说到坏处我想就是重写了一个 POLL 方法,完全不能借用 /net/core/dev.c 中的 process_backlog 来作为自己的POLL方法,不过这个代价值得。

说了这么多,似乎都和提高效率没有关系,其实正好相反,通过这些了解我们对 softnet_data中的一些字段的意思应该更加清晰了,下面所叙述的,提高效率的方法就是在 8139CP 的基础上借用了 NAPI_ HOWTO.txt 中的一些方法,从实际上的使用效果来看,在某些应用场合之下比 Linux的 8139CP 的确是有了一定的提高,我们首先看看在 Linux2.6.6 的内核使用 8139CP 在x86(PIII-900Mhz)平台上的数据包接收处理情况:比较表如下:

Psize    Ipps       Tput     Rxint            Done
----------------------------------------------------
60 490000 254560 21 10
128 358750 259946 27 11
256 334454 450034 34 18
512 234550 556670 201239 193455
1024 119061 995645 884526 882300
1440 74568 995645 995645 987154


上表中表示:

"Pszie"表示包的大小 
"Ipps" 每秒钟系统可以接收的包数量 
"Tput" 每次POLL超过 1M 个数据包的总量 
"Rxint" 接收中断数量 
"Done" 载入 rx-ring 内数据所需要的 POLL 次数,这个数值也表示了我们需要清除 rx-ring 的次数。

从上表可以看出,8139CP 中当接收速率达到 490K packets/s 的时候仅仅只有 21 个中断产生,只需要 10 次 POLL 就可以完成数据从 rx_ring 的接收,然而对于大数据包低速率的情况,接收中断就会急剧增加,直到最后每个数据包都需要一次 POLL 的方法来进行处理,最后的结果就是每个中断都需要一次 POLL 的方法,最后造成效率的急剧下降,以至于系统的效率就会大大降低,所以 NAPI 适用于大量的数据包而且尽可能是小的数据包,但是对于大的数据包,而且低速率的,反而会造成系统速度的下降。

如果要改善这种情况,我们可以考虑采用以下的方法,我们在 MIPS,Xsacle 和 SA1100 平台上进行一系列的测试取得了良好的效果:

1. 完全取消 NIC 中断,使用 RXOK 位置控制接收中断,

2. 采用定时器中断 timer_list 的控制句柄,根据硬件平台设定一个合适的间隔周期(间隔周期依据平台不同而异),对 rx-ring 直接进行 POLL 轮询,我们在 MIPS 和 Xscale 上直接使用了中断向量 0--irq0 作为对 rx-ring 进行轮询的 top-half(注意我们在上述两个平台上选择的 HZ 数值是 1000,而通常这个数值是 100,并且重新编写了 Wall-time 的记数程序,让 Wall-Time 的记数还是以 10MS 为间隔),当然也可以根据自己的平台和应用程序的状况选择合适的定时时间。

3. 借助 softnet_data 中的 input_pkt_queue 队列,在时钟中断 bottom-half 中完成 POLL 方法之后,并不直接把数据传输到网络层进行处理,而是把 sk_buff 挂在 input_pkt_queue队列上,唤醒软中断在过后处理,当然可以想象,这样需要付出一定的内存代价,而且实时性能也要差一些。

4. 使用 dirty_rx 字段和 refill_rx_ring 函数,在调用完 POLL 方法以后,而且网络层程序比较空闲的时候为一些 rx-ring 中的单元建立新缓冲挂在环形缓冲队列上,这样可以在新的数据包达到的时候节省时间,操作系统不必要手忙脚乱地开辟新的空间去应付新来的数据。

5. 最后请注意:我们上层的应用程序是以网络数据转发为主的,并没有在应用层面上有很多后台进程的复杂的应用,上述的 1 到 4 点中所做的显而易见是以牺牲系统效率整体效率而单独改善了网络数据的处理。

我们再来看改善的 8139CP 驱动程序使用 8139CP 在 x86(PIII-900Mhz) 平台上的接收情况:

Psize    Ipps       Tput     Rxint       Done
----------------------------------------------------
60 553500 354560 17 7
128 453000 350400 19 10
256 390050 324500 28 13
512 305600 456670 203 455
1024 123440 340020 772951 123005
1440 64568 344567 822394 130000

从上图来看,数据传输的效率和波动性有很明显的改善,在高速率和低速率的时候所需要的POLL 次数的差距以前的 8139CP 驱动程序那么显著了,这样的而且最大的包接收能力也提高到了 553K/s,我们在 MIPS 系列以及 Xscale 系列平台上最大的包接收能力可以提高大约 15%-25%。

最后使用 NAPI 并不是改善网络效率的唯一途径,只能算是权益之策,根本的解决途径还是在于上层应用程序能够独占网络设备,或者是提供大量的缓冲资源,如果这样,根据我们的实验数据表明可以提高 100%-150% 以上的接收效率。

Linux内核NAPI机制分析

简介:
NAPI 是 Linux 上采用的一种提高网络处理效率的技术,它的核心概念就是不采用中断的方式读取数据,而代之以首先采用中断唤醒数据接收的服务程序然后 POLL 的方法来轮询数据。随着网络的接收速度的增加,NIC 触发的中断能做到不断减少,目前 NAPI 技术已经在网卡驱动层和网络层得到了广泛的应用,驱动层次上已经有 E1000 系列网卡,RTL8139 系列网卡,3c50X 系列等主流的网络适配器都采用了这个技术,而在网络层次上,NAPI 技术已经完全被应用到了著名的 netif_rx 函数中间,并且提供了专门的 POLL 方法--process_backlog 来处理轮询的方法;根据实验数据表明采用NAPI技术可以大大改善短长度数据包接收的效率,减少中断触发的时间。

但是 NAPI 存在一些比较严重的缺陷:
1. 对于上层的应用程序而言,系统不能在每个数据包接收到的时候都可以及时地去处理它,而且随着传输速度增加,累计的数据包将会耗费大量的内存,经过实验表明在 Linux 平台上这个问题会比在 FreeBSD 上要严重一些;
2. 另外一个问题是对于大的数据包处理比较困难,原因是大的数据包传送到网络层上的时候耗费的时间比短数据包长很多(即使是采用 DMA 方式),所以正如前面所说的那样,NAPI 技术适用于对高速率的短长度数据包的处理

使用 NAPI 先决条件:
驱动可以继续使用老的 2.4 内核的网络驱动程序接口,NAPI 的加入并不会导致向前兼容性的丧失,但是 NAPI 的使用至少要得到下面的保证:
1. 要使用 DMA 的环形输入队列(也就是 ring_dma,这个在 2.4 驱动中关于 Ethernet 的部分有详细的介绍),或者是有足够的内存空间缓存驱动获得的包。
2. 在发送/接收数据包产生中断的时候有能力关断 NIC 中断的事件处理,并且在关断 NIC 以后,并不影响数据包接收到网络设备的环形缓冲区(以下简称 rx-ring)处理队列中。

NAPI 对数据包到达的事件的处理采用轮询方法,在数据包到达的时候,NAPI 就会强制执行dev->poll 方法。而不像以前的驱动那样为了减少包到达时间的处理延迟,通常采用中断的方法来进行。
 

E1000网卡驱动程序对NAPI的支持:
上面已经介绍过了,使用NAPI需要在编译内核的时候选择打开相应网卡设备的NAPI支持选项,对于E1000网卡来说就是CONFIG_E1000_NAPI宏。
E1000网卡的初始化函数,也就是通常所说的probe方法,定义为e1000_probe():
static int __devinit e1000_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
struct net_device *netdev;
struct e1000_adapter *adapter;
static int cards_found = 0;
unsigned long mmio_start;
int mmio_len;
int pci_using_dac;
int i;
int err;
uint16_t eeprom_data;

if((err = pci_enable_device(pdev)))
return err;
/*
在这里设置PCI设备的DMA掩码,如果这个设备支持DMA传输,则掩码置位。
*/
if(!(err = pci_set_dma_mask(pdev, PCI_DMA_64BIT))) {
pci_using_dac = 1;
} else {
if((err = pci_set_dma_mask(pdev, PCI_DMA_32BIT))) {
E1000_ERR("No usable DMA configuration, aborting\n");
return err;
}
pci_using_dac = 0;
}

if((err = pci_request_regions(pdev, e1000_driver_name)))
return err;

pci_set_master(pdev);
/*
为e1000网卡对应的net_device结构分配内存。
*/
netdev = alloc_etherdev(sizeof(struct e1000_adapter));
if(!netdev) {
err = -ENOMEM;
goto err_alloc_etherdev;
}

SET_MODULE_OWNER(netdev);

pci_set_drvdata(pdev, netdev);
adapter = netdev->priv;
adapter->netdev = netdev;
adapter->pdev = pdev;
adapter->hw.back = adapter;

mmio_start = pci_resource_start(pdev, BAR_0);
mmio_len = pci_resource_len(pdev, BAR_0);

adapter->hw.hw_addr = ioremap(mmio_start, mmio_len);
if(!adapter->hw.hw_addr) {
err = -EIO;
goto err_ioremap;
}

for(i = BAR_1; i <= BAR_5; i++) {
if(pci_resource_len(pdev, i) == 0)
continue;
if(pci_resource_flags(pdev, i) & IORESOURCE_IO) {
adapter->hw.io_base = pci_resource_start(pdev, i);
break;
}
}
/*
将e1000网卡驱动程序的相应函数注册到net_device结构的成员函数上。这里值得注意的是如果定义了设备的CONFIG_E1000_NAPI宏,则设备对应的poll方法被注册为e1000_clean
在网络设备初始化时(net_dev_init()函数)将所有的设备的poll方法注册为系统默认函数process_backlog(),该函数的处理方法就是从CPU相关队列softnet_data的输入数据包队列中读取skb,然后调用netif_receive_skb()函数提交给上层协议继续处理。设备的poll方法是在软中断处理函数中调用的
*/
netdev->open = &e1000_open;
netdev->stop = &e1000_close;
netdev->hard_start_xmit = &e1000_xmit_frame;
netdev->get_stats = &e1000_get_stats;
netdev->set_multicast_list = &e1000_set_multi;
netdev->set_mac_address = &e1000_set_mac;
netdev->change_mtu = &e1000_change_mtu;
netdev->do_ioctl = &e1000_ioctl;
netdev->tx_timeout = &e1000_tx_timeout;
netdev->watchdog_timeo = 5 * HZ;
#ifdef CONFIG_E1000_NAPI
netdev->poll = &e1000_clean;
netdev->weight = 64;
#endif
netdev->vlan_rx_register = e1000_vlan_rx_register;
netdev->vlan_rx_add_vid = e1000_vlan_rx_add_vid;
netdev->vlan_rx_kill_vid = e1000_vlan_rx_kill_vid;
/*
这些就是利用ifconfig能够看到的内存起始地址,以及基地址。
*/
netdev->irq = pdev->irq;
netdev->mem_start = mmio_start;
netdev->mem_end = mmio_start + mmio_len;
netdev->base_addr = adapter->hw.io_base;

adapter->bd_number = cards_found;

if(pci_using_dac)
netdev->features |= NETIF_F_HIGHDMA;

/* MAC地址是存放在网卡设备的EEPROM上的,现在将其拷贝出来。 */
e1000_read_mac_addr(&adapter->hw);
memcpy(netdev->dev_addr, adapter->hw.mac_addr, netdev->addr_len);
if(!is_valid_ether_addr(netdev->dev_addr)) {
err = -EIO;
goto err_eeprom;
}
/*
这里初始化三个定时器列表,以后对内核Timer的实现进行分析,这里就不介绍了。
*/
init_timer(&adapter->tx_fifo_stall_timer);
adapter->tx_fifo_stall_timer.function = &e1000_82547_tx_fifo_stall;
adapter->tx_fifo_stall_timer.data = (unsigned long) adapter;

init_timer(&adapter->watchdog_timer);
adapter->watchdog_timer.function = &e1000_watchdog;
adapter->watchdog_timer.data = (unsigned long) adapter;

init_timer(&adapter->phy_info_timer);
adapter->phy_info_timer.function = &e1000_update_phy_info;
adapter->phy_info_timer.data = (unsigned long) adapter;

INIT_TQUEUE(&adapter->tx_timeout_task,
(void (*)(void *))e1000_tx_timeout_task, netdev);
/*
这里调用网络设备注册函数将当前网络设备注册到系统的dev_base[]设备数组当中,并且调用设备的probe函数,对于以太网来说,就是ethif_probe()函数。相关的说明见内核网络设备操作部分的分析。
调用关系:register_netdev ()->register_netdevice()
*/
register_netdev(netdev);

netif_carrier_off(netdev);
netif_stop_queue(netdev);

e1000_check_options(adapter);
}

在分析网卡接收数据包的过程中,设备的open方法是值得注意的,因为在这里对网卡设备的各种数据结构进行了初始化,特别是环形缓冲区队列。E1000网卡驱动程序的open方法注册为e1000_open():
static int e1000_open(struct net_device *netdev)
{
struct e1000_adapter *adapter = netdev->priv;
int err;

/* allocate transmit descriptors */

if((err = e1000_setup_tx_resources(adapter)))
goto err_setup_tx;

/* allocate receive descriptors */

if((err = e1000_setup_rx_resources(adapter)))
goto err_setup_rx;

if((err = e1000_up(adapter)))
goto err_up;
}
事实上e1000_open()函数调用了e1000_setup_rx_resources()函数为其环形缓冲区分配资源。e1000设备的接收方式是一种缓冲方式,能显著的降低CPU接收数据造成的花费,接收数据之前,软件需要预先分配一个 DMA 缓冲区,一般对于传输而言,缓冲区最大为 8Kbyte 并且把物理地址链接在描述符的 DMA 地址描述单元,另外还有两个双字的单元表示对应的 DMA 缓冲区的接收状态。
在 /driver/net/e1000/e1000/e1000.h 中对于环形缓冲队列描述符的数据单元如下表示:
struct e1000_desc_ring {
void *desc; /* 指向描述符环状缓冲区的指针。*/
dma_addr_t dma; /* 描述符环状缓冲区物理地址,也就是DMA缓冲区地址*/
unsigned int size; /* 描述符环状缓冲区的长度(用字节表示)*/
unsigned int count; /* 缓冲区内描述符的数量,这个是系统初始化时规定好的,它决定该环形缓冲区有多少描述符(或者说缓冲区)可用*/
unsigned int next_to_use; /* 下一个要使用的描述符。*/
unsigned int next_to_clean; /* 下一个待删除描述符。*/
struct e1000_buffer *buffer_info; /* 缓冲区信息结构数组。*/
};

static int e1000_setup_rx_resources(struct e1000_adapter *adapter)
{
/*将环形缓冲区取下来*/
struct e1000_desc_ring *rxdr = &adapter->rx_ring;
struct pci_dev *pdev = adapter->pdev;
int size;

size = sizeof(struct e1000_buffer) * rxdr->count;
/*
为每一个描述符缓冲区分配内存,缓冲区的数量由count决定。
*/
rxdr->buffer_info = kmalloc(size, GFP_KERNEL);
if(!rxdr->buffer_info) {
return -ENOMEM;
}
memset(rxdr->buffer_info, 0, size);

/* Round up to nearest 4K */

rxdr->size = rxdr->count * sizeof(struct e1000_rx_desc);
E1000_ROUNDUP(rxdr->size, 4096);
/*
调用pci_alloc_consistent()函数为系统分配DMA缓冲区。
*/
rxdr->desc = pci_alloc_consistent(pdev, rxdr->size, &rxdr->dma);

if(!rxdr->desc) {
kfree(rxdr->buffer_info);
return -ENOMEM;
}
memset(rxdr->desc, 0, rxdr->size);

rxdr->next_to_clean = 0;
rxdr->next_to_use = 0;

return 0;
}
在e1000_up()函数中,调用request_irq()向系统申请irq中断号,然后将e1000_intr()中断处理函数注册到系统当中,系统有一个中断向量表irq_desc[](?)。然后使能网卡的中断。
接下来就是网卡处于响应中断的模式,这里重要的函数是 e1000_intr()中断处理函数,关于这个函数的说明在内核网络设备操作笔记当中,这里就不重复了,但是重点强调的是中断处理函数中对NAPI部分的处理方法,因此还是将该函数的源码列出,不过省略了与NAPI无关的处理过程:
static irqreturn_t e1000_intr(int irq, void *data, struct pt_regs *regs)
{
struct net_device *netdev = data;
struct e1000_adapter *adapter = netdev->priv;
uint32_t icr = E1000_READ_REG(&adapter->hw, ICR);
#ifndef CONFIG_E1000_NAPI
unsigned int i;
#endif

if(!icr)
return IRQ_NONE; /* Not our interrupt */

#ifdef CONFIG_E1000_NAPI
/*
如果定义了采用NAPI模式接收数据包,则进入这个调用点。
首先调用netif_rx_schedule_prep(dev),确定设备处于运行,而且设备还没有被添加到网络层的 POLL 处理队列中,在调用 netif_rx_schedule之前会调用这个函数。
接下来调用 __netif_rx_schedule(dev),将设备的 POLL 方法添加到网络层次的 POLL 处理队列中去,排队并且准备接收数据包,在使用之前需要调用 netif_rx_reschedule_prep,并且返回的数为 1,并且触发一个 NET_RX_SOFTIRQ 的软中断通知网络层接收数据包。
处理完成。
*/
if(netif_rx_schedule_prep(netdev)) {

/* Disable interrupts and register for poll. The flush
of the posted write is intentionally left out.
*/

atomic_inc(&adapter->irq_sem);
E1000_WRITE_REG(&adapter->hw, IMC, ~0);
__netif_rx_schedule(netdev);
}
#else
/*
在中断模式下,就会调用net_if()函数将数据包插入接收队列中,等待软中断处理。
*/
for(i = 0; i < E1000_MAX_INTR; i++)
if(!e1000_clean_rx_irq(adapter) &
!e1000_clean_tx_irq(adapter))
break;
#endif

return IRQ_HANDLED;
}
下面介绍一下__netif_rx_schedule(netdev)函数的作用:
static inline void __netif_rx_schedule(struct net_device *dev)
{
unsigned long flags;
/* 获取当前CPU。 */
int cpu = smp_processor_id();

local_irq_save(flags);
dev_hold(dev);
/*将当前设备加入CPU相关全局队列softnet_data的轮询设备列表中,不过值得注意的是,这个列表中的设备不一定都执行轮询接收数据包,这里的poll_list只是表示当前设备需要接收数据,具体采用中断还是轮询的方式,取决于设备提供的poll方法。*/
list_add_tail(&dev->poll_list, &softnet_data[cpu].poll_list);
if (dev->quota < 0)
/*对于e1000网卡的轮询机制,weight(是权,负担的意思)这个参数是64。而quota的意思是配额,限额。这两个参数在随后的轮询代码中出现频繁。*/
dev->quota += dev->weight;
else
dev->quota = dev->weight;
/*
调用函数产生网络接收软中断。也就是系统将运行net_rx_action()处理网络数据。
*/
__cpu_raise_softirq(cpu, NET_RX_SOFTIRQ);
local_irq_restore(flags);
}
在内核网络设备操作阅读笔记当中已经介绍过net_rx_action()这个重要的网络接收软中断处理函数了,不过这里为了清楚的分析轮询机制,需要再次分析这段代码:
static void net_rx_action(struct softirq_action *h)
{
int this_cpu = smp_processor_id();
/*获取当前CPU的接收数据队列。*/
struct softnet_data *queue = &softnet_data[this_cpu];
unsigned long start_time = jiffies;
/*呵呵,这里先做个预算,限定我们只能处理这么多数据(300个)。*/
int budget = netdev_max_backlog;

br_read_lock(BR_NETPROTO_LOCK);
local_irq_disable();
/*
进入一个循环,因为软中断处理函数与硬件中断并不是同步的,因此,我们此时并不知道数据包属于哪个设备,因此只能采取逐个查询的方式,遍历整个接收设备列表。
*/
while (!list_empty(&queue->poll_list)) {
struct net_device *dev;
/*如果花费超过预算,或者处理时间超过1秒,立刻从软中断处理函数跳出,我想这可能是系统考虑效率和实时性,一次不能做过多的工作或者浪费过多的时间。*/
if (budget <= 0 || jiffies - start_time > 1)
goto softnet_break;

local_irq_enable();
/*从当前列表中取出一个接收设备。并根据其配额判断是否能够继续接收数据,如果配额不足(<=0),则立刻将该设备从设备列表中删除。并且再次插入队列当中,同时为该设备分配一定的配额,允许它继续处理数据包。
如果此时配额足够,则调用设备的 poll方法,对于e1000网卡来说,如果采用中断方式处理数据,则调用系统默认poll方法process_backlog(),而对于采用NAPI 来说,则是调用e1000_clean()函数了。记住这里第一次传递的预算是300 ^_^。*/
dev = list_entry(queue->poll_list.next, struct net_device, poll_list);

if (dev->quota <= 0 || dev->poll(dev, &budget)) {
local_irq_disable();
list_del(&dev->poll_list);
list_add_tail(&dev->poll_list, &queue->poll_list);
if (dev->quota < 0)
dev->quota += dev->weight;
else
dev->quota = dev->weight;
} else {
dev_put(dev);
local_irq_disable();
}
}

local_irq_enable();
br_read_unlock(BR_NETPROTO_LOCK);
return;

softnet_break:
netdev_rx_stat[this_cpu].time_squeeze++;
/*再次产生软中断,准备下一次数据包处理。*/
__cpu_raise_softirq(this_cpu, NET_RX_SOFTIRQ);

local_irq_enable();
br_read_unlock(BR_NETPROTO_LOCK);
}

 

 

下面介绍一下e1000网卡的轮询poll处理函数e1000_clean(),这个函数只有定义了NAPI宏的情况下才有效:
#ifdef CONFIG_E1000_NAPI
static int e1000_clean(struct net_device *netdev, int *budget)
{
struct e1000_adapter *adapter = netdev->priv;
/*计算一下我们要做的工作量,取系统给定预算(300)和我们网卡设备的配额之间的最小值,这样做同样是为了效率和实时性考虑,不能让一个设备在接收设备上占用太多的资源和时间。*/
int work_to_do = min(*budget, netdev->quota);
int work_done = 0;
/*处理网卡向外发送的数据,这里我们暂时不讨论。*/
e1000_clean_tx_irq(adapter);
/*处理网卡中断收到的数据包,下面详细讨论这个函数的处理方法。*/
e1000_clean_rx_irq(adapter, &work_done, work_to_do);
/*从预算中减掉我们已经完成的任务,预算在被我们支出,^_^。同时设备的配额也不断的削减。*/
*budget -= work_done;
netdev->quota -= work_done;
/*如果函数返回时,完成的工作没有达到预期的数量,表明接收的数据包并不多,很快就全部处理完成了,我们就彻底完成了这次轮询任务,调用 netif_rx_complete(),把当前指定的设备从 POLL 队列中清除(注意如果在 POLL 队列处于工作状态的时候是不能把指定设备清除的,否则将会出错),然后使能网卡中断。*/
if(work_done < work_to_do) {
netif_rx_complete(netdev);
e1000_irq_enable(adapter);
}
/*如果完成的工作大于预期要完成的工作,则表明存在问题,返回1,否则正常返回0。*/
return (work_done >= work_to_do);
}

设备轮询接收机制中最重要的函数就是下面这个函数,当然它同时也可以为中断接收机制所用,只不过处理过程有一定的差别。
static boolean_t
#ifdef CONFIG_E1000_NAPI
e1000_clean_rx_irq(struct e1000_adapter *adapter, int *work_done,
int work_to_do)
#else
e1000_clean_rx_irq(struct e1000_adapter *adapter)
#endif
{
/*这里很清楚,获取设备的环形缓冲区指针。*/
struct e1000_desc_ring *rx_ring = &adapter->rx_ring;
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
struct e1000_rx_desc *rx_desc;
struct e1000_buffer *buffer_info;
struct sk_buff *skb;
unsigned long flags;
uint32_t length;
uint8_t last_byte;
unsigned int i;
boolean_t cleaned = FALSE;
/*把i置为下一个要清除的描述符索引,因为在环形缓冲区队列当中,我们即使已经处理完一个缓冲区描述符,也不是将其删除,而是标记为已经处理,这样如果有新的数据需要使用缓冲区,只是将已经处理的缓冲区覆盖而已。*/
i = rx_ring->next_to_clean;
rx_desc = E1000_RX_DESC(*rx_ring, i);
/*如果i对应的描述符状态是已经删除,则将这个缓冲区取出来给新的数据使用*/
while(rx_desc->status & E1000_RXD_STAT_DD) {
buffer_info = &rx_ring->buffer_info[i];

#ifdef CONFIG_E1000_NAPI
/*在配置了NAPI的情况下,判断是否已经完成的工作?,因为是轮询机制,所以我们必须自己计算我们已经处理了多少数据。*/
if(*work_done >= work_to_do)
break;

(*work_done)++;
#endif

cleaned = TRUE;
/*这个是DMA函数,目的是解除与DMA缓冲区的映射关系,这样我们就可以访问这个缓冲区,获取通过DMA传输过来的数据包(skb)。驱动程序在分配环形缓冲区的时候就将缓冲区与DMA进行了映射。*/
pci_unmap_single(pdev,
buffer_info->dma,
buffer_info->length,
PCI_DMA_FROMDEVICE);

skb = buffer_info->skb;
length = le16_to_cpu(rx_desc->length);
/*对接收的数据包检查一下正确性。确认是一个正确的数据包以后,将skb的数据指针进行偏移。*/
skb_put(skb, length - ETHERNET_FCS_SIZE);

/* Receive Checksum Offload */
e1000_rx_checksum(adapter, rx_desc, skb);
/*获取skb的上层协议类型。这里指的是IP层的协议类型。*/
skb->protocol = eth_type_trans(skb, netdev);
#ifdef CONFIG_E1000_NAPI
/*调用函数直接将skb向上层协议处理函数递交,而不是插入什么队列等待继续处理,因此这里可能存在一个问题,如果数据包比较大,处理时间相对较长,则可能造成系统效率的下降。*/
netif_receive_skb(skb);

#else /* CONFIG_E1000_NAPI */
/*如果采用中断模式,则调用netif_rx()将数据包插入队列中,在随后的软中断处理函数中调用netif_receive_skb(skb)向上层协议处理函数递交。这里就体现出了中断处理机制和轮询机制之间的差别。*/
netif_rx(skb);
#endif /* CONFIG_E1000_NAPI */
/*用全局时间变量修正当前设备的最后数据包接收时间。*/
netdev->last_rx = jiffies;

rx_desc->status = 0;
buffer_info->skb = NULL;
/*这里是处理环形缓冲区达到队列末尾的情况,因为是环形的,所以到达末尾的下一个就是队列头,这样整个环形队列就不断的循环处理。然后获取下一个描述符的状态,看看是不是处于删除状态。如果处于这种状态就会将新到达的数据覆盖旧的的缓冲区,如果不处于这种状态跳出循环。并且将当前缓冲区索引号置为下一次查询的目标。*/
if(++i == rx_ring->count) i = 0;

rx_desc = E1000_RX_DESC(*rx_ring, i);
}

rx_ring->next_to_clean = i;
/*为下一次接收skb做好准备,分配sk_buff内存。出于效率的考虑,如果下一个要使用的缓冲区的sk_buff还没有分配,就分配,如果已经分配,则可以重用。*/
e1000_alloc_rx_buffers(adapter);

return cleaned;
}

下面分析的这个函数有助于我们了解环形接收缓冲区的结构和工作原理:
static void e1000_alloc_rx_buffers(struct e1000_adapter *adapter)
{
struct e1000_desc_ring *rx_ring = &adapter->rx_ring;
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
struct e1000_rx_desc *rx_desc;
struct e1000_buffer *buffer_info;
struct sk_buff *skb;
int reserve_len = 2;
unsigned int i;
/*接收队列中下一个用到的缓冲区索引,初始化是0。并且获取该索引对应的缓冲区信息结构指针buffer_info。*/
i = rx_ring->next_to_use;
buffer_info = &rx_ring->buffer_info[i];
/*如果该缓冲区还没有为sk_buff分配内存,则调用dev_alloc_skb函数分配内存,默认的e1000网卡的接收缓冲区长度是2048字节加上保留长度。
注意:在e1000_open()->e1000_up()中已经调用了这个函数为环形缓冲区队列中的每一个缓冲区分配了sk_buff内存,但是如果接收到数据以后,调用netif_receive_skb (skb)向上层提交数据以后,这段内存将始终被这个skb占用(直到上层处理完以后才会调用__kfree_skb释放,但已经跟这里没有关系了),换句话说,就是当前缓冲区必须重新申请分配sk_buff内存,为了下一个数据做准备。*/
while(!buffer_info->skb) {
rx_desc = E1000_RX_DESC(*rx_ring, i);

skb = dev_alloc_skb(adapter->rx_buffer_len + reserve_len);

if(!skb) {
/* Better luck next round */
break;
}
skb_reserve(skb, reserve_len);

skb->dev = netdev;
/*映射DMA缓冲区,DMA通道直接将收到的数据写到我们提供的这个缓冲区内,每次必须将缓冲区与DMA通道解除映射关系,才能读取缓冲区内容。*/
buffer_info->skb = skb;
buffer_info->length = adapter->rx_buffer_len;
buffer_info->dma =
pci_map_single(pdev,
skb->data,
adapter->rx_buffer_len,
PCI_DMA_FROMDEVICE);

rx_desc->buffer_addr = cpu_to_le64(buffer_info->dma);

if(++i == rx_ring->count) i = 0;
buffer_info = &rx_ring->buffer_info[i];
}
rx_ring->next_to_use = i;
}
 

网卡驱动和队列层中的数据包接收


一、从网卡说起

这并非是一个网卡驱动分析的专门文档,只是对网卡处理数据包的流程进行一个重点的分析。这里以Intel的e100驱动为例进行分析。
大多数网卡都是一个PCI设备,PCI设备都包含了一个标准的配置寄存器,寄存器中,包含了PCI设备的厂商ID、设备ID等等信息,驱动
程序使用来描述这些寄存器的标识符。如下:

struct pci_device_id {
        __u32 vendor, device;                /* Vendor and device ID or PCI_ANY_ID*/
        __u32 subvendor, subdevice;        /* Subsystem ID's or PCI_ANY_ID */
        __u32 class, class_mask;        /* (class,subclass,prog-if) triplet */
        kernel_ulong_t driver_data;        /* Data private to the driver */
};

这样,在驱动程序中,常常就可以看到定义一个struct pci_device_id 类型的数组,告诉内核支持不同类型的
PCI设备的列表,以e100驱动为例:

#define INTEL_8255X_ETHERNET_DEVICE(device_id, ich) {\
        PCI_VENDOR_ID_INTEL, device_id, PCI_ANY_ID, PCI_ANY_ID, \
        PCI_CLASS_NETWORK_ETHERNET << 8, 0xFFFF00, ich }
        
static struct pci_device_id e100_id_table[] = {
        INTEL_8255X_ETHERNET_DEVICE(0x1029, 0),
        INTEL_8255X_ETHERNET_DEVICE(0x1030, 0),
        INTEL_8255X_ETHERNET_DEVICE(0x1031, 3),
……/*略过一大堆支持的设备*/
        { 0, }
};

在内核中,一个PCI设备,使用struct pci_driver结构来描述,
struct pci_driver {
        struct list_head node;
        char *name;
        struct module *owner;
        const struct pci_device_id *id_table;        /* must be non-NULL for probe to be called */
        int  (*probe)  (struct pci_dev *dev, const struct pci_device_id *id);        /* New device inserted */
        void (*remove) (struct pci_dev *dev);        /* Device removed (NULL if not a hot-plug capable driver) */
        int  (*suspend) (struct pci_dev *dev, pm_message_t state);        /* Device suspended */
        int  (*resume) (struct pci_dev *dev);                        /* Device woken up */
        int  (*enable_wake) (struct pci_dev *dev, pci_power_t state, int enable);   /* Enable wake event */
        void (*shutdown) (struct pci_dev *dev);

        struct device_driver        driver;
        struct pci_dynids dynids;
};

因为在系统引导的时候,PCI设备已经被识别,当内核发现一个已经检测到的设备同驱动注册的id_table中的信息相匹配时,
它就会触发驱动的probe函数,以e100为例:
/*
* 定义一个名为e100_driver的PCI设备
* 1、设备的探测函数为e100_probe;
* 2、设备的id_table表为e100_id_table
*/
static struct pci_driver e100_driver = {
        .name =         DRV_NAME,
        .id_table =     e100_id_table,
        .probe =        e100_probe,
        .remove =       __devexit_p(e100_remove),
#ifdef CONFIG_PM
        .suspend =      e100_suspend,
        .resume =       e100_resume,
#endif

        .driver = {
                .shutdown = e100_shutdown,
        }

};

这样,如果系统检测到有与id_table中对应的设备时,就调用驱动的probe函数。

驱动设备在init函数中,调用pci_module_init函数初始化PCI设备e100_driver:

static int __init e100_init_module(void)
{
        if(((1 << debug) - 1) & NETIF_MSG_DRV) {
                printk(KERN_INFO PFX "%s, %s\n", DRV_DESCRIPTION, DRV_VERSION);
                printk(KERN_INFO PFX "%s\n", DRV_COPYRIGHT);
        }
        return pci_module_init(&e100_driver);
}

一切顺利的话,注册的e100_probe函数将被内核调用,这个函数完成两个重要的工作:
1、分配/初始化/注册网络设备;
2、完成PCI设备的I/O区域的分配和映射,以及完成硬件的其它初始化工作;

网络设备使用struct net_device结构来描述,这个结构非常之大,许多重要的参考书籍对它都有较为深入的描述,可以参考《Linux设备驱动程序》中网卡驱动设计的相关章节。我会在后面的内容中,对其重要的成员进行注释;

当probe函数被调用,证明已经发现了我们所支持的网卡,这样,就可以调用register_netdev函数向内核注册网络设备了,注册之前,一般会调用alloc_etherdev为以太网分析一个net_device,然后初始化它的重要成员。

除了向内核注册网络设备之外,探测函数另一项重要的工作就是需要对硬件进行初始化,比如,要访问其I/O区域,需要为I/O区域分配内存区域,然后进行映射,这一步一般的流程是:
1、request_mem_region()
2、ioremap()

对于一般的PCI设备而言,可以调用:
1、pci_request_regions()
2、ioremap()

pci_request_regions函数对PCI的6个寄存器都会调用资源分配函数进行申请(需要判断是I/O端口还是I/O内存),例如:

int pci_request_regions(struct pci_dev *pdev, char *res_name)
{
        int i;
        
        for (i = 0; i < 6; i++)
                if(pci_request_region(pdev, i, res_name))
                        goto err_out;
        return 0;



int pci_request_region(struct pci_dev *pdev, int bar, char *res_name)
{
        if (pci_resource_len(pdev, bar) == 0)
                return 0;
                
        if (pci_resource_flags(pdev, bar) & IORESOURCE_IO) {
                if (!request_region(pci_resource_start(pdev, bar),
                            pci_resource_len(pdev, bar), res_name))
                        goto err_out;
        }
        else if (pci_resource_flags(pdev, bar) & IORESOURCE_MEM) {
                if (!request_mem_region(pci_resource_start(pdev, bar),
                                        pci_resource_len(pdev, bar), res_name))
                        goto err_out;
        }
        
        return 0;

有了这些基础,我们来看设备的探测函数:
static int __devinit e100_probe(struct pci_dev *pdev,
        const struct pci_device_id *ent)
{
        struct net_device *netdev;
        struct nic *nic;
        int err;

        /*分配网络设备*/
        if(!(netdev = alloc_etherdev(sizeof(struct nic)))) {
                if(((1 << debug) - 1) & NETIF_MSG_PROBE)
                        printk(KERN_ERR PFX "Etherdev alloc failed, abort.\n");
                return -ENOMEM;
        }

        /*设置各成员指针函数*/
        netdev->open = e100_open;
        netdev->stop = e100_close;
        netdev->hard_start_xmit = e100_xmit_frame;
        netdev->get_stats = e100_get_stats;
        netdev->set_multicast_list = e100_set_multicast_list;
        netdev->set_mac_address = e100_set_mac_address;
        netdev->change_mtu = e100_change_mtu;
        netdev->do_ioctl = e100_do_ioctl;
        SET_ETHTOOL_OPS(netdev, &e100_ethtool_ops);
        netdev->tx_timeout = e100_tx_timeout;
        netdev->watchdog_timeo = E100_WATCHDOG_PERIOD;
        netdev->poll = e100_poll;
        netdev->weight = E100_NAPI_WEIGHT;
#ifdef CONFIG_NET_POLL_CONTROLLER
        netdev->poll_controller = e100_netpoll;
#endif
        /*设置网络设备名称*/
        strcpy(netdev->name, pci_name(pdev));

        /*取得设备私有数据结构*/
        nic = netdev_priv(netdev);
        /*网络设备指针,指向自己*/
        nic->netdev = netdev;
        /*PCIy设备指针,指向自己*/
        nic->pdev = pdev;
        nic->msg_enable = (1 << debug) - 1;
        
        /*将PCI设备的私有数据区指向网络设备*/
        pci_set_drvdata(pdev, netdev);

        /*激活PCI设备*/
        if((err = pci_enable_device(pdev))) {
                DPRINTK(PROBE, ERR, "Cannot enable PCI device, aborting.\n");
                goto err_out_free_dev;
        }

        /*判断I/O区域是否是I/O内存,如果不是,则报错退出*/
        if(!(pci_resource_flags(pdev, 0) & IORESOURCE_MEM)) {
                DPRINTK(PROBE, ERR, "Cannot find proper PCI device "
                        "base address, aborting.\n");
                err = -ENODEV;
                goto err_out_disable_pdev;
        }

        /*分配I/O内存区域*/
        if((err = pci_request_regions(pdev, DRV_NAME))) {
                DPRINTK(PROBE, ERR, "Cannot obtain PCI resources, aborting.\n");
                goto err_out_disable_pdev;
        }

        /*
         * 告之内核自己的DMA寻址能力,这里不是很明白,因为从0xFFFFFFFF来看,本来就是内核默认的32了
         * 为什么还要调用pci_set_dma_mask来重复设置呢?可能是对ULL而非UL不是很了解吧。
         */
        if((err = pci_set_dma_mask(pdev, 0xFFFFFFFFULL))) {
                DPRINTK(PROBE, ERR, "No usable DMA configuration, aborting.\n");
                goto err_out_free_res;
        }

        SET_MODULE_OWNER(netdev);
        SET_NETDEV_DEV(netdev, &pdev->dev);

        /*分配完成后,映射I/O内存*/
        nic->csr = ioremap(pci_resource_start(pdev, 0), sizeof(struct csr));
        if(!nic->csr) {
                DPRINTK(PROBE, ERR, "Cannot map device registers, aborting.\n");
                err = -ENOMEM;
                goto err_out_free_res;
        }

        if(ent->driver_data)
                nic->flags |= ich;
        else
                nic->flags &= ~ich;

        /*设置设备私有数据结构的大部份默认参数*/
        e100_get_defaults(nic);

        /* 初始化自旋锁,锅的初始化必须在调用 hw_reset 之前执行*/
        spin_lock_init(&nic->cb_lock);
        spin_lock_init(&nic->cmd_lock);

        /* 硬件复位,通过向指定I/O端口设置复位指令实现. */
        e100_hw_reset(nic);

        /*
         * PCI网卡被BIOS配置后,某些特性可能会被屏蔽掉。比如,多数BIOS都会清掉“master”位,
         * 这导致板卡不能随意向主存中拷贝数据。pci_set_master函数数会检查是否需要设置标志位,
         * 如果需要,则会将“master”位置位。
         * PS:什么是PCI master?
         * 不同于ISA总线,PCI总线的地址总线与数据总线是分时复用的。这样做的好处是,一方面
         * 可以节省接插件的管脚数,另一方面便于实现突发数据传输。在做数据传输时,由一个PCI
         * 设备做发起者(主控,Initiator或Master),而另一个PCI设备做目标(从设备,Target或Slave)。
         * 总线上的所有时序的产生与控制,都由Master来发起。PCI总线在同一时刻只能供一对设备完成传输。
         */
        pci_set_master(pdev);

        /*添加两个内核定时器,watchdog和blink_timer*/
        init_timer(&nic->watchdog);
        nic->watchdog.function = e100_watchdog;
        nic->watchdog.data = (unsigned long)nic;
        init_timer(&nic->blink_timer);
        nic->blink_timer.function = e100_blink_led;
        nic->blink_timer.data = (unsigned long)nic;

        INIT_WORK(&nic->tx_timeout_task,
                (void (*)(void *))e100_tx_timeout_task, netdev);

        if((err = e100_alloc(nic))) {
                DPRINTK(PROBE, ERR, "Cannot alloc driver memory, aborting.\n");
                goto err_out_iounmap;
        }

        /*phy寄存器初始化*/
        e100_phy_init(nic);

        if((err = e100_eeprom_load(nic)))
                goto err_out_free;

        memcpy(netdev->dev_addr, nic->eeprom, ETH_ALEN);
        if(!is_valid_ether_addr(netdev->dev_addr)) {
                DPRINTK(PROBE, ERR, "Invalid MAC address from "
                        "EEPROM, aborting.\n");
                err = -EAGAIN;
                goto err_out_free;
        }

        /* Wol magic packet can be enabled from eeprom */
        if((nic->mac >= mac_82558_D101_A4) &&
           (nic->eeprom[eeprom_id] & eeprom_id_wol))
                nic->flags |= wol_magic;

        /* ack any pending wake events, disable PME */
        pci_enable_wake(pdev, 0, 0);

        /*注册网络设备*/
        strcpy(netdev->name, "eth%d");
        if((err = register_netdev(netdev))) {
                DPRINTK(PROBE, ERR, "Cannot register net device, aborting.\n");
                goto err_out_free;
        }

        DPRINTK(PROBE, INFO, "addr 0x%lx, irq %d, "
                "MAC addr %02X:%02X:%02X:%02X:%02X:%02X\n",
                pci_resource_start(pdev, 0), pdev->irq,
                netdev->dev_addr[0], netdev->dev_addr[1], netdev->dev_addr[2],
                netdev->dev_addr[3], netdev->dev_addr[4], netdev->dev_addr[5]);

        return 0;

err_out_free:
        e100_free(nic);
err_out_iounmap:
        iounmap(nic->csr);
err_out_free_res:
        pci_release_regions(pdev);
err_out_disable_pdev:
        pci_disable_device(pdev);
err_out_free_dev:
        pci_set_drvdata(pdev, NULL);
        free_netdev(netdev);
        return err;
}
执行到这里,探测函数的使命就完成了,在对网络设备重要成员初始化时,有:
netdev->open = e100_open;
指定了设备的open函数为e100_open,这样,当第一次使用设备,比如使用ifconfig工具的时候,open函数将被调用。

二、打开设备

在探测函数中,设置了netdev->open = e100_open; 指定了设备的open函数为e100_open:

static int e100_open(struct net_device *netdev)
{
        struct nic *nic = netdev_priv(netdev);
        int err = 0;

        netif_carrier_off(netdev);
        if((err = e100_up(nic)))
                DPRINTK(IFUP, ERR, "Cannot open interface, aborting.\n");
        return err;
}

大多数涉及物理设备可以感知信号载波(carrier)的存在,载波的存在意味着设备可以工作
据个例子来讲:当一个用户拔掉了网线,也就意味着信号载波的消失。
netif_carrier_off:关闭载波信号;
netif_carrier_on:打开载波信号;
netif_carrier_ok:检测载波信号;

对于探测网卡网线是否连接,这一组函数被使用得较多;

接着,调用e100_up函数启动网卡,这个“启动”的过程,最重要的步骤有:
1、调用request_irq向内核注册中断;
2、调用netif_wake_queue函数来重新启动传输队例;

static int e100_up(struct nic *nic)
{
        int err;

        if((err = e100_rx_alloc_list(nic)))
                return err;
        if((err = e100_alloc_cbs(nic)))
                goto err_rx_clean_list;
        if((err = e100_hw_init(nic)))
                goto err_clean_cbs;
        e100_set_multicast_list(nic->netdev);
        e100_start_receiver(nic, 0);
        mod_timer(&nic->watchdog, jiffies);
        if((err = request_irq(nic->pdev->irq, e100_intr, SA_SHIRQ,
                nic->netdev->name, nic->netdev)))
                goto err_no_irq;
        netif_wake_queue(nic->netdev);
        netif_poll_enable(nic->netdev);
        /* enable ints _after_ enabling poll, preventing a race between
         * disable ints+schedule */
        e100_enable_irq(nic);
        return 0;

err_no_irq:
        del_timer_sync(&nic->watchdog);
err_clean_cbs:
        e100_clean_cbs(nic);
err_rx_clean_list:
        e100_rx_clean_list(nic);
        return err;

}

这样,中断函数e100_intr将被调用;
三、网卡中断

从本质上来讲,中断,是一种电信号,当设备有某种事件发生的时候,它就会产生中断,通过总线把电信号发送给中断控制器,如果中断的线是激活的,中断控制器就把电信号发送给处理器的某个特定引脚。处理器于是立即停止自己正在做的事,跳到内存中内核设置的中断处理程序的入口点,进行中断处理。
在内核中断处理中,会检测中断与我们刚才注册的中断号匹配,于是,注册的中断处理函数就被调用了。

当需要发/收数据,出现错误,连接状态变化等,网卡的中断信号会被触发。当接收到中断后,中断函数读取中断状态位,进行合法性判断,如判断中断信号是否是自己的等,然后,应答设备中断——OK,我已经知道了,你回去继续工作吧……
接着,它就屏蔽此中断,然后netif_rx_schedule函数接收,接收函数 会在未来某一时刻调用设备的poll函数(对这里而言,注册的是e100_poll)实现设备的轮询:

static irqreturn_t e100_intr(int irq, void *dev_id, struct pt_regs *regs)
{
        struct net_device *netdev = dev_id;
        struct nic *nic = netdev_priv(netdev);
        u8 stat_ack = readb(&nic->csr->scb.stat_ack);

        DPRINTK(INTR, DEBUG, "stat_ack = 0x%02X\n", stat_ack);

        if(stat_ack == stat_ack_not_ours ||        /* Not our interrupt */
           stat_ack == stat_ack_not_present)        /* Hardware is ejected */
                return IRQ_NONE;

        /* Ack interrupt(s) */
        writeb(stat_ack, &nic->csr->scb.stat_ack);

        /* We hit Receive No Resource (RNR); restart RU after cleaning */
        if(stat_ack & stat_ack_rnr)
                nic->ru_running = RU_SUSPENDED;

        e100_disable_irq(nic);
        netif_rx_schedule(netdev);

        return IRQ_HANDLED;
}

对于数据包的接收而言,我们关注的是poll函数中,调用e100_rx_clean进行数据的接收:

static int e100_poll(struct net_device *netdev, int *budget)
{
        struct nic *nic = netdev_priv(netdev);
       /*
         * netdev->quota是当前CPU能够从所有接口中接收数据包的最大数目,budget是在
         * 初始化阶段分配给接口的weight值,轮询函数必须接受二者之间的最小值。表示
         * 轮询函数本次要处理的数据包个数。
         */
        unsigned int work_to_do = min(netdev->quota, *budget);
        unsigned int work_done = 0;
        int tx_cleaned;

          /*进行数据包的接收和传输*/             
        e100_rx_clean(nic, &work_done, work_to_do);
        tx_cleaned = e100_tx_clean(nic);

         /*接收和传输完成后,就退出poll模块,重启中断*/
        /* If no Rx and Tx cleanup work was done, exit polling mode. */
        if((!tx_cleaned && (work_done == 0)) || !netif_running(netdev)) {
                netif_rx_complete(netdev);
                e100_enable_irq(nic);
                return 0;
        }

        *budget -= work_done;
        netdev->quota -= work_done;

        return 1;
}

static inline void e100_rx_clean(struct nic *nic, unsigned int *work_done,
        unsigned int work_to_do)
{
        struct rx *rx;
        int restart_required = 0;
        struct rx *rx_to_start = NULL;

        /* are we already rnr? then pay attention!!! this ensures that
         * the state machine progression never allows a start with a 
         * partially cleaned list, avoiding a race between hardware
         * and rx_to_clean when in NAPI mode */
        if(RU_SUSPENDED == nic->ru_running)
                restart_required = 1;

        /* Indicate newly arrived packets */
        for(rx = nic->rx_to_clean; rx->skb; rx = nic->rx_to_clean = rx->next) {
                int err = e100_rx_indicate(nic, rx, work_done, work_to_do);
                if(-EAGAIN == err) {
                        /* hit quota so have more work to do, restart once
                         * cleanup is complete */
                        restart_required = 0;
                        break;
                } else if(-ENODATA == err)
                        break; /* No more to clean */
        }

        /* save our starting point as the place we'll restart the receiver */
        if(restart_required)
                rx_to_start = nic->rx_to_clean;

        /* Alloc new skbs to refill list */
        for(rx = nic->rx_to_use; !rx->skb; rx = nic->rx_to_use = rx->next) {
                if(unlikely(e100_rx_alloc_skb(nic, rx)))
                        break; /* Better luck next time (see watchdog) */
        }

        if(restart_required) {
                // ack the rnr?
                writeb(stat_ack_rnr, &nic->csr->scb.stat_ack);
                e100_start_receiver(nic, rx_to_start);
                if(work_done)
                        (*work_done)++;
        }
}
四、网卡的数据接收

内核如何从网卡接受数据,传统的经典过程:
1、数据到达网卡;
2、网卡产生一个中断给内核;
3、内核使用I/O指令,从网卡I/O区域中去读取数据;


我们在许多网卡驱动中,都可以在网卡的中断函数中见到这一过程。

但是,这一种方法,有一种重要的问题,就是大流量的数据来到,网卡会产生大量的中断,内核在中断上下文中,会浪费大量的资源来处理中断本身。所以,一个问题是,“可不可以不使用中断”,这就是轮询技术,所谓NAPI技术,说来也不神秘,就是说,内核屏蔽中断,然后隔一会儿就去问网卡,“你有没有数据啊?”……

从这个描述本身可以看到,哪果数据量少,轮询同样占用大量的不必要的CPU资源,大家各有所长吧,呵呵……

OK,另一个问题,就是从网卡的I/O区域,包括I/O寄存器或I/O内存中去读取数据,这都要CPU去读,也要占用CPU资源,“CPU从I/O区域读,然后把它放到内存(这个内存指的是系统本身的物理内存,跟外设的内存不相干,也叫主内存)中”。于是自然地,就想到了DMA技术——让网卡直接从主内存之间读写它们的I/O数据,CPU,这儿不干你事,自己找乐子去:
1、首先,内核在主内存中为收发数据建立一个环形的缓冲队列(通常叫DMA环形缓冲区)。
2、内核将这个缓冲区通过DMA映射,把这个队列交给网卡;
3、网卡收到数据,就直接放进这个环形缓冲区了——也就是直接放进主内存了;然后,向系统产生一个中断;
4、内核收到这个中断,就取消DMA映射,这样,内核就直接从主内存中读取数据;


——呵呵,这一个过程比传统的过程少了不少工作,因为设备直接把数据放进了主内存,不需要CPU的干预,效率是不是提高不少?

对应以上4步,来看它的具体实现:
1、分配环形DMA缓冲区
Linux内核中,用skb来描述一个缓存,所谓分配,就是建立一定数量的skb,然后把它们组织成一个双向链表;

2、建立DMA映射
内核通过调用
dma_map_single(struct device *dev,void *buffer,size_t size,enum dma_data_direction direction)
建立映射关系。
struct device *dev,描述一个设备;
buffer:把哪个地址映射给设备;也就是某一个skb——要映射全部,当然是做一个双向链表的循环即可;
size:缓存大小;
direction:映射方向——谁传给谁:一般来说,是“双向”映射,数据在设备和内存之间双向流动;

对于PCI设备而言(网卡一般是PCI的),通过另一个包裹函数pci_map_single,这样,就把buffer交给设备了!设备可以直接从里边读/取数据。

3、这一步由硬件完成;

4、取消映射
dma_unmap_single,对PCI而言,大多调用它的包裹函数pci_unmap_single,不取消的话,缓存控制权还在设备手里,要调用它,把主动权掌握在CPU手里——因为我们已经接收到数据了,应该由CPU把数据交给上层网络栈;

当然,不取消之前,通常要读一些状态位信息,诸如此类,一般是调用
dma_sync_single_for_cpu()
让CPU在取消映射前,就可以访问DMA缓冲区中的内容。

关于DMA映射的更多内容,可以参考《Linux设备驱动程序》“内存映射和DMA”章节相关内容!

OK,有了这些知识,我们就可以来看e100的代码了,它跟上面讲的步骤基本上一样的——绕了这么多圈子,就是想绕到e100上面了,呵呵!


在e100_open函数中,调用e100_up,我们前面分析它时,略过了一个重要的东东,就是环形缓冲区的建立,这一步,是通过
e100_rx_alloc_list函数调用完成的:

static int e100_rx_alloc_list(struct nic *nic)
{
        struct rx *rx;
        unsigned int i, count = nic->params.rfds.count;

        nic->rx_to_use = nic->rx_to_clean = NULL;
        nic->ru_running = RU_UNINITIALIZED;

        /*结构struct rx用来描述一个缓冲区节点,这里分配了count个*/
        if(!(nic->rxs = kmalloc(sizeof(struct rx) * count, GFP_ATOMIC)))
                return -ENOMEM;
        memset(nic->rxs, 0, sizeof(struct rx) * count);

        /*虽然是连续分配的,不过还是遍历它,建立双向链表,然后为每一个rx的skb指针分员分配空间
        skb用来描述内核中的一个数据包,呵呵,说到重点了*/
        for(rx = nic->rxs, i = 0; i < count; rx++, i++) {
                rx->next = (i + 1 < count) ? rx + 1 : nic->rxs;
                rx->prev = (i == 0) ? nic->rxs + count - 1 : rx - 1;
                if(e100_rx_alloc_skb(nic, rx)) {                /*分配缓存*/
                        e100_rx_clean_list(nic);
                        return -ENOMEM;
                }
        }

        nic->rx_to_use = nic->rx_to_clean = nic->rxs;
        nic->ru_running = RU_SUSPENDED;

        return 0;
}



#define RFD_BUF_LEN (sizeof(struct rfd) + VLAN_ETH_FRAME_LEN)
static inline int e100_rx_alloc_skb(struct nic *nic, struct rx *rx)
{
        /*skb缓存的分配,是通过调用系统函数dev_alloc_skb来完成的,它同内核栈中通常调用alloc_skb的区别在于,
        它是原子的,所以,通常在中断上下文中使用*/
        if(!(rx->skb = dev_alloc_skb(RFD_BUF_LEN + NET_IP_ALIGN)))
                return -ENOMEM;

        /*初始化必要的成员 */
        rx->skb->dev = nic->netdev;
        skb_reserve(rx->skb, NET_IP_ALIGN);
        /*这里在数据区之前,留了一块sizeof(struct rfd) 这么大的空间,该结构的
        一个重要作用,用来保存一些状态信息,比如,在接收数据之前,可以先通过
        它,来判断是否真有数据到达等,诸如此类*/
        memcpy(rx->skb->data, &nic->blank_rfd, sizeof(struct rfd));
        /*这是最关键的一步,建立DMA映射,把每一个缓冲区rx->skb->data都映射给了设备,缓存区节点
        rx利用dma_addr保存了每一次映射的地址,这个地址后面会被用到*/
        rx->dma_addr = pci_map_single(nic->pdev, rx->skb->data,
                RFD_BUF_LEN, PCI_DMA_BIDIRECTIONAL);

        if(pci_dma_mapping_error(rx->dma_addr)) {
                dev_kfree_skb_any(rx->skb);
                rx->skb = 0;
                rx->dma_addr = 0;
                return -ENOMEM;
        }

        /* Link the RFD to end of RFA by linking previous RFD to
         * this one, and clearing EL bit of previous.  */
        if(rx->prev->skb) {
                struct rfd *prev_rfd = (struct rfd *)rx->prev->skb->data;
                /*put_unaligned(val,ptr);用到把var放到ptr指针的地方,它能处理处理内存对齐的问题
                prev_rfd是在缓冲区开始处保存的一点空间,它的link成员,也保存了映射后的地址*/
                put_unaligned(cpu_to_le32(rx->dma_addr),
                        (u32 *)&prev_rfd->link);
                wmb();
                prev_rfd->command &= ~cpu_to_le16(cb_el);
                pci_dma_sync_single_for_device(nic->pdev, rx->prev->dma_addr,
                        sizeof(struct rfd), PCI_DMA_TODEVICE);
        }

        return 0;
}

e100_rx_alloc_list函数在一个循环中,建立了环形缓冲区,并调用e100_rx_alloc_skb为每个缓冲区分配了空间,并做了
DMA映射。这样,我们就可以来看接收数据的过程了。

前面我们讲过,中断函数中,调用netif_rx_schedule,表明使用轮询技术,系统会在未来某一时刻,调用设备的poll函数:

static int e100_poll(struct net_device *netdev, int *budget)
{
        struct nic *nic = netdev_priv(netdev);
        unsigned int work_to_do = min(netdev->quota, *budget);
        unsigned int work_done = 0;
        int tx_cleaned;

        e100_rx_clean(nic, &work_done, work_to_do);
        tx_cleaned = e100_tx_clean(nic);

        /* If no Rx and Tx cleanup work was done, exit polling mode. */
        if((!tx_cleaned && (work_done == 0)) || !netif_running(netdev)) {
                netif_rx_complete(netdev);
                e100_enable_irq(nic);
                return 0;
        }

        *budget -= work_done;
        netdev->quota -= work_done;

        return 1;
}

目前,我们只关心rx,所以,e100_rx_clean函数就成了我们关注的对像,它用来从缓冲队列中接收全部数据(这或许是取名为clean的原因吧!):

static inline void e100_rx_clean(struct nic *nic, unsigned int *work_done,
        unsigned int work_to_do)
{
        struct rx *rx;
        int restart_required = 0;
        struct rx *rx_to_start = NULL;

        /* are we already rnr? then pay attention!!! this ensures that
         * the state machine progression never allows a start with a 
         * partially cleaned list, avoiding a race between hardware
         * and rx_to_clean when in NAPI mode */
        if(RU_SUSPENDED == nic->ru_running)
                restart_required = 1;

        /* 函数最重要的工作,就是遍历环形缓冲区,接收数据*/
        for(rx = nic->rx_to_clean; rx->skb; rx = nic->rx_to_clean = rx->next) {
                int err = e100_rx_indicate(nic, rx, work_done, work_to_do);
                if(-EAGAIN == err) {
                        /* hit quota so have more work to do, restart once
                         * cleanup is complete */
                        restart_required = 0;
                        break;
                } else if(-ENODATA == err)
                        break; /* No more to clean */
        }

        /* save our starting point as the place we'll restart the receiver */
        if(restart_required)
                rx_to_start = nic->rx_to_clean;

        /* Alloc new skbs to refill list */
        for(rx = nic->rx_to_use; !rx->skb; rx = nic->rx_to_use = rx->next) {
                if(unlikely(e100_rx_alloc_skb(nic, rx)))
                        break; /* Better luck next time (see watchdog) */
        }

        if(restart_required) {
                // ack the rnr?
                writeb(stat_ack_rnr, &nic->csr->scb.stat_ack);
                e100_start_receiver(nic, rx_to_start);
                if(work_done)
                        (*work_done)++;
        }
}



static inline int e100_rx_indicate(struct nic *nic, struct rx *rx,
        unsigned int *work_done, unsigned int work_to_do)
{
        struct sk_buff *skb = rx->skb;
        struct rfd *rfd = (struct rfd *)skb->data;
        u16 rfd_status, actual_size;

        if(unlikely(work_done && *work_done >= work_to_do))
                return -EAGAIN;

        /* 读取数据之前,也就是取消DMA映射之前,需要先读取cb_complete 状态位,
        以确定数据是否真的准备好了,并且,rfd的actual_size中,也包含了真实的数据大小
        pci_dma_sync_single_for_cpu函数前面已经介绍过,它让CPU在取消DMA映射之前,具备
        访问DMA缓存的能力*/
        pci_dma_sync_single_for_cpu(nic->pdev, rx->dma_addr,
                sizeof(struct rfd), PCI_DMA_FROMDEVICE);
        rfd_status = le16_to_cpu(rfd->status);

        DPRINTK(RX_STATUS, DEBUG, "status=0x%04X\n", rfd_status);

        /* If data isn't ready, nothing to indicate */
        if(unlikely(!(rfd_status & cb_complete)))
                return -ENODATA;

        /* Get actual data size */
        actual_size = le16_to_cpu(rfd->actual_size) & 0x3FFF;
        if(unlikely(actual_size > RFD_BUF_LEN - sizeof(struct rfd)))
                actual_size = RFD_BUF_LEN - sizeof(struct rfd);

        /* 取消映射,因为通过DMA,网卡已经把数据放在了主内存中,这里一取消,也就意味着,
        CPU可以处理主内存中的数据了 */
        pci_unmap_single(nic->pdev, rx->dma_addr,
                RFD_BUF_LEN, PCI_DMA_FROMDEVICE);

        /* this allows for a fast restart without re-enabling interrupts */
        if(le16_to_cpu(rfd->command) & cb_el)
                nic->ru_running = RU_SUSPENDED;
        
        /*正确地设置data指针,因为最前面有一个sizeof(struct rfd)大小区域,跳过它*/
        skb_reserve(skb, sizeof(struct rfd));
        /*更新skb的tail和len指针,也是就更新接收到这么多数据的长度*/
        skb_put(skb, actual_size);
        /*设置协议位*/
        skb->protocol = eth_type_trans(skb, nic->netdev);

        if(unlikely(!(rfd_status & cb_ok))) {
                /* Don't indicate if hardware indicates errors */
                nic->net_stats.rx_dropped++;
                dev_kfree_skb_any(skb);
        } else if(actual_size > nic->netdev->mtu + VLAN_ETH_HLEN) {
                /* Don't indicate oversized frames */
                nic->rx_over_length_errors++;
                nic->net_stats.rx_dropped++;
                dev_kfree_skb_any(skb);
        } else {
                /*网卡驱动要做的最后一步,就是统计接收计数器,设置接收时间戳,然后调用netif_receive_skb,
                把数据包交给上层协议栈,自己的光荣始命也就完成了*/
                nic->net_stats.rx_packets++;
                nic->net_stats.rx_bytes += actual_size;
                nic->netdev->last_rx = jiffies;
                netif_receive_skb(skb);
                if(work_done)
                        (*work_done)++;
        }

        rx->skb = NULL;

        return 0;
}

网卡驱动执行到这里,数据接收的工作,也就处理完成了。但是,使用这一种方法的驱动,省去了网络栈中一个重要的内容,就是
“队列层”,让我们来看看,传统中断接收数据包模式下,使用netif_rx函数调用,又会发生什么。
五、队列层

1、软中断与下半部
当用中断处理的时候,为了减少中断处理的工作量,比如,一般中断处理时,需要屏蔽其它中断,如果中断处理时间过长,那么其它中断
有可能得不到及时处理,也以,有一种机制,就是把“不必马上处理”的工作,推迟一点,让它在中断处理后的某一个时刻得到处理。这就
是下半部。

下半部只是一个机制,它在Linux中,有多种实现方式,其中一种对时间要求最严格的实现方式,叫“软中断”,可以使用:

open_softirq()

来向内核注册一个软中断,
然后,在合适的时候,调用

raise_softirq_irqoff()

触发它。

如果采用中断方式接收数据(这一节就是在说中断方式接收,后面,就不用这种假设了),同样也需要软中断,可以调用

open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);

向内核注册一个名为NET_RX_SOFTIR的软中断,net_rx_action是软中断的处理函数。

然后,在驱动中断处理完后的某一个时刻,调用

raise_softirq_irqoff(NET_RX_SOFTIRQ);

触发它,这样net_rx_action将得到执行。

2、队列层
什么是队列层?通常,在网卡收发数据的时候,需要维护一个缓冲区队列,来缓存可能存在的突发数据,类似于前面的DMA环形缓冲区。
队列层中,包含了一个叫做struct softnet_data:

struct softnet_data
{
        /*throttle 用于拥塞控制,当拥塞发生时,throttle将被设置,后续进入的数据包将被丢弃*/
        int                        throttle;
        /*netif_rx函数返回的拥塞级别*/
        int                        cng_level;
        int                        avg_blog;
        /*softnet_data 结构包含一个指向接收和传输队列的指针,input_pkt_queue成员指向准备传送
        给网络层的sk_buffs包链表的首部的指针,这个队列中的包是由netif_rx函数递交的*/
        struct sk_buff_head        input_pkt_queue;
        
        struct list_head        poll_list;
        struct net_device        *output_queue;
        struct sk_buff                *completion_queue;

        struct net_device        backlog_dev;        /* Sorry. 8) */
};

内核使用了一个同名的变量softnet_data,它是一个Per-CPU变量,每个CPU都有一个。

net/core/dev.c

DECLARE_PER_CPU(struct softnet_data,softnet_data);



/*
*       网络模块的核心处理模块.
*/
static int __init net_dev_init(void)
{
        int i, rc = -ENOMEM;

        BUG_ON(!dev_boot_phase);

        net_random_init();

        if (dev_proc_init())                /*初始化proc文件系统*/
                goto out;

        if (netdev_sysfs_init())        /*初始化sysfs文件系统*/
                goto out;

        /*ptype_all和ptype_base是重点,后面会详细分析,它们都是
        struct list_head类型变量,这里初始化链表成员*/
        INIT_LIST_HEAD(&ptype_all);
        for (i = 0; i < 16; i++) 
                INIT_LIST_HEAD(&ptype_base[i]);

        for (i = 0; i < ARRAY_SIZE(dev_name_head); i++)
                INIT_HLIST_HEAD(&dev_name_head[i]);

        for (i = 0; i < ARRAY_SIZE(dev_index_head); i++)
                INIT_HLIST_HEAD(&dev_index_head[i]);

        /*
         *        初始化包接收队列,这里我们的重点了.
         */

        /*遍历每一个CPU,取得它的softnet_data,我们说过,它是一个struct softnet_data的Per-CPU变量*/
        for (i = 0; i < NR_CPUS; i++) {
                struct softnet_data *queue;
                
                /*取得第i个CPU的softnet_data,因为队列是包含在它里边的,所以,我会直接说,“取得队列”*/
                queue = &per_cpu(softnet_data, i);
                /*初始化队列头*/
                skb_queue_head_init(&queue->input_pkt_queue);
                queue->throttle = 0;
                queue->cng_level = 0;
                queue->avg_blog = 10; /* arbitrary non-zero */
                queue->completion_queue = NULL;
                INIT_LIST_HEAD(&queue->poll_list);
                set_bit(__LINK_STATE_START, &queue->backlog_dev.state);
                queue->backlog_dev.weight = weight_p;
                /*这里,队列中backlog_dev设备,它是一个伪网络设备,不对应任何物理设备,它的poll函数,指向了
                process_backlog,后面我们会详细分析*/
                queue->backlog_dev.poll = process_backlog;
                atomic_set(&queue->backlog_dev.refcnt, 1);
        }

#ifdef OFFLINE_SAMPLE
        samp_timer.expires = jiffies + (10 * HZ);
        add_timer(&samp_timer);
#endif

        dev_boot_phase = 0;
        
        /*注册收/发软中断*/
        open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);
        open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);

        hotcpu_notifier(dev_cpu_callback, 0);
        dst_init();
        dev_mcast_init();
        rc = 0;
out:
        return rc;
}

这样,初始化完成后,在驱动程序中,在中断处理函数中,会调用netif_rx将数据交上来,这与采用轮询技术,有本质的不同:

int netif_rx(struct sk_buff *skb)
{
        int this_cpu;
        struct softnet_data *queue;
        unsigned long flags;

        /* if netpoll wants it, pretend we never saw it */
        if (netpoll_rx(skb))
                return NET_RX_DROP;

        /*接收时间戳未设置,设置之*/
        if (!skb->stamp.tv_sec)
                net_timestamp(&skb->stamp);

        /*
         * 这里准备将数据包放入接收队列,需要禁止本地中断,在入队操作完成后,再打开中断.
         */
        local_irq_save(flags);
        /*获取当前CPU对应的softnet_data变量*/
        this_cpu = smp_processor_id();
        queue = &__get_cpu_var(softnet_data);

        /*接收计数器累加*/
        __get_cpu_var(netdev_rx_stat).total++;
        
        /*接收队列是否已满*/
        if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
                if (queue->input_pkt_queue.qlen) {
                        if (queue->throttle)                        /*拥塞发生了,丢弃数据包*/
                                goto drop;
                        
                        /*数据包入队操作*/
enqueue:
                        dev_hold(skb->dev);                        /*累加设备引入计数器*/
                        __skb_queue_tail(&queue->input_pkt_queue, skb);                /*将数据包加入接收队列*/
#ifndef OFFLINE_SAMPLE
                        get_sample_stats(this_cpu);
#endif
                        local_irq_restore(flags);
                        return queue->cng_level;
                }

                /*
                 * 驱动程序不断地调用net_rx函数,实现接收数据包的入队操作,当qlen == 0时, 则进入这段代码,这里,如果已经被设置拥塞标志的话,则清除它,因为这里将要调用软中断,开始将数据包交给 上层了,即上层协议的接收函数将执行出队操作,拥塞自然而然也就不存在了。 */
                if (queue->throttle)
                        queue->throttle = 0;

                /*
                 * netif_rx_schedule函数完成两件重要的工作:
                 * 1、将bakclog_dev设备加入“处理数据包的设备”的链表当中;
                 * 2、触发软中断函数,进行数据包接收处理;
                 */
                netif_rx_schedule(&queue->backlog_dev);
                goto enqueue;
        }

        /*前面判断了队列是否已满,如果已满而标志未设置,设置之,并累加拥塞计数器*/
        if (!queue->throttle) {
                queue->throttle = 1;
                __get_cpu_var(netdev_rx_stat).throttled++;
        }

/*拥塞发生,累加丢包计数器,释放数据包*/
drop:
        __get_cpu_var(netdev_rx_stat).dropped++;
        local_irq_restore(flags);

        kfree_skb(skb);
        return NET_RX_DROP;
}

从这段代码的分析中,我们可以看到,当第一个数据包被接收后,因为qlen==0,所以首先会调用netif_rx_schedule触发软中断,然后利用goto跳转至入队。因为软中断被触发后,将执行出队操作,把数据交往上层处理。而当这个时候,又有数据包进入,即网卡中断产生,因为它的优先级高过软中断,这样,出队操作即被中断,网卡中断程序再将被调用,netif_rx函数又再次被执行,如果队列未满,就入队返回。中断完成后,软中断的执行过程被恢复而继续执行出队——如此生产者/消费者循环不止,生生不息……

netif_rx调用netif_rx_schedule进一步处理数据包,我们注意到:
1、前面讨论过,采用轮询技术时,同样地,也是调用netif_rx_schedule,把设备自己传递了过去;
2、这里,采用中断方式,传递的是队列中的一个“伪设备”,并且,这个伪设备的poll函数指针,指向了一个叫做process_backlog的函数;

netif_rx_schedule函数完成两件重要的工作:
1、将bakclog_dev设备加入“处理数据包的设备”的链表当中;
2、触发软中断函数,进行数据包接收处理;

这样,我们可以猜想,在软中断函数中,不论是伪设备bakclog_dev,还是真实的设备(如前面讨论过的e100),都会被软中断函数以:
dev->poll()
的形式调用,对于e100来说,poll函数的接收过程已经分析了,而对于其它所有没有采用轮询技术的网络设备来说,它们将统统调用
process_backlog函数(我觉得把它改名为pseudo-poll是否更合适一些^o^)。

OK,我想分析到这里,关于中断处理与轮询技术的差异,已经基本分析开了……

继续来看,netif_rx_schedule进一步调用__netif_rx_schedule:

/* Try to reschedule poll. Called by irq handler. */

static inline void netif_rx_schedule(struct net_device *dev)
{
        if (netif_rx_schedule_prep(dev))
                __netif_rx_schedule(dev);
}



/* Add interface to tail of rx poll list. This assumes that _prep has
* already been called and returned 1.
*/

static inline void __netif_rx_schedule(struct net_device *dev)
{
        unsigned long flags;

        local_irq_save(flags);
        dev_hold(dev);
        /*伪设备也好,真实的设备也罢,都被加入了队列层的设备列表*/
        list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list);
        if (dev->quota < 0)
                dev->quota += dev->weight;
        else
                dev->quota = dev->weight;
        /*触发软中断*/
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
        local_irq_restore(flags);
}

软中断被触发,注册的net_rx_action函数将被调用:

/*接收的软中断处理函数*/
static void net_rx_action(struct softirq_action *h)
{
        struct softnet_data *queue = &__get_cpu_var(softnet_data);
        unsigned long start_time = jiffies;
        int budget = netdev_max_backlog;

        
        local_irq_disable();
        
        /*
         * 遍历队列的设备链表,如前所述,__netif_rx_schedule已经执行了
         * list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list);
         * 设备bakclog_dev已经被添加进来了
         */
        while (!list_empty(&queue->poll_list)) {
                struct net_device *dev;

                if (budget <= 0 || jiffies - start_time > 1)
                        goto softnet_break;

                local_irq_enable();
                
                /*取得链表中的设备*/
                dev = list_entry(queue->poll_list.next,
                                 struct net_device, poll_list);
                netpoll_poll_lock(dev);

                /*调用设备的poll函数,处理接收数据包,这样,采用轮询技术的网卡,它的真实的poll函数将被调用,
                这就回到我们上一节讨论的e100_poll函数去了,而对于采用传统中断处理的设备,它们调用的,都将是
                bakclog_dev的process_backlog函数*/
                if (dev->quota <= 0 || dev->poll(dev, &budget)) {
                        netpoll_poll_unlock(dev);
                        
                        /*处理完成后,把设备从设备链表中删除,又重置于末尾*/
                        local_irq_disable();
                        list_del(&dev->poll_list);
                        list_add_tail(&dev->poll_list, &queue->poll_list);
                        if (dev->quota < 0)
                                dev->quota += dev->weight;
                        else
                                dev->quota = dev->weight;
                } else {
                        netpoll_poll_unlock(dev);
                        dev_put(dev);
                        local_irq_disable();
                }
        }
out:
        local_irq_enable();
        return;

softnet_break:
        __get_cpu_var(netdev_rx_stat).time_squeeze++;
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
        goto out;
}

对于dev->poll(dev, &budget)的调用,一个真实的poll函数的例子,我们已经分析过了,现在来看process_backlog,

static int process_backlog(struct net_device *backlog_dev, int *budget)
{
        int work = 0;
        int quota = min(backlog_dev->quota, *budget);
        struct softnet_data *queue = &__get_cpu_var(softnet_data);
        unsigned long start_time = jiffies;

        backlog_dev->weight = weight_p;
        
        /*在这个循环中,执行出队操作,把数据从队列中取出来,交给netif_receive_skb,直至队列为空*/
        for (;;) {
                struct sk_buff *skb;
                struct net_device *dev;

                local_irq_disable();
                skb = __skb_dequeue(&queue->input_pkt_queue);
                if (!skb)
                        goto job_done;
                local_irq_enable();

                dev = skb->dev;

                netif_receive_skb(skb);

                dev_put(dev);

                work++;

                if (work >= quota || jiffies - start_time > 1)
                        break;

        }

        backlog_dev->quota -= work;
        *budget -= work;
        return -1;

/*当队列中的数据包被全部处理后,将执行到这里*/
job_done:
        backlog_dev->quota -= work;
        *budget -= work;

        list_del(&backlog_dev->poll_list);
        smp_mb__before_clear_bit();
        netif_poll_enable(backlog_dev);

        if (queue->throttle)
                queue->throttle = 0;
        local_irq_enable();
        return 0;
}

这个函数重要的工作,就是出队,然后调用netif_receive_skb()将数据包交给上层,这与上一节讨论的poll是一样的。这也是为什么,
在网卡驱动的编写中,采用中断技术,要调用netif_rx,而采用轮询技术,要调用netif_receive_skb啦!

到了这里,就处理完数据包与设备相关的部分了,数据包将进入上层协议栈……