DPDK:中断处理流程

2024-08-23 14:04
文章标签 流程 处理 中断 dpdk

本文主要是介绍DPDK:中断处理流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

        本文基于DPDK17.11版本源码分析。主要分析一下DPDK的中断处理流程。

        网卡支持的中断有多种类型,比如收发包,LSC(链路状态变化),mailbox等,但是DPDK使用PMD来收发包,不用处理收发包中断

        将网卡绑定到igb_uio时会注册uio,生成/dev/uiox字符设备。DPDK初始化时会open /dev/uiox设备,对应到kernel端会申请中断号,并注册中断处理函数。DPDK还会创建中断处理线程,并注册用户态的中断处理函数,注册时将open的fd添加到epoll队列中等待中断发生。中断发生时,首先调用kernel中注册的中断处理函数,此函数主要用来唤醒用户态的中断处理线程,中断处理线程再调用用户态的中断处理函数。

        下面通过代码分析了整个过程。

1、创建中断处理线程

        eal初始化时,会创建中断处理线程。

int
rte_eal_init(int argc, char **argv)rte_eal_intr_init/* init the global interrupt source head *///初始化全局链表 intr_sources,后面会使用 rte_intr_callback_register 注册中断源到此链表TAILQ_INIT(&intr_sources);//创建管道intr_pipe,当主线程通过rte_intr_callback_register注册了中断处理函数时,需要通过pipe通知//中断处理线程将fd添加到epoll中等待事件发生pipe(intr_pipe.pipefd)//创建中断处理线程rte_ctrl_thread_create(&intr_thread, "eal-intr-thread", NULL, eal_intr_thread_main, NULL);/* Set thread_name for aid in debugging. *///设置线程名字为eal-intr-threadsnprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "eal-intr-thread");ret_1 = rte_thread_setname(intr_thread, thread_name);

2、中断处理线程逻辑

static __attribute__((noreturn)) void *
eal_intr_thread_main(__rte_unused void *arg)
{//最外层的for循环永远不会退出for(::) {static struct epoll_event pipe_event = {.events = EPOLLIN | EPOLLPRI,};struct rte_intr_source *src;unsigned numfds = 0;//创建epoll,用来获取fd事件int pfd = epoll_create(1);//获取管道intr_pipe读描述符,默认监听管道的读事件pipe_event.data.fd = intr_pipe.readfd;//将fd添加到epoll,用来获取读事件epoll_ctl(pfd, EPOLL_CTL_ADD, intr_pipe.readfd, &pipe_event);numfds++;//加锁保护链表 intr_sourcesrte_spinlock_lock(&intr_lock);struct rte_intr_source *src;//遍历intr_sources,将中断fd添加到epoll进行监听TAILQ_FOREACH(src, &intr_sources, next) {//跳过没有callback的中断源if (src->callbacks.tqh_first == NULL)continue; /* skip those with no callbacks *///添加感兴趣的事件ev.events = EPOLLIN | EPOLLPRI | EPOLLRDHUP | EPOLLHUP;//获取src代表的中断源fdev.data.fd = src->intr_handle.fd;//将此中断fd加入epollepoll_ctl(pfd, EPOLL_CTL_ADD, src->intr_handle.fd, &ev);//numfds表示epoll中监听的fd个数numfds++;}rte_spinlock_unlock(&intr_lock);//等待事件发生eal_intr_handle_interrupts(pfd, numfds);//每次循环结束都要释放epoll fd,下次循环重新创建新的/*** when we return, we need to rebuild the* list of fds to monitor.*/close(pfd);}
}

调用epoll_wait等待事件发生,如果有事件了再调用 eal_intr_process_interrupts 处理发生的事件。

