DPDK Series Part 41: Source Code Analysis of the Overall Packet RX/TX Flow (2): The Flow Framework


1. Basic Notes

This installment continues the source-code walkthrough with the groundwork that precedes any packet I/O: port initialization and configuration. Once this groundwork is complete the ports are in a working state and the application can start; handling the ports is thus the opening step before the program runs.
The example performs one-to-one forwarding, so it must verify that the ports pair up and then allocate a receive queue and a transmit queue for each port; the actual RX/TX flow follows from there. A sketch of the pairing check comes first, then the code analysis.
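A minimal sketch of that pairing check, modeled on the skeleton example's main() (names follow basicfwd.c; rte_eth_dev_count_avail() reports the number of usable ports):

/* Forwarding pairs 0 <-> 1, 2 <-> 3, ..., so an odd number of ports
 * cannot be fully paired. */
unsigned int nb_ports = rte_eth_dev_count_avail();
if (nb_ports < 2 || (nb_ports & 1))
    rte_exit(EXIT_FAILURE, "Error: number of ports must be even\n");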

2. Source Code Analysis

The data structures and library functions that port_init calls are listed first; the port_init function itself follows them:

/**
 * A structure used to configure an Ethernet port.
 * Depending upon the RX multi-queue mode, extra advanced
 * configuration settings may be needed.
 */
struct rte_eth_conf {
    uint32_t link_speeds; /**< bitmap of ETH_LINK_SPEED_XXX of speeds to be
                            used. ETH_LINK_SPEED_FIXED disables link
                            autonegotiation, and a unique speed shall be
                            set. Otherwise, the bitmap defines the set of
                            speeds to be advertised. If the special value
                            ETH_LINK_SPEED_AUTONEG (0) is used, all speeds
                            supported are advertised. */
    struct rte_eth_rxmode rxmode; /**< Port RX configuration. */
    struct rte_eth_txmode txmode; /**< Port TX configuration. */
    uint32_t lpbk_mode; /**< Loopback operation mode. By default the value
                          is 0, meaning the loopback mode is disabled.
                          Read the datasheet of given ethernet controller
                          for details. The possible values of this field
                          are defined in implementation of each driver. */
    struct {
        struct rte_eth_rss_conf rss_conf; /**< Port RSS configuration */
        struct rte_eth_vmdq_dcb_conf vmdq_dcb_conf;
        /**< Port vmdq+dcb configuration. */
        struct rte_eth_dcb_rx_conf dcb_rx_conf;
        /**< Port dcb RX configuration. */
        struct rte_eth_vmdq_rx_conf vmdq_rx_conf;
        /**< Port vmdq RX configuration. */
    } rx_adv_conf; /**< Port RX filtering configuration. */
    union {
        struct rte_eth_vmdq_dcb_tx_conf vmdq_dcb_tx_conf;
        /**< Port vmdq+dcb TX configuration. */
        struct rte_eth_dcb_tx_conf dcb_tx_conf;
        /**< Port dcb TX configuration. */
        struct rte_eth_vmdq_tx_conf vmdq_tx_conf;
        /**< Port vmdq TX configuration. */
    } tx_adv_conf; /**< Port TX DCB configuration (union). */
    /** Currently, Priority Flow Control(PFC) are supported, if DCB with PFC
        is needed, and the variable must be set ETH_DCB_PFC_SUPPORT. */
    uint32_t dcb_capability_en;
    struct rte_fdir_conf fdir_conf; /**< FDIR configuration. DEPRECATED */
    struct rte_intr_conf intr_conf; /**< Interrupt mode configuration. */
};
int
rte_eth_dev_is_valid_port(uint16_t port_id)
{
    if (port_id >= RTE_MAX_ETHPORTS ||
        (rte_eth_devices[port_id].state == RTE_ETH_DEV_UNUSED))
        return 0;
    else
        return 1;
}
int
rte_eth_dev_info_get(uint16_t port_id, struct rte_eth_dev_info *dev_info)
{
    struct rte_eth_dev *dev;
    const struct rte_eth_desc_lim lim = {
        .nb_max = UINT16_MAX,
        .nb_min = 0,
        .nb_align = 1,
        .nb_seg_max = UINT16_MAX,
        .nb_mtu_seg_max = UINT16_MAX,
    };
    int diag;

    /*
     * Init dev_info before port_id check since caller does not have
     * return status and does not know if get is successful or not.
     */
    memset(dev_info, 0, sizeof(struct rte_eth_dev_info));
    dev_info->switch_info.domain_id = RTE_ETH_DEV_SWITCH_DOMAIN_ID_INVALID;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
    dev = &rte_eth_devices[port_id];

    dev_info->rx_desc_lim = lim;
    dev_info->tx_desc_lim = lim;
    dev_info->device = dev->device;
    dev_info->min_mtu = RTE_ETHER_MIN_MTU;
    dev_info->max_mtu = UINT16_MAX;

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP);
    diag = (*dev->dev_ops->dev_infos_get)(dev, dev_info);
    if (diag != 0) {
        /* Cleanup already filled in device information */
        memset(dev_info, 0, sizeof(struct rte_eth_dev_info));
        return eth_err(port_id, diag);
    }

    /* Maximum number of queues should be <= RTE_MAX_QUEUES_PER_PORT */
    dev_info->max_rx_queues = RTE_MIN(dev_info->max_rx_queues,
            RTE_MAX_QUEUES_PER_PORT);
    dev_info->max_tx_queues = RTE_MIN(dev_info->max_tx_queues,
            RTE_MAX_QUEUES_PER_PORT);

    dev_info->driver_name = dev->device->driver->name;
    dev_info->nb_rx_queues = dev->data->nb_rx_queues;
    dev_info->nb_tx_queues = dev->data->nb_tx_queues;

    dev_info->dev_flags = &dev->data->dev_flags;

    return 0;
}
//=======================================================================================================================
/* The data structures and functions invoked by port_init are above */
//=======================================================================================================================
/* basicfwd.c: Basic DPDK skeleton forwarding example. */

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
    /* Start from the default configuration (a default template structure) */
    struct rte_eth_conf port_conf = port_conf_default;
    const uint16_t rx_rings = 1, tx_rings = 1; /* number of RX/TX ring queues */
    uint16_t nb_rxd = RX_RING_SIZE;
    uint16_t nb_txd = TX_RING_SIZE;
    int retval;
    uint16_t q;
    struct rte_eth_dev_info dev_info;
    struct rte_eth_txconf txconf;

    /* Is the port valid and usable? */
    if (!rte_eth_dev_is_valid_port(port))
        return -1;

    /* Get the device info for this port */
    retval = rte_eth_dev_info_get(port, &dev_info);
    if (retval != 0) {
        printf("Error during getting device (port %u) info: %s\n",
                port, strerror(-retval));
        return retval;
    }

    if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
        port_conf.txmode.offloads |=
            DEV_TX_OFFLOAD_MBUF_FAST_FREE;

    /* Configure the Ethernet device. */
    retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
    if (retval != 0)
        return retval;

    /* Sanity-check (and adjust) the RX/TX descriptor counts */
    retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
    if (retval != 0)
        return retval;

    /* Allocate and set up 1 RX queue per Ethernet port. */
    /* rte_eth_dev_socket_id: on a NUMA system this returns the socket the
     * port's device is attached to, so queue memory is allocated locally */
    for (q = 0; q < rx_rings; q++) {
        retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
                rte_eth_dev_socket_id(port), NULL, mbuf_pool);
                /* NULL = use the default RX queue configuration */
        if (retval < 0)
            return retval;
    }

    txconf = dev_info.default_txconf;
    txconf.offloads = port_conf.txmode.offloads;
    /* Allocate and set up 1 TX queue per Ethernet port. */
    for (q = 0; q < tx_rings; q++) {
        retval = rte_eth_tx_queue_setup(port, q, nb_txd,
                rte_eth_dev_socket_id(port), &txconf);
        if (retval < 0)
            return retval;
    }

    /* Start the Ethernet port. */
    retval = rte_eth_dev_start(port);
    if (retval < 0)
        return retval;

    /* Display the port MAC address. */
    struct rte_ether_addr addr;
    retval = rte_eth_macaddr_get(port, &addr);
    if (retval != 0)
        return retval;

    printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
            " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
            port,
            addr.addr_bytes[0], addr.addr_bytes[1],
            addr.addr_bytes[2], addr.addr_bytes[3],
            addr.addr_bytes[4], addr.addr_bytes[5]);

    /* Enable RX in promiscuous mode for the Ethernet device,
     * i.e. accept every packet the NIC sees. */
    retval = rte_eth_promiscuous_enable(port);
    if (retval != 0)
        return retval;

    return 0;
}
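For context, a sketch of how the example's main() drives port_init for every detected port (mbuf_pool is assumed to have been created beforehand with rte_pktmbuf_pool_create):

uint16_t portid;
RTE_ETH_FOREACH_DEV(portid)
    if (port_init(portid, mbuf_pool) != 0)
        rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16"\n", portid);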

Inside port_init, the key function is rte_eth_dev_configure:

int
rte_eth_dev_configure(uint16_t port_id, uint16_t nb_rx_q, uint16_t nb_tx_q,
        const struct rte_eth_conf *dev_conf)
{
    struct rte_eth_dev *dev;
    struct rte_eth_dev_info dev_info;
    struct rte_eth_conf orig_conf;
    uint16_t overhead_len;
    int diag;
    int ret;
    uint16_t old_mtu;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
    dev = &rte_eth_devices[port_id];

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_configure, -ENOTSUP);

    if (dev->data->dev_started) {
        RTE_ETHDEV_LOG(ERR,
            "Port %u must be stopped to allow configuration\n",
            port_id);
        return -EBUSY;
    }

    /* Store original config, as rollback required on failure */
    memcpy(&orig_conf, &dev->data->dev_conf, sizeof(dev->data->dev_conf));

    /*
     * Copy the dev_conf parameter into the dev structure.
     * rte_eth_dev_info_get() requires dev_conf, copy it before dev_info get
     */
    if (dev_conf != &dev->data->dev_conf)
        memcpy(&dev->data->dev_conf, dev_conf,
               sizeof(dev->data->dev_conf));

    /* Backup mtu for rollback */
    old_mtu = dev->data->mtu;

    ret = rte_eth_dev_info_get(port_id, &dev_info);
    if (ret != 0)
        goto rollback;

    /* Get the real Ethernet overhead length */
    if (dev_info.max_mtu != UINT16_MAX &&
        dev_info.max_rx_pktlen > dev_info.max_mtu)
        overhead_len = dev_info.max_rx_pktlen - dev_info.max_mtu;
    else
        overhead_len = RTE_ETHER_HDR_LEN + RTE_ETHER_CRC_LEN;

    /* If number of queues specified by application for both Rx and Tx is
     * zero, use driver preferred values. This cannot be done individually
     * as it is valid for either Tx or Rx (but not both) to be zero.
     * If driver does not provide any preferred valued, fall back on
     * EAL defaults.
     */
    if (nb_rx_q == 0 && nb_tx_q == 0) {
        nb_rx_q = dev_info.default_rxportconf.nb_queues;
        if (nb_rx_q == 0)
            nb_rx_q = RTE_ETH_DEV_FALLBACK_RX_NBQUEUES;
        nb_tx_q = dev_info.default_txportconf.nb_queues;
        if (nb_tx_q == 0)
            nb_tx_q = RTE_ETH_DEV_FALLBACK_TX_NBQUEUES;
    }

    if (nb_rx_q > RTE_MAX_QUEUES_PER_PORT) {
        RTE_ETHDEV_LOG(ERR,
            "Number of RX queues requested (%u) is greater than max supported(%d)\n",
            nb_rx_q, RTE_MAX_QUEUES_PER_PORT);
        ret = -EINVAL;
        goto rollback;
    }

    if (nb_tx_q > RTE_MAX_QUEUES_PER_PORT) {
        RTE_ETHDEV_LOG(ERR,
            "Number of TX queues requested (%u) is greater than max supported(%d)\n",
            nb_tx_q, RTE_MAX_QUEUES_PER_PORT);
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * Check that the numbers of RX and TX queues are not greater
     * than the maximum number of RX and TX queues supported by the
     * configured device.
     */
    if (nb_rx_q > dev_info.max_rx_queues) {
        RTE_ETHDEV_LOG(ERR, "Ethdev port_id=%u nb_rx_queues=%u > %u\n",
            port_id, nb_rx_q, dev_info.max_rx_queues);
        ret = -EINVAL;
        goto rollback;
    }

    if (nb_tx_q > dev_info.max_tx_queues) {
        RTE_ETHDEV_LOG(ERR, "Ethdev port_id=%u nb_tx_queues=%u > %u\n",
            port_id, nb_tx_q, dev_info.max_tx_queues);
        ret = -EINVAL;
        goto rollback;
    }

    /* Check that the device supports requested interrupts */
    if ((dev_conf->intr_conf.lsc == 1) &&
        (!(dev->data->dev_flags & RTE_ETH_DEV_INTR_LSC))) {
        RTE_ETHDEV_LOG(ERR, "Driver %s does not support lsc\n",
            dev->device->driver->name);
        ret = -EINVAL;
        goto rollback;
    }
    if ((dev_conf->intr_conf.rmv == 1) &&
        (!(dev->data->dev_flags & RTE_ETH_DEV_INTR_RMV))) {
        RTE_ETHDEV_LOG(ERR, "Driver %s does not support rmv\n",
            dev->device->driver->name);
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * If jumbo frames are enabled, check that the maximum RX packet
     * length is supported by the configured device.
     */
    if (dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_JUMBO_FRAME) {
        if (dev_conf->rxmode.max_rx_pkt_len > dev_info.max_rx_pktlen) {
            RTE_ETHDEV_LOG(ERR,
                "Ethdev port_id=%u max_rx_pkt_len %u > max valid value %u\n",
                port_id, dev_conf->rxmode.max_rx_pkt_len,
                dev_info.max_rx_pktlen);
            ret = -EINVAL;
            goto rollback;
        } else if (dev_conf->rxmode.max_rx_pkt_len < RTE_ETHER_MIN_LEN) {
            RTE_ETHDEV_LOG(ERR,
                "Ethdev port_id=%u max_rx_pkt_len %u < min valid value %u\n",
                port_id, dev_conf->rxmode.max_rx_pkt_len,
                (unsigned int)RTE_ETHER_MIN_LEN);
            ret = -EINVAL;
            goto rollback;
        }

        /* Scale the MTU size to adapt max_rx_pkt_len */
        dev->data->mtu = dev->data->dev_conf.rxmode.max_rx_pkt_len -
                overhead_len;
    } else {
        uint16_t pktlen = dev_conf->rxmode.max_rx_pkt_len;
        if (pktlen < RTE_ETHER_MIN_MTU + overhead_len ||
            pktlen > RTE_ETHER_MTU + overhead_len)
            /* Use default value */
            dev->data->dev_conf.rxmode.max_rx_pkt_len =
                RTE_ETHER_MTU + overhead_len;
    }

    /*
     * If LRO is enabled, check that the maximum aggregated packet
     * size is supported by the configured device.
     */
    if (dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_TCP_LRO) {
        if (dev_conf->rxmode.max_lro_pkt_size == 0)
            dev->data->dev_conf.rxmode.max_lro_pkt_size =
                dev->data->dev_conf.rxmode.max_rx_pkt_len;
        ret = check_lro_pkt_size(port_id,
                dev->data->dev_conf.rxmode.max_lro_pkt_size,
                dev->data->dev_conf.rxmode.max_rx_pkt_len,
                dev_info.max_lro_pkt_size);
        if (ret != 0)
            goto rollback;
    }

    /* Any requested offloading must be within its device capabilities */
    if ((dev_conf->rxmode.offloads & dev_info.rx_offload_capa) !=
        dev_conf->rxmode.offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u requested Rx offloads 0x%"PRIx64
            " doesn't match Rx offloads "
            "capabilities 0x%"PRIx64" in %s()\n",
            port_id, dev_conf->rxmode.offloads,
            dev_info.rx_offload_capa,
            __func__);
        ret = -EINVAL;
        goto rollback;
    }
    if ((dev_conf->txmode.offloads & dev_info.tx_offload_capa) !=
        dev_conf->txmode.offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u requested Tx offloads 0x%"PRIx64
            " doesn't match Tx offloads "
            "capabilities 0x%"PRIx64" in %s()\n",
            port_id, dev_conf->txmode.offloads,
            dev_info.tx_offload_capa,
            __func__);
        ret = -EINVAL;
        goto rollback;
    }

    dev->data->dev_conf.rx_adv_conf.rss_conf.rss_hf =
        rte_eth_rss_hf_refine(dev_conf->rx_adv_conf.rss_conf.rss_hf);

    /* Check that device supports requested rss hash functions. */
    if ((dev_info.flow_type_rss_offloads |
         dev_conf->rx_adv_conf.rss_conf.rss_hf) !=
        dev_info.flow_type_rss_offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u invalid rss_hf: 0x%"PRIx64", valid value: 0x%"PRIx64"\n",
            port_id, dev_conf->rx_adv_conf.rss_conf.rss_hf,
            dev_info.flow_type_rss_offloads);
        ret = -EINVAL;
        goto rollback;
    }

    /* Check if Rx RSS distribution is disabled but RSS hash is enabled. */
    if (((dev_conf->rxmode.mq_mode & ETH_MQ_RX_RSS_FLAG) == 0) &&
        (dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_RSS_HASH)) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u config invalid Rx mq_mode without RSS but %s offload is requested\n",
            port_id,
            rte_eth_dev_rx_offload_name(DEV_RX_OFFLOAD_RSS_HASH));
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * Setup new number of RX/TX queues and reconfigure device.
     */
    diag = rte_eth_dev_rx_queue_config(dev, nb_rx_q);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR,
            "Port%u rte_eth_dev_rx_queue_config = %d\n",
            port_id, diag);
        ret = diag;
        goto rollback;
    }

    diag = rte_eth_dev_tx_queue_config(dev, nb_tx_q);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR,
            "Port%u rte_eth_dev_tx_queue_config = %d\n",
            port_id, diag);
        rte_eth_dev_rx_queue_config(dev, 0);
        ret = diag;
        goto rollback;
    }

    diag = (*dev->dev_ops->dev_configure)(dev);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR, "Port%u dev_configure = %d\n",
            port_id, diag);
        ret = eth_err(port_id, diag);
        goto reset_queues;
    }

    /* Initialize Rx profiling if enabled at compilation time. */
    diag = __rte_eth_dev_profile_init(port_id, dev);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR, "Port%u __rte_eth_dev_profile_init = %d\n",
            port_id, diag);
        ret = eth_err(port_id, diag);
        goto reset_queues;
    }

    /* Validate Rx offloads. */
    diag = validate_offloads(port_id,
            dev_conf->rxmode.offloads,
            dev->data->dev_conf.rxmode.offloads, "Rx",
            rte_eth_dev_rx_offload_name);
    if (diag != 0) {
        ret = diag;
        goto reset_queues;
    }

    /* Validate Tx offloads. */
    diag = validate_offloads(port_id,
            dev_conf->txmode.offloads,
            dev->data->dev_conf.txmode.offloads, "Tx",
            rte_eth_dev_tx_offload_name);
    if (diag != 0) {
        ret = diag;
        goto reset_queues;
    }

    return 0;
reset_queues:
    rte_eth_dev_rx_queue_config(dev, 0);
    rte_eth_dev_tx_queue_config(dev, 0);
rollback:
    memcpy(&dev->data->dev_conf, &orig_conf, sizeof(dev->data->dev_conf));
    if (old_mtu != dev->data->mtu)
        dev->data->mtu = old_mtu;

    return ret;
}
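Seen from the caller's side, the contract is simple. A minimal, hypothetical invocation (one RX queue, one TX queue, an all-zero default rte_eth_conf): failures come back as a negative errno, and by that point the function has already rolled the port back to its previous configuration:

struct rte_eth_conf conf;
memset(&conf, 0, sizeof(conf));   /* all defaults */
int ret = rte_eth_dev_configure(port_id, 1, 1, &conf);
if (ret < 0)
    printf("configure failed: %s\n", rte_strerror(-ret));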

Finally, the device is started:

int
rte_eth_dev_start(uint16_t port_id)
{
    struct rte_eth_dev *dev;
    struct rte_eth_dev_info dev_info;
    int diag;
    int ret;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);

    dev = &rte_eth_devices[port_id];

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_start, -ENOTSUP);

    if (dev->data->dev_started != 0) {
        RTE_ETHDEV_LOG(INFO,
            "Device with port_id=%"PRIu16" already started\n",
            port_id);
        return 0;
    }

    ret = rte_eth_dev_info_get(port_id, &dev_info);
    if (ret != 0)
        return ret;

    /* Lets restore MAC now if device does not support live change */
    if (*dev_info.dev_flags & RTE_ETH_DEV_NOLIVE_MAC_ADDR)
        rte_eth_dev_mac_restore(dev, &dev_info);

    diag = (*dev->dev_ops->dev_start)(dev);
    if (diag == 0)
        dev->data->dev_started = 1;
    else
        return eth_err(port_id, diag);

    ret = rte_eth_dev_config_restore(dev, &dev_info, port_id);
    if (ret != 0) {
        RTE_ETHDEV_LOG(ERR,
            "Error during restoring configuration for device (port %u): %s\n",
            port_id, rte_strerror(-ret));
        rte_eth_dev_stop(port_id);
        return ret;
    }

    if (dev->data->dev_conf.intr_conf.lsc == 0) {
        RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->link_update, -ENOTSUP);
        (*dev->dev_ops->link_update)(dev, 0);
    }
    return 0;
}
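For completeness, the teardown mirror image is a sketch like the following (note that in the DPDK release this code appears to come from, rte_eth_dev_stop() returns void; newer releases return an int):

rte_eth_dev_stop(port_id);   /* stop RX/TX; counterpart of rte_eth_dev_start() */
rte_eth_dev_close(port_id);  /* release the port's resources */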

With everything in place, packet reception and transmission can begin:

/* Run until the application is quit or killed. */
for (;;) {
    /*
     * Receive packets on a port and forward them on the paired
     * port. The mapping is 0 -> 1, 1 -> 0, 2 -> 3, 3 -> 2, etc.
     */
    RTE_ETH_FOREACH_DEV(port) {
        /* Get burst of RX packets, from first port of pair. */
        struct rte_mbuf *bufs[BURST_SIZE];
        const uint16_t nb_rx = rte_eth_rx_burst(port, 0,
                bufs, BURST_SIZE);

        if (unlikely(nb_rx == 0))
            continue;

        /* Send burst of TX packets, to second port of pair. */
        const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0,
                bufs, nb_rx);

        /* Free any unsent packets. */
        if (unlikely(nb_tx < nb_rx)) {
            uint16_t buf;
            for (buf = nb_tx; buf < nb_rx; buf++)
                rte_pktmbuf_free(bufs[buf]);
        }
    }
}
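One detail worth calling out is the port ^ 1 expression: XOR with 1 flips the lowest bit of the port id, which pairs each even port with the next odd one and vice versa. As an illustration only (paired_port is a hypothetical helper; the loop above simply inlines the expression):

static inline uint16_t paired_port(uint16_t port)
{
    return port ^ 1;   /* 0 <-> 1, 2 <-> 3, 4 <-> 5, ... */
}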

The heart of the loop is of course the two burst functions. First, the receive side, rte_eth_rx_burst:

/**
 * Retrieve a burst of input packets from a receive queue of an Ethernet
 * device. The retrieved packets are stored in *rte_mbuf* structures whose
 * pointers are supplied in the *rx_pkts* array.
 *
 * The rte_eth_rx_burst() function loops, parsing the RX ring of the
 * receive queue, up to *nb_pkts* packets, and for each completed RX
 * descriptor in the ring, it performs the following operations:
 *
 * - Initialize the *rte_mbuf* data structure associated with the
 *   RX descriptor according to the information provided by the NIC into
 *   that RX descriptor.
 *
 * - Store the *rte_mbuf* data structure into the next entry of the
 *   *rx_pkts* array.
 *
 * - Replenish the RX descriptor with a new *rte_mbuf* buffer
 *   allocated from the memory pool associated with the receive queue at
 *   initialization time.
 *
 * When retrieving an input packet that was scattered by the controller
 * into multiple receive descriptors, the rte_eth_rx_burst() function
 * appends the associated *rte_mbuf* buffers to the first buffer of the
 * packet.
 *
 * The rte_eth_rx_burst() function returns the number of packets
 * actually retrieved, which is the number of *rte_mbuf* data structures
 * effectively supplied into the *rx_pkts* array.
 * A return value equal to *nb_pkts* indicates that the RX queue contained
 * at least *rx_pkts* packets, and this is likely to signify that other
 * received packets remain in the input queue. Applications implementing
 * a "retrieve as much received packets as possible" policy can check this
 * specific case and keep invoking the rte_eth_rx_burst() function until
 * a value less than *nb_pkts* is returned.
 *
 * This receive method has the following advantages:
 *
 * - It allows a run-to-completion network stack engine to retrieve and
 *   to immediately process received packets in a fast burst-oriented
 *   approach, avoiding the overhead of unnecessary intermediate packet
 *   queue/dequeue operations.
 *
 * - Conversely, it also allows an asynchronous-oriented processing
 *   method to retrieve bursts of received packets and to immediately
 *   queue them for further parallel processing by another logical core,
 *   for instance. However, instead of having received packets being
 *   individually queued by the driver, this approach allows the caller
 *   of the rte_eth_rx_burst() function to queue a burst of retrieved
 *   packets at a time and therefore dramatically reduce the cost of
 *   enqueue/dequeue operations per packet.
 *
 * - It allows the rte_eth_rx_burst() function of the driver to take
 *   advantage of burst-oriented hardware features (CPU cache,
 *   prefetch instructions, and so on) to minimize the number of CPU
 *   cycles per packet.
 *
 * To summarize, the proposed receive API enables many
 * burst-oriented optimizations in both synchronous and asynchronous
 * packet processing environments with no overhead in both cases.
 *
 * The rte_eth_rx_burst() function does not provide any error
 * notification to avoid the corresponding overhead. As a hint, the
 * upper-level application might check the status of the device link once
 * being systematically returned a 0 value for a given number of tries.
 *
 * @param port_id
 *   The port identifier of the Ethernet device.
 * @param queue_id
 *   The index of the receive queue from which to retrieve input packets.
 *   The value must be in the range [0, nb_rx_queue - 1] previously supplied
 *   to rte_eth_dev_configure().
 * @param rx_pkts
 *   The address of an array of pointers to *rte_mbuf* structures that
 *   must be large enough to store *nb_pkts* pointers in it.
 * @param nb_pkts
 *   The maximum number of packets to retrieve.
 * @return
 *   The number of packets actually retrieved, which is the number
 *   of pointers to *rte_mbuf* structures effectively supplied to the
 *   *rx_pkts* array.
 */
static inline uint16_t
rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
        struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
{
    struct rte_eth_dev *dev = &rte_eth_devices[port_id];
    uint16_t nb_rx;

#ifdef RTE_LIBRTE_ETHDEV_DEBUG
    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
    RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);

    if (queue_id >= dev->data->nb_rx_queues) {
        RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", queue_id);
        return 0;
    }
#endif
    nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
            rx_pkts, nb_pkts);

#ifdef RTE_ETHDEV_RXTX_CALLBACKS
    struct rte_eth_rxtx_callback *cb;

    /* __ATOMIC_RELEASE memory order was used when the
     * call back was inserted into the list.
     * Since there is a clear dependency between loading
     * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
     * not required.
     */
    cb = __atomic_load_n(&dev->post_rx_burst_cbs[queue_id],
            __ATOMIC_RELAXED);

    if (unlikely(cb != NULL)) {
        do {
            nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
                    nb_pkts, cb->param);
            cb = cb->next;
        } while (cb != NULL);
    }
#endif

    return nb_rx;
}
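The comment block above mentions a "retrieve as many received packets as possible" policy; in practice that is a loop like this sketch (bufs, BURST_SIZE and the port/queue names are illustrative, following the forwarding loop earlier):

uint16_t nb_rx;
do {
    nb_rx = rte_eth_rx_burst(port_id, 0, bufs, BURST_SIZE);
    /* ... process the nb_rx packets in bufs ... */
} while (nb_rx == BURST_SIZE);   /* a short burst means the queue is drained */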

Now the transmit side, rte_eth_tx_burst:

/**
* Send a burst of output packets on a transmit queue of an Ethernet device.
*
* The rte_eth_tx_burst() function is invoked to transmit output packets
* on the output queue *queue_id* of the Ethernet device designated by its
* *port_id*.
* The *nb_pkts* parameter is the number of packets to send which are
* supplied in the *tx_pkts* array of *rte_mbuf* structures, each of them
* allocated from a pool created with rte_pktmbuf_pool_create().
* The rte_eth_tx_burst() function loops, sending *nb_pkts* packets,
* up to the number of transmit descriptors available in the TX ring of the
* transmit queue.
* For each packet to send, the rte_eth_tx_burst() function performs
* the following operations:
*
* - Pick up the next available descriptor in the transmit ring.
*
* - Free the network buffer previously sent with that descriptor, if any.
*
* - Initialize the transmit descriptor with the information provided
*   in the *rte_mbuf data structure.
*
* In the case of a segmented packet composed of a list of *rte_mbuf* buffers,
* the rte_eth_tx_burst() function uses several transmit descriptors
* of the ring.
*
* The rte_eth_tx_burst() function returns the number of packets it
* actually sent. A return value equal to *nb_pkts* means that all packets
* have been sent, and this is likely to signify that other output packets
* could be immediately transmitted again. Applications that implement a
* "send as many packets to transmit as possible" policy can check this
* specific case and keep invoking the rte_eth_tx_burst() function until
* a value less than *nb_pkts* is returned.
*
* It is the responsibility of the rte_eth_tx_burst() function to
* transparently free the memory buffers of packets previously sent.
* This feature is driven by the *tx_free_thresh* value supplied to the
* rte_eth_dev_configure() function at device configuration time.
* When the number of free TX descriptors drops below this threshold, the
* rte_eth_tx_burst() function must [attempt to] free the *rte_mbuf*  buffers
* of those packets whose transmission was effectively completed.
*
* If the PMD is DEV_TX_OFFLOAD_MT_LOCKFREE capable, multiple threads can
* invoke this function concurrently on the same tx queue without SW lock.
* @see rte_eth_dev_info_get, struct rte_eth_txconf::offloads
*
* @see rte_eth_tx_prepare to perform some prior checks or adjustments
* for offloads.
*
* @param port_id
*   The port identifier of the Ethernet device.
* @param queue_id
*   The index of the transmit queue through which output packets must be
*   sent.
*   The value must be in the range [0, nb_tx_queue - 1] previously supplied
*   to rte_eth_dev_configure().
* @param tx_pkts
*   The address of an array of *nb_pkts* pointers to *rte_mbuf* structures
*   which contain the output packets.
* @param nb_pkts
*   The maximum number of packets to transmit.
* @return
*   The number of output packets actually stored in transmit descriptors of
*   the transmit ring. The return value can be less than the value of the
*   *tx_pkts* parameter when the transmit ring is full or has been filled up.
*/
static inline uint16_t
rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
        struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
{
    struct rte_eth_dev *dev = &rte_eth_devices[port_id];

#ifdef RTE_LIBRTE_ETHDEV_DEBUG
    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
    RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);

    if (queue_id >= dev->data->nb_tx_queues) {
        RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
        return 0;
    }
#endif

#ifdef RTE_ETHDEV_RXTX_CALLBACKS
    struct rte_eth_rxtx_callback *cb;

    /* __ATOMIC_RELEASE memory order was used when the
     * call back was inserted into the list.
     * Since there is a clear dependency between loading
     * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
     * not required.
     */
    cb = __atomic_load_n(&dev->pre_tx_burst_cbs[queue_id],
            __ATOMIC_RELAXED);

    if (unlikely(cb != NULL)) {
        do {
            nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
                    cb->param);
            cb = cb->next;
        } while (cb != NULL);
    }
#endif

    return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
}
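Symmetrically, the "send as many packets as possible" policy described in the comment amounts to retrying on the unsent remainder; a sketch (a real application would bound the retries or drop after a deadline):

uint16_t sent = 0;
while (sent < nb_pkts)
    sent += rte_eth_tx_burst(port_id, 0, tx_pkts + sent, nb_pkts - sent);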

The detailed internals of these two RX/TX paths will be analyzed in later installments.

3. Flow Summary

From the code analysis above, packet RX/TX in DPDK follows these basic steps:
1. Environment initialization: rte_eal_init processes the command-line parameters and sets up the runtime environment, memory management included.
2. Network device configuration, in particular the memory for the RX/TX queues: the rte_eth_dev_configure call.
3. Allocation and management of the individual RX and TX queues: the rte_eth_rx_queue_setup and rte_eth_tx_queue_setup calls.
4. Once preparation, configuration and the associated sanity checks have passed, start the NIC and bind the relevant data structures and memory to it: rte_eth_dev_start.
5. Receive and send packets in a loop via rte_eth_rx_burst and rte_eth_tx_burst; in this example the per-packet processing is trivial.
6. Reclaim the environment's resources when the program exits: rte_eal_cleanup. A condensed sketch of the six steps follows.
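A condensed sketch tying the six steps together, modeled on the skeleton example (pool parameters follow basicfwd.c; port_init and the lcore_main forwarding loop are the functions shown above; error handling is trimmed):

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>

int main(int argc, char *argv[])
{
    struct rte_mempool *mbuf_pool;
    uint16_t portid;

    if (rte_eal_init(argc, argv) < 0)                    /* step 1 */
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");

    mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",     /* backing buffers */
            8191 * rte_eth_dev_count_avail(), 250, 0,
            RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
    if (mbuf_pool == NULL)
        rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");

    RTE_ETH_FOREACH_DEV(portid)                          /* steps 2-4 via port_init() */
        if (port_init(portid, mbuf_pool) != 0)
            rte_exit(EXIT_FAILURE, "Cannot init port %u\n", portid);

    lcore_main();                                        /* step 5: the rx/tx loop above */

    rte_eal_cleanup();                                   /* step 6 */
    return 0;
}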

4. Summary

The source may differ somewhat between DPDK releases, so mind which version you are reading. The key is to become familiar with DPDK at the application level first; only on that basis can you really get to know its framework. Working from the applications down to the internals is a good way to understand DPDK's design philosophy and technical characteristics.