static void
eal_intr_handle_interrupts(int pfd, unsigned totalfds)
{struct epoll_event events[totalfds];for(;;) {//#define EAL_INTR_EPOLL_WAIT_FOREVER (-1)//最后一个参数为-1表示堵塞等待事件的到来nfds = epoll_wait(pfd, events, totalfds, EAL_INTR_EPOLL_WAIT_FOREVER);//出现异常错误,返回/* epoll_wait fail */if (nfds < 0) {if (errno == EINTR)continue;RTE_LOG(ERR, EAL,"epoll_wait returns with fail\n");return;}/* epoll_wait timeout, will never happens here *///超时了?这里是不可能发生的,因为最后一个参数是 -1else if (nfds == 0)continue;//处理发生事件的fd/* epoll_wait has at least one fd ready to read */if (eal_intr_process_interrupts(events, nfds) < 0)return;}
}

处理发生事件的fd,调用用户态注册的中断处理函数。

static int
eal_intr_process_interrupts(struct epoll_event *events, int nfds)
{bool call = false;int n, bytes_read;struct rte_intr_source *src;struct rte_intr_callback *cb;union rte_intr_read_buffer buf;struct rte_intr_callback active_cb;//循环处理发生的事件for (n = 0; n < nfds; n++) {/*** if the pipe fd is ready to read, return out to* rebuild the wait list.*///如果是intr_pipe.readfd发生事件说明有新的中断添加进来,//需要返回-1,在最外层重建 epollif (events[n].data.fd == intr_pipe.readfd){int r = read(intr_pipe.readfd, buf.charbuf, sizeof(buf.charbuf));return -1;}//检查events[n].data.fd是哪个中断的事件rte_spinlock_lock(&intr_lock);TAILQ_FOREACH(src, &intr_sources, next)if (src->intr_handle.fd ==events[n].data.fd)break;if (src == NULL){rte_spinlock_unlock(&intr_lock);continue;}/* mark this interrupt source as active and release the lock. */src->active = 1;rte_spinlock_unlock(&intr_lock);//读取fdbytes_read = read(events[n].data.fd, &buf, bytes_read);if (bytes_read < 0) {if (errno == EINTR || errno == EWOULDBLOCK)continue;RTE_LOG(ERR, EAL, "Error reading from file ""descriptor %d: %s\n",events[n].data.fd,strerror(errno));} else if (bytes_read == 0)RTE_LOG(ERR, EAL, "Read nothing from file ""descriptor %d\n", events[n].data.fd);else//正常的话,设置call为truecall = true;/* grab a lock, again to call callbacks and update status. */rte_spinlock_lock(&intr_lock);if (call) {//调用 src 上注册的中断处理函数/* Finally, call all callbacks. */TAILQ_FOREACH(cb, &src->callbacks, next) {/* make a copy and unlock. */active_cb = *cb;rte_spinlock_unlock(&intr_lock);/* call the actual callback */active_cb.cb_fn(active_cb.cb_arg);/*get the lock back. */rte_spinlock_lock(&intr_lock);}}/* we done with that interrupt source, release it. */src->active = 0;rte_spinlock_unlock(&intr_lock);}return 0;
}

3、用户态注册中断处理函数

        调用 rte_intr_callback_register 注册中断处理函数到全局链表 intr_sources。

int
rte_intr_callback_register(const struct rte_intr_handle *intr_handle,rte_intr_callback_fn cb, void *cb_arg)
{struct rte_intr_source *src;struct rte_intr_callback *callback;//先分配内存,保存cb和cb_arg/* allocate a new interrupt callback entity */callback = rte_zmalloc("interrupt callback list", sizeof(*callback), 0);callback->cb_fn = cb;callback->cb_arg = cb_arg;//遍历intr_sources,如果没有找到fd,则需要注册一个,并通知中断处理线程将fd添加到//epoll中,等待事件通知。一个fd可以注册多个回调函数。rte_spinlock_lock(&intr_lock);/* check if there is at least one callback registered for the fd */TAILQ_FOREACH(src, &intr_sources, next) {if (src->intr_handle.fd == intr_handle->fd) {/* we had no interrupts for this */if TAILQ_EMPTY(&src->callbacks)wake_thread = 1;TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);ret = 0;break;}}//如果是首次给fd添加回调函数,还需要通过管道通知中断处理线程将此fd添加到//epoll中,等待中断事件发生/* no existing callbacks for this - add new source */if (src == NULL) {rte_zmalloc("interrupt source list", sizeof(*src), 0);//将callback 插入 src->callbackssrc->intr_handle = *intr_handle;TAILQ_INIT(&src->callbacks);TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);//将 src 插入全局链表 intr_sourcesTAILQ_INSERT_TAIL(&intr_sources, src, next);//设置为1wake_thread = 1;ret = 0;}rte_spinlock_unlock(&intr_lock);/*** check if need to notify the pipe fd waited by epoll_wait to* rebuild the wait list.*///如果是第一次添加此中断,需要中断处理线程将此//中断处理fd添加到epoll,等待中断事件到来。if (wake_thread)if (write(intr_pipe.writefd, "1", 1) < 0)return -EPIPE;return ret;
}

4、注册中断处理函数,并使能中断

        open /dev/uiox 注册kernel中的中断处理函数

rte_pci_probe -> rte_pci_probe_one_driver -> rte_pci_map_device -> pci_uio_map_resource-> pci_uio_alloc_resource//获取网卡的 uio_numuio_num = pci_get_uio_dev(dev, dirname, sizeof(dirname), 1);/* depending on kernel version, uio can be located in uio/uioX or uio:uioX *///到目录/sys/bus/pci/devices/'pci address'/,找到uio目录,//获取uio number(网卡绑定到igb_uio驱动后,会创建此目录)snprintf(dirname, sizeof(dirname),"%s/" PCI_PRI_FMT "/uio", rte_pci_get_sysfs_path(),loc->domain, loc->bus, loc->devid, loc->function);//打开 /dev/uiox 设备,获取fd,并保存到dev->intr_handle.fdsnprintf(devname, sizeof(devname), "/dev/uio%u", uio_num);/* save fd if in primary process *///打开 /dev/uiox,调用到kernel中的函数 uio_opendev->intr_handle.fd = open(devname, O_RDWR);//uio_open调用到最后会申请中断,并注册中断处理函数 igbuio_pci_irqhandler
uio_open -> igbuio_pci_open -> igbuio_pci_enable_interrupts//更新pci配置空间中msix capability字段,并申请中断号pci_enable_msix(udev->pdev, &msix_entry, 1)dev_dbg(&udev->pdev->dev, "using MSI-X");udev->info.irq_flags = IRQF_NO_THREAD;udev->info.irq = msix_entry.vector;udev->mode = RTE_INTR_MODE_MSIX;//注册中断处理函数request_irq(udev->info.irq, igbuio_pci_irqhandler,udev->info.irq_flags, udev->info.name, udev);pci配置空间可通过 lspci -s 0000:81:00.0 -vv 查看,
对于 MSI-X,默认为 Enable-Capabilities: [70] MSI-X: Enable- Count=129 Masked-Vector table: BAR=3 offset=00000000PBA: BAR=3 offset=00001000
经过igbuio_pci_enable_interrupts后,可看到变成了 Enable+,表示使能了pci层的中断Capabilities: [70] MSI-X: Enable+ Count=129 Masked-Vector table: BAR=3 offset=00000000PBA: BAR=3 offset=00001000

        注册用户态的中断处理函数,并使能中断。

rte_pci_probe -> rte_pci_probe_one_driver -> dr->probe(dr, dev) -> eth_ixgbe_dev_init//注册中断处理函数 ixgbe_dev_interrupt_handlerrte_intr_callback_register(intr_handle, ixgbe_dev_interrupt_handler, eth_dev);/* enable uio/vfio intr/eventfd mapping */rte_intr_enable(intr_handle);uio_intr_enable(intr_handle)const int value = 1;//write会调用到kernel中的 uio_writewrite(intr_handle->fd, &value, sizeof(value))//调用uio字符设备驱动中的 uio_write//调用igb_uio驱动中的 igbuio_pci_irqcontrolidev->info->irqcontrol(idev->info, irq_on)//清除中断 maskigbuio_mask_irq(pdev, udev->mode, irq_state);/* enable support intr */ixgbe_enable_intr(eth_dev);struct ixgbe_interrupt *intr =IXGBE_DEV_PRIVATE_TO_INTR(dev->data->dev_private);struct ixgbe_hw *hw =IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);//使能网卡层中断,intr->mask 指定了中断类型IXGBE_WRITE_REG(hw, IXGBE_EIMS, intr->mask);IXGBE_WRITE_FLUSH(hw);

5、发生中断时的处理

        前面调用rte_intr_callback_register时,已经将 /dev/uiox 的 fd 添加到了 epoll 监听队列,在kernel中会调用uio_poll 等待事件到来。

static unsigned int uio_poll(struct file *filep, poll_table *wait)
{struct uio_listener *listener = filep->private_data;struct uio_device *idev = listener->dev;if (!idev->info->irq)return -EIO;//将当前进程(调用epoll的进程,即中断处理线程)作为等待队列的一个元素添加到fd的等待队列idev->wait中,//当前进程进入睡眠状态,等待事件发生后被唤醒poll_wait(filep, &idev->wait, wait);//没中断时这两个值是相等的,中断发生时会将idev->event加1,//这两个值不相等时说明有中断事件,返回POLLINif (listener->event_count != atomic_read(&idev->event))return POLLIN | POLLRDNORM;return 0;
}

        在kernel端,注册了真正的中断处理函数 igbuio_pci_irqhandler,当中断发生时,调用uio_event_notify唤醒调用epoll的的进程,即中断处理线程。

static irqreturn_t
igbuio_pci_irqhandler(int irq, void *dev_id)
{struct rte_uio_pci_dev *udev = (struct rte_uio_pci_dev *)dev_id;struct uio_info *info = &udev->info;uio_event_notify(info);struct uio_device *idev = info->uio_dev;//event加1atomic_inc(&idev->event);//唤醒等待队列上的进程,比如中断处理线程wake_up_interruptible(&idev->wait);kill_fasync(&idev->async_queue, SIGIO, POLL_IN);/* Message signal mode, no share IRQ and automasked */return IRQ_HANDLED;
}

        被唤醒的进程(即中断处理线程)再次调用 uio_poll 获取发生的事件,此时会返回 POLLIN | POLLRDNORM,然后再调用系统调用read读取数据,此时肯定是有事件的,所以不会再次堵塞等待。

static ssize_t uio_read(struct file *filep, char __user *buf, size_t count, loff_t *ppos)
{struct uio_listener *listener = filep->private_data;struct uio_device *idev = listener->dev;DECLARE_WAITQUEUE(wait, current);ssize_t retval;s32 event_count;if (!idev->info->irq)return -EIO;if (count != sizeof(s32))return -EINVAL;add_wait_queue(&idev->wait, &wait);do {set_current_state(TASK_INTERRUPTIBLE);//再次判断 idev->event 和 listener->event_count,//不相等说明有中断事件发生,将count拷贝到用户态read提供的buf中,//将 idev->event 的值赋给 listener->event_count 等待下次中断到来event_count = atomic_read(&idev->event);if (event_count != listener->event_count) {if (copy_to_user(buf, &event_count, count))retval = -EFAULT;else {listener->event_count = event_count;retval = count;}break;}if (filep->f_flags & O_NONBLOCK) {retval = -EAGAIN;break;}if (signal_pending(current)) {retval = -ERESTARTSYS;break;}schedule();} while (1);__set_current_state(TASK_RUNNING);remove_wait_queue(&idev->wait, &wait);return retval;
}

6、用户态中断处理函数

        中断处理线程正确读取到事件后,会调用用户态注册的中断处理函数,比如 ixgbe_dev_interrupt_handler。

static void
ixgbe_dev_interrupt_handler(void *param)
{struct rte_eth_dev *dev = (struct rte_eth_dev *)param;//从EICR寄存器可以获知中断类型:比如mailbox,lsc(link status change), //接收/发送数据包(但是在dpdk中,收/发数据包不使用中断)ixgbe_dev_interrupt_get_status(dev);//根据中断类型进行处理ixgbe_dev_interrupt_action(dev, dev->intr_handle);
}/** It reads ICR and sets flag (IXGBE_EICR_LSC) for the link_update.** @param dev*  Pointer to struct rte_eth_dev.** @return*  - On success, zero.*  - On failure, a negative value.*/
static int
ixgbe_dev_interrupt_get_status(struct rte_eth_dev *dev)
{uint32_t eicr;//hw指向BAR0的虚拟地址,可以通过偏移访问寄存器struct ixgbe_hw *hw = IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);struct ixgbe_interrupt *intr =IXGBE_DEV_PRIVATE_TO_INTR(dev->data->dev_private);/* clear all cause mask */ixgbe_disable_intr(hw);//读取寄存器 IXGBE_EICR,记录了发送中断的类型/* read-on-clear nic registers here */eicr = IXGBE_READ_REG(hw, IXGBE_EICR);PMD_DRV_LOG(DEBUG, "eicr %x", eicr);intr->flags = 0;//是否发生了 LSX 事件?/* set flag for async link update */if (eicr & IXGBE_EICR_LSC)intr->flags |= IXGBE_FLAG_NEED_LINK_UPDATE;//是否发生了 mailbox 事件,mailbox 是 PF 和 VF 通信机制if (eicr & IXGBE_EICR_MAILBOX)intr->flags |= IXGBE_FLAG_MAILBOX;...return 0;
}/** It executes link_update after knowing an interrupt occurred.** @param dev*  Pointer to struct rte_eth_dev.** @return*  - On success, zero.*  - On failure, a negative value.*/
static int
ixgbe_dev_interrupt_action(struct rte_eth_dev *dev,struct rte_intr_handle *intr_handle)
{struct ixgbe_interrupt *intr =IXGBE_DEV_PRIVATE_TO_INTR(dev->data->dev_private);int64_t timeout;struct rte_eth_link link;struct ixgbe_hw *hw =IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);PMD_DRV_LOG(DEBUG, "intr action type %d", intr->flags);//处理 mailbox 事件if (intr->flags & IXGBE_FLAG_MAILBOX) {ixgbe_pf_mbx_process(dev);intr->flags &= ~IXGBE_FLAG_MAILBOX;}//处理 link 变化事件if (intr->flags & IXGBE_FLAG_NEED_LINK_UPDATE) {/* get the link status before link update, for predicting later */memset(&link, 0, sizeof(link));rte_ixgbe_dev_atomic_read_link_status(dev, &link);ixgbe_dev_link_update(dev, 0);/* likely to up */if (!link.link_status)/* handle it 1 sec later, wait it being stable */timeout = IXGBE_LINK_UP_CHECK_TIMEOUT;/* likely to down */else/* handle it 4 sec later, wait it being stable */timeout = IXGBE_LINK_DOWN_CHECK_TIMEOUT;ixgbe_dev_link_status_print(dev);if (rte_eal_alarm_set(timeout * 1000,ixgbe_dev_interrupt_delayed_handler, (void *)dev) < 0)PMD_DRV_LOG(ERR, "Error setting alarm");else {/* remember original mask */intr->mask_original = intr->mask;/* only disable lsc interrupt */intr->mask &= ~IXGBE_EIMS_LSC;}}PMD_DRV_LOG(DEBUG, "enable intr immediately");ixgbe_enable_intr(dev);rte_intr_enable(intr_handle);return 0;
}

这篇关于DPDK:中断处理流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1099585

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

第10章 中断和动态时钟显示

第10章 中断和动态时钟显示 从本章开始,按照书籍的划分,第10章开始就进入保护模式(Protected Mode)部分了,感觉从这里开始难度突然就增加了。 书中介绍了为什么有中断(Interrupt)的设计,中断的几种方式:外部硬件中断、内部中断和软中断。通过中断做了一个会走的时钟和屏幕上输入字符的程序。 我自己理解中断的一些作用: 为了更好的利用处理器的性能。协同快速和慢速设备一起工作

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

Thymeleaf:生成静态文件及异常处理java.lang.NoClassDefFoundError: ognl/PropertyAccessor

我们需要引入包: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>sp

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

jenkins 插件执行shell命令时,提示“Command not found”处理方法

首先提示找不到“Command not found,可能我们第一反应是查看目标机器是否已支持该命令,不过如果相信能找到这里来的朋友估计遇到的跟我一样,其实目标机器是没有问题的通过一些远程工具执行shell命令是可以执行。奇怪的就是通过jenkinsSSH插件无法执行,经一番折腾各种搜索发现是jenkins没有加载/etc/profile导致。 【解决办法】: 需要在jenkins调用shell脚

火语言RPA流程组件介绍--浏览网页

🚩【组件功能】:浏览器打开指定网址或本地html文件 配置预览 配置说明 网址URL 支持T或# 默认FLOW输入项 输入需要打开的网址URL 超时时间 支持T或# 打开网页超时时间 执行后后等待时间(ms) 支持T或# 当前组件执行完成后继续等待的时间 UserAgent 支持T或# User Agent中文名为用户代理,简称 UA,它是一个特殊字符串头,使得服务器

明明的随机数处理问题分析与解决方案

明明的随机数处理问题分析与解决方案 引言问题描述解决方案数据结构设计具体步骤伪代码C语言实现详细解释读取输入去重操作排序操作输出结果复杂度分析 引言 明明生成了N个1到500之间的随机整数,我们需要对这些整数进行处理,删去重复的数字,然后进行排序并输出结果。本文将详细讲解如何通过算法、数据结构以及C语言来解决这个问题。我们将会使用数组和哈希表来实现去重操作,再利用排序算法对结果