This article is installment 41 of the DPDK series: a source-code analysis of the overall packet RX/TX flow, part two, the flow framework. It is intended as a practical reference for developers working through these code paths.
1. Overview
This installment continues the source-code walk-through with the groundwork that precedes any packet I/O: initializing and configuring the ports. Once that groundwork is complete and the ports are in a working state, forwarding can begin; port setup is the first step before the program enters its main loop.
The example performs one-to-one forwarding between paired ports, so it must check that the ports come in pairs and allocate an RX queue and a TX queue for each port. The packet RX/TX flow follows from there. Let's look at the code.
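As a concrete reference, the skeleton example enforces the pairing in main() roughly as follows (a sketch based on the basicfwd example; the exact message text varies by version):

unsigned int nb_ports = rte_eth_dev_count_avail();

/* one-to-one forwarding only works with an even number of ports */
if (nb_ports < 2 || (nb_ports & 1))
    rte_exit(EXIT_FAILURE, "Error: number of ports must be even\n");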
2. Source Code Analysis
The data structures and helper functions that port_init() calls are listed first; the port_init() function itself follows them:
/**
 * A structure used to configure an Ethernet port.
 * Depending upon the RX multi-queue mode, extra advanced
 * configuration settings may be needed.
 */
struct rte_eth_conf {
    uint32_t link_speeds; /**< bitmap of ETH_LINK_SPEED_XXX of speeds to be
                            used. ETH_LINK_SPEED_FIXED disables link
                            autonegotiation, and a unique speed shall be
                            set. Otherwise, the bitmap defines the set of
                            speeds to be advertised. If the special value
                            ETH_LINK_SPEED_AUTONEG (0) is used, all speeds
                            supported are advertised. */
    struct rte_eth_rxmode rxmode; /**< Port RX configuration. */
    struct rte_eth_txmode txmode; /**< Port TX configuration. */
    uint32_t lpbk_mode; /**< Loopback operation mode. By default the value
                          is 0, meaning the loopback mode is disabled.
                          Read the datasheet of given ethernet controller
                          for details. The possible values of this field
                          are defined in implementation of each driver. */
    struct {
        struct rte_eth_rss_conf rss_conf; /**< Port RSS configuration */
        struct rte_eth_vmdq_dcb_conf vmdq_dcb_conf;
        /**< Port vmdq+dcb configuration. */
        struct rte_eth_dcb_rx_conf dcb_rx_conf;
        /**< Port dcb RX configuration. */
        struct rte_eth_vmdq_rx_conf vmdq_rx_conf;
        /**< Port vmdq RX configuration. */
    } rx_adv_conf; /**< Port RX filtering configuration. */
    union {
        struct rte_eth_vmdq_dcb_tx_conf vmdq_dcb_tx_conf;
        /**< Port vmdq+dcb TX configuration. */
        struct rte_eth_dcb_tx_conf dcb_tx_conf;
        /**< Port dcb TX configuration. */
        struct rte_eth_vmdq_tx_conf vmdq_tx_conf;
        /**< Port vmdq TX configuration. */
    } tx_adv_conf; /**< Port TX DCB configuration (union). */
    /** Currently,Priority Flow Control(PFC) are supported,if DCB with PFC
        is needed,and the variable must be set ETH_DCB_PFC_SUPPORT. */
    uint32_t dcb_capability_en;
    struct rte_fdir_conf fdir_conf; /**< FDIR configuration. DEPRECATED */
    struct rte_intr_conf intr_conf; /**< Interrupt mode configuration. */
};
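For reference, the default configuration that port_init() starts from in the skeleton example only pins the maximum RX frame length and leaves everything else zeroed. A sketch (values taken from a recent basicfwd; older versions use different ring sizes):

#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024

static const struct rte_eth_conf port_conf_default = {
    .rxmode = {
        .max_rx_pkt_len = RTE_ETHER_MAX_LEN,   /* standard 1518-byte frames */
    },
};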
int
rte_eth_dev_is_valid_port(uint16_t port_id)
{
    if (port_id >= RTE_MAX_ETHPORTS ||
        (rte_eth_devices[port_id].state == RTE_ETH_DEV_UNUSED))
        return 0;
    else
        return 1;
}
int
rte_eth_dev_info_get(uint16_t port_id, struct rte_eth_dev_info *dev_info)
{
    struct rte_eth_dev *dev;
    const struct rte_eth_desc_lim lim = {
        .nb_max = UINT16_MAX,
        .nb_min = 0,
        .nb_align = 1,
        .nb_seg_max = UINT16_MAX,
        .nb_mtu_seg_max = UINT16_MAX,
    };
    int diag;

    /*
     * Init dev_info before port_id check since caller does not have
     * return status and does not know if get is successful or not.
     */
    memset(dev_info, 0, sizeof(struct rte_eth_dev_info));
    dev_info->switch_info.domain_id = RTE_ETH_DEV_SWITCH_DOMAIN_ID_INVALID;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -ENODEV);
    dev = &rte_eth_devices[port_id];

    dev_info->rx_desc_lim = lim;
    dev_info->tx_desc_lim = lim;
    dev_info->device = dev->device;
    dev_info->min_mtu = RTE_ETHER_MIN_MTU;
    dev_info->max_mtu = UINT16_MAX;

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_infos_get, -ENOTSUP);
    diag = (*dev->dev_ops->dev_infos_get)(dev, dev_info);
    if (diag != 0) {
        /* Cleanup already filled in device information */
        memset(dev_info, 0, sizeof(struct rte_eth_dev_info));
        return eth_err(port_id, diag);
    }

    /* Maximum number of queues should be <= RTE_MAX_QUEUES_PER_PORT */
    dev_info->max_rx_queues = RTE_MIN(dev_info->max_rx_queues,
            RTE_MAX_QUEUES_PER_PORT);
    dev_info->max_tx_queues = RTE_MIN(dev_info->max_tx_queues,
            RTE_MAX_QUEUES_PER_PORT);

    dev_info->driver_name = dev->device->driver->name;
    dev_info->nb_rx_queues = dev->data->nb_rx_queues;
    dev_info->nb_tx_queues = dev->data->nb_tx_queues;

    dev_info->dev_flags = &dev->data->dev_flags;

    return 0;
}
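As a quick illustration of how an application typically consumes this call, the following sketch (assuming <rte_ethdev.h> and <stdio.h> are included) queries a port and prints the limits that port_init() later relies on:

struct rte_eth_dev_info info;

if (rte_eth_dev_info_get(port_id, &info) == 0)
    printf("port %u: driver %s, max_rx_queues %u, max_tx_queues %u, "
           "rx descs [%u..%u]\n",
           port_id, info.driver_name,
           info.max_rx_queues, info.max_tx_queues,
           info.rx_desc_lim.nb_min, info.rx_desc_lim.nb_max);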
//=======================================================================================================================
/* The calling code and data structures are shown above */
//=======================================================================================================================
/* basicfwd.c: Basic DPDK skeleton forwarding example. */

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
    struct rte_eth_conf port_conf = port_conf_default; /* start from the default config */
    const uint16_t rx_rings = 1, tx_rings = 1; /* number of RX/TX queues */
    uint16_t nb_rxd = RX_RING_SIZE;
    uint16_t nb_txd = TX_RING_SIZE;
    int retval;
    uint16_t q;
    struct rte_eth_dev_info dev_info;
    struct rte_eth_txconf txconf;

    /* Is the port valid? */
    if (!rte_eth_dev_is_valid_port(port))
        return -1;

    /* Get device information for this port */
    retval = rte_eth_dev_info_get(port, &dev_info);
    if (retval != 0) {
        printf("Error during getting device (port %u) info: %s\n",
                port, strerror(-retval));
        return retval;
    }

    if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
        port_conf.txmode.offloads |=
            DEV_TX_OFFLOAD_MBUF_FAST_FREE;

    /* Configure the Ethernet device. */
    retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
    if (retval != 0)
        return retval;

    /* Sanity-check (and adjust) the RX/TX descriptor counts */
    retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
    if (retval != 0)
        return retval;

    /* Allocate and set up 1 RX queue per Ethernet port.
     * rte_eth_dev_socket_id() returns the NUMA socket the device is
     * attached to, so the queue memory lands on the local node; it
     * returns -1 if the socket cannot be determined. The NULL argument
     * means "use the default RX queue configuration". */
    for (q = 0; q < rx_rings; q++) {
        retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
                rte_eth_dev_socket_id(port), NULL, mbuf_pool);
        if (retval < 0)
            return retval;
    }

    txconf = dev_info.default_txconf;
    txconf.offloads = port_conf.txmode.offloads;
    /* Allocate and set up 1 TX queue per Ethernet port. */
    for (q = 0; q < tx_rings; q++) {
        retval = rte_eth_tx_queue_setup(port, q, nb_txd,
                rte_eth_dev_socket_id(port), &txconf);
        if (retval < 0)
            return retval;
    }

    /* Start the Ethernet port. */
    retval = rte_eth_dev_start(port);
    if (retval < 0)
        return retval;

    /* Display the port MAC address. */
    struct rte_ether_addr addr;
    retval = rte_eth_macaddr_get(port, &addr);
    if (retval != 0)
        return retval;

    printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
            " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
            port,
            addr.addr_bytes[0], addr.addr_bytes[1],
            addr.addr_bytes[2], addr.addr_bytes[3],
            addr.addr_bytes[4], addr.addr_bytes[5]);

    /* Enable RX in promiscuous mode for the Ethernet device,
     * i.e. accept every packet the NIC receives. */
    retval = rte_eth_promiscuous_enable(port);
    if (retval != 0)
        return retval;

    return 0;
}
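In the skeleton example, main() calls port_init() once per detected port after the EAL and the mbuf pool are set up (a sketch):

uint16_t portid;

RTE_ETH_FOREACH_DEV(portid)
    if (port_init(portid, mbuf_pool) != 0)
        rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16"\n", portid);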
A key function here is rte_eth_dev_configure():
int
rte_eth_dev_configure(uint16_t port_id, uint16_t nb_rx_q, uint16_t nb_tx_q,
              const struct rte_eth_conf *dev_conf)
{
    struct rte_eth_dev *dev;
    struct rte_eth_dev_info dev_info;
    struct rte_eth_conf orig_conf;
    uint16_t overhead_len;
    int diag;
    int ret;
    uint16_t old_mtu;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);

    dev = &rte_eth_devices[port_id];

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_configure, -ENOTSUP);

    if (dev->data->dev_started) {
        RTE_ETHDEV_LOG(ERR,
            "Port %u must be stopped to allow configuration\n",
            port_id);
        return -EBUSY;
    }

    /* Store original config, as rollback required on failure */
    memcpy(&orig_conf, &dev->data->dev_conf, sizeof(dev->data->dev_conf));

    /*
     * Copy the dev_conf parameter into the dev structure.
     * rte_eth_dev_info_get() requires dev_conf, copy it before dev_info get
     */
    if (dev_conf != &dev->data->dev_conf)
        memcpy(&dev->data->dev_conf, dev_conf,
               sizeof(dev->data->dev_conf));

    /* Backup mtu for rollback */
    old_mtu = dev->data->mtu;

    ret = rte_eth_dev_info_get(port_id, &dev_info);
    if (ret != 0)
        goto rollback;

    /* Get the real Ethernet overhead length */
    if (dev_info.max_mtu != UINT16_MAX &&
        dev_info.max_rx_pktlen > dev_info.max_mtu)
        overhead_len = dev_info.max_rx_pktlen - dev_info.max_mtu;
    else
        overhead_len = RTE_ETHER_HDR_LEN + RTE_ETHER_CRC_LEN;

    /* If number of queues specified by application for both Rx and Tx is
     * zero, use driver preferred values. This cannot be done individually
     * as it is valid for either Tx or Rx (but not both) to be zero.
     * If driver does not provide any preferred valued, fall back on
     * EAL defaults.
     */
    if (nb_rx_q == 0 && nb_tx_q == 0) {
        nb_rx_q = dev_info.default_rxportconf.nb_queues;
        if (nb_rx_q == 0)
            nb_rx_q = RTE_ETH_DEV_FALLBACK_RX_NBQUEUES;
        nb_tx_q = dev_info.default_txportconf.nb_queues;
        if (nb_tx_q == 0)
            nb_tx_q = RTE_ETH_DEV_FALLBACK_TX_NBQUEUES;
    }

    if (nb_rx_q > RTE_MAX_QUEUES_PER_PORT) {
        RTE_ETHDEV_LOG(ERR,
            "Number of RX queues requested (%u) is greater than max supported(%d)\n",
            nb_rx_q, RTE_MAX_QUEUES_PER_PORT);
        ret = -EINVAL;
        goto rollback;
    }

    if (nb_tx_q > RTE_MAX_QUEUES_PER_PORT) {
        RTE_ETHDEV_LOG(ERR,
            "Number of TX queues requested (%u) is greater than max supported(%d)\n",
            nb_tx_q, RTE_MAX_QUEUES_PER_PORT);
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * Check that the numbers of RX and TX queues are not greater
     * than the maximum number of RX and TX queues supported by the
     * configured device.
     */
    if (nb_rx_q > dev_info.max_rx_queues) {
        RTE_ETHDEV_LOG(ERR, "Ethdev port_id=%u nb_rx_queues=%u > %u\n",
            port_id, nb_rx_q, dev_info.max_rx_queues);
        ret = -EINVAL;
        goto rollback;
    }

    if (nb_tx_q > dev_info.max_tx_queues) {
        RTE_ETHDEV_LOG(ERR, "Ethdev port_id=%u nb_tx_queues=%u > %u\n",
            port_id, nb_tx_q, dev_info.max_tx_queues);
        ret = -EINVAL;
        goto rollback;
    }

    /* Check that the device supports requested interrupts */
    if ((dev_conf->intr_conf.lsc == 1) &&
        (!(dev->data->dev_flags & RTE_ETH_DEV_INTR_LSC))) {
        RTE_ETHDEV_LOG(ERR, "Driver %s does not support lsc\n",
            dev->device->driver->name);
        ret = -EINVAL;
        goto rollback;
    }
    if ((dev_conf->intr_conf.rmv == 1) &&
        (!(dev->data->dev_flags & RTE_ETH_DEV_INTR_RMV))) {
        RTE_ETHDEV_LOG(ERR, "Driver %s does not support rmv\n",
            dev->device->driver->name);
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * If jumbo frames are enabled, check that the maximum RX packet
     * length is supported by the configured device.
     */
    if (dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_JUMBO_FRAME) {
        if (dev_conf->rxmode.max_rx_pkt_len > dev_info.max_rx_pktlen) {
            RTE_ETHDEV_LOG(ERR,
                "Ethdev port_id=%u max_rx_pkt_len %u > max valid value %u\n",
                port_id, dev_conf->rxmode.max_rx_pkt_len,
                dev_info.max_rx_pktlen);
            ret = -EINVAL;
            goto rollback;
        } else if (dev_conf->rxmode.max_rx_pkt_len < RTE_ETHER_MIN_LEN) {
            RTE_ETHDEV_LOG(ERR,
                "Ethdev port_id=%u max_rx_pkt_len %u < min valid value %u\n",
                port_id, dev_conf->rxmode.max_rx_pkt_len,
                (unsigned int)RTE_ETHER_MIN_LEN);
            ret = -EINVAL;
            goto rollback;
        }

        /* Scale the MTU size to adapt max_rx_pkt_len */
        dev->data->mtu = dev->data->dev_conf.rxmode.max_rx_pkt_len -
                overhead_len;
    } else {
        uint16_t pktlen = dev_conf->rxmode.max_rx_pkt_len;
        if (pktlen < RTE_ETHER_MIN_MTU + overhead_len ||
            pktlen > RTE_ETHER_MTU + overhead_len)
            /* Use default value */
            dev->data->dev_conf.rxmode.max_rx_pkt_len =
                        RTE_ETHER_MTU + overhead_len;
    }

    /*
     * If LRO is enabled, check that the maximum aggregated packet
     * size is supported by the configured device.
     */
    if (dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_TCP_LRO) {
        if (dev_conf->rxmode.max_lro_pkt_size == 0)
            dev->data->dev_conf.rxmode.max_lro_pkt_size =
                dev->data->dev_conf.rxmode.max_rx_pkt_len;
        ret = check_lro_pkt_size(port_id,
                dev->data->dev_conf.rxmode.max_lro_pkt_size,
                dev->data->dev_conf.rxmode.max_rx_pkt_len,
                dev_info.max_lro_pkt_size);
        if (ret != 0)
            goto rollback;
    }

    /* Any requested offloading must be within its device capabilities */
    if ((dev_conf->rxmode.offloads & dev_info.rx_offload_capa) !=
         dev_conf->rxmode.offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u requested Rx offloads 0x%"PRIx64" doesn't match Rx offloads "
            "capabilities 0x%"PRIx64" in %s()\n",
            port_id, dev_conf->rxmode.offloads,
            dev_info.rx_offload_capa,
            __func__);
        ret = -EINVAL;
        goto rollback;
    }
    if ((dev_conf->txmode.offloads & dev_info.tx_offload_capa) !=
         dev_conf->txmode.offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u requested Tx offloads 0x%"PRIx64" doesn't match Tx offloads "
            "capabilities 0x%"PRIx64" in %s()\n",
            port_id, dev_conf->txmode.offloads,
            dev_info.tx_offload_capa,
            __func__);
        ret = -EINVAL;
        goto rollback;
    }

    dev->data->dev_conf.rx_adv_conf.rss_conf.rss_hf =
        rte_eth_rss_hf_refine(dev_conf->rx_adv_conf.rss_conf.rss_hf);

    /* Check that device supports requested rss hash functions. */
    if ((dev_info.flow_type_rss_offloads |
         dev_conf->rx_adv_conf.rss_conf.rss_hf) !=
        dev_info.flow_type_rss_offloads) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u invalid rss_hf: 0x%"PRIx64", valid value: 0x%"PRIx64"\n",
            port_id, dev_conf->rx_adv_conf.rss_conf.rss_hf,
            dev_info.flow_type_rss_offloads);
        ret = -EINVAL;
        goto rollback;
    }

    /* Check if Rx RSS distribution is disabled but RSS hash is enabled. */
    if (((dev_conf->rxmode.mq_mode & ETH_MQ_RX_RSS_FLAG) == 0) &&
        (dev_conf->rxmode.offloads & DEV_RX_OFFLOAD_RSS_HASH)) {
        RTE_ETHDEV_LOG(ERR,
            "Ethdev port_id=%u config invalid Rx mq_mode without RSS but %s offload is requested\n",
            port_id,
            rte_eth_dev_rx_offload_name(DEV_RX_OFFLOAD_RSS_HASH));
        ret = -EINVAL;
        goto rollback;
    }

    /*
     * Setup new number of RX/TX queues and reconfigure device.
     */
    diag = rte_eth_dev_rx_queue_config(dev, nb_rx_q);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR,
            "Port%u rte_eth_dev_rx_queue_config = %d\n",
            port_id, diag);
        ret = diag;
        goto rollback;
    }

    diag = rte_eth_dev_tx_queue_config(dev, nb_tx_q);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR,
            "Port%u rte_eth_dev_tx_queue_config = %d\n",
            port_id, diag);
        rte_eth_dev_rx_queue_config(dev, 0);
        ret = diag;
        goto rollback;
    }

    diag = (*dev->dev_ops->dev_configure)(dev);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR, "Port%u dev_configure = %d\n",
            port_id, diag);
        ret = eth_err(port_id, diag);
        goto reset_queues;
    }

    /* Initialize Rx profiling if enabled at compilation time. */
    diag = __rte_eth_dev_profile_init(port_id, dev);
    if (diag != 0) {
        RTE_ETHDEV_LOG(ERR, "Port%u __rte_eth_dev_profile_init = %d\n",
            port_id, diag);
        ret = eth_err(port_id, diag);
        goto reset_queues;
    }

    /* Validate Rx offloads. */
    diag = validate_offloads(port_id,
            dev_conf->rxmode.offloads,
            dev->data->dev_conf.rxmode.offloads, "Rx",
            rte_eth_dev_rx_offload_name);
    if (diag != 0) {
        ret = diag;
        goto reset_queues;
    }

    /* Validate Tx offloads. */
    diag = validate_offloads(port_id,
            dev_conf->txmode.offloads,
            dev->data->dev_conf.txmode.offloads, "Tx",
            rte_eth_dev_tx_offload_name);
    if (diag != 0) {
        ret = diag;
        goto reset_queues;
    }

    return 0;
reset_queues:
    rte_eth_dev_rx_queue_config(dev, 0);
    rte_eth_dev_tx_queue_config(dev, 0);
rollback:
    memcpy(&dev->data->dev_conf, &orig_conf, sizeof(dev->data->dev_conf));
    if (old_mtu != dev->data->mtu)
        dev->data->mtu = old_mtu;

    return ret;
}
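port_init() above passes a near-empty configuration with one queue each way, but the same call scales to multi-queue setups. A hypothetical sketch enabling RSS over four queue pairs (constant names as in this DPDK version; later releases renamed them with an RTE_ prefix):

struct rte_eth_conf conf = {
    .rxmode = {
        .mq_mode = ETH_MQ_RX_RSS,      /* spread flows across RX queues */
    },
    .rx_adv_conf = {
        .rss_conf = {
            .rss_key = NULL,           /* let the driver pick its default key */
            .rss_hf  = ETH_RSS_IP,     /* hash on IP src/dst addresses */
        },
    },
};

int ret = rte_eth_dev_configure(port_id, 4, 4, &conf);
if (ret != 0)
    printf("configure failed: %s\n", rte_strerror(-ret));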
Finally, the device is started:
int
rte_eth_dev_start(uint16_t port_id)
{
    struct rte_eth_dev *dev;
    struct rte_eth_dev_info dev_info;
    int diag;
    int ret;

    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);

    dev = &rte_eth_devices[port_id];

    RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_start, -ENOTSUP);

    if (dev->data->dev_started != 0) {
        RTE_ETHDEV_LOG(INFO,
            "Device with port_id=%"PRIu16" already started\n",
            port_id);
        return 0;
    }

    ret = rte_eth_dev_info_get(port_id, &dev_info);
    if (ret != 0)
        return ret;

    /* Lets restore MAC now if device does not support live change */
    if (*dev_info.dev_flags & RTE_ETH_DEV_NOLIVE_MAC_ADDR)
        rte_eth_dev_mac_restore(dev, &dev_info);

    diag = (*dev->dev_ops->dev_start)(dev);
    if (diag == 0)
        dev->data->dev_started = 1;
    else
        return eth_err(port_id, diag);

    ret = rte_eth_dev_config_restore(dev, &dev_info, port_id);
    if (ret != 0) {
        RTE_ETHDEV_LOG(ERR,
            "Error during restoring configuration for device (port %u): %s\n",
            port_id, rte_strerror(-ret));
        rte_eth_dev_stop(port_id);
        return ret;
    }

    if (dev->data->dev_conf.intr_conf.lsc == 0) {
        RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->link_update, -ENOTSUP);
        (*dev->dev_ops->link_update)(dev, 0);
    }
    return 0;
}
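After rte_eth_dev_start() returns, applications often poll the link state before forwarding. A sketch (the return type of rte_eth_link_get_nowait() changed across DPDK versions, so it is ignored here):

struct rte_eth_link link;

memset(&link, 0, sizeof(link));
rte_eth_link_get_nowait(port_id, &link);   /* non-blocking link query */
if (link.link_status == ETH_LINK_UP)
    printf("Port %u link up at %u Mbps\n", port_id, link.link_speed);
else
    printf("Port %u link down\n", port_id);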
Once everything is ready, packets can be received and transmitted:
/* Run until the application is quit or killed. */
for (;;) {
    /*
     * Receive packets on a port and forward them on the paired
     * port. The mapping is 0 -> 1, 1 -> 0, 2 -> 3, 3 -> 2, etc.
     */
    RTE_ETH_FOREACH_DEV(port) {

        /* Get burst of RX packets, from first port of pair. */
        struct rte_mbuf *bufs[BURST_SIZE];
        const uint16_t nb_rx = rte_eth_rx_burst(port, 0,
                bufs, BURST_SIZE);

        if (unlikely(nb_rx == 0))
            continue;

        /* Send burst of TX packets, to second port of pair. */
        const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0,
                bufs, nb_rx);

        /* Free any unsent packets. */
        if (unlikely(nb_tx < nb_rx)) {
            uint16_t buf;
            for (buf = nb_tx; buf < nb_rx; buf++)
                rte_pktmbuf_free(bufs[buf]);
        }
    }
}
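Before entering this loop, the skeleton example also warns when a port sits on a different NUMA node than the polling lcore, since every rte_eth_rx_burst() call would then cross sockets (a sketch based on basicfwd's lcore_main):

RTE_ETH_FOREACH_DEV(port)
    if (rte_eth_dev_socket_id(port) >= 0 &&
            rte_eth_dev_socket_id(port) != (int)rte_socket_id())
        printf("WARNING, port %u is on remote NUMA node to "
               "polling thread.\n\tPerformance will not be optimal.\n",
               port);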
The heart of the loop is of course the two burst functions. First, the receive side, rte_eth_rx_burst():
/**
 * Retrieve a burst of input packets from a receive queue of an Ethernet
 * device. The retrieved packets are stored in *rte_mbuf* structures whose
 * pointers are supplied in the *rx_pkts* array.
 *
 * The rte_eth_rx_burst() function loops, parsing the RX ring of the
 * receive queue, up to *nb_pkts* packets, and for each completed RX
 * descriptor in the ring, it performs the following operations:
 *
 * - Initialize the *rte_mbuf* data structure associated with the
 *   RX descriptor according to the information provided by the NIC into
 *   that RX descriptor.
 *
 * - Store the *rte_mbuf* data structure into the next entry of the
 *   *rx_pkts* array.
 *
 * - Replenish the RX descriptor with a new *rte_mbuf* buffer
 *   allocated from the memory pool associated with the receive queue at
 *   initialization time.
 *
 * When retrieving an input packet that was scattered by the controller
 * into multiple receive descriptors, the rte_eth_rx_burst() function
 * appends the associated *rte_mbuf* buffers to the first buffer of the
 * packet.
 *
 * The rte_eth_rx_burst() function returns the number of packets
 * actually retrieved, which is the number of *rte_mbuf* data structures
 * effectively supplied into the *rx_pkts* array.
 * A return value equal to *nb_pkts* indicates that the RX queue contained
 * at least *rx_pkts* packets, and this is likely to signify that other
 * received packets remain in the input queue. Applications implementing
 * a "retrieve as much received packets as possible" policy can check this
 * specific case and keep invoking the rte_eth_rx_burst() function until
 * a value less than *nb_pkts* is returned.
 *
 * This receive method has the following advantages:
 *
 * - It allows a run-to-completion network stack engine to retrieve and
 *   to immediately process received packets in a fast burst-oriented
 *   approach, avoiding the overhead of unnecessary intermediate packet
 *   queue/dequeue operations.
 *
 * - Conversely, it also allows an asynchronous-oriented processing
 *   method to retrieve bursts of received packets and to immediately
 *   queue them for further parallel processing by another logical core,
 *   for instance. However, instead of having received packets being
 *   individually queued by the driver, this approach allows the caller
 *   of the rte_eth_rx_burst() function to queue a burst of retrieved
 *   packets at a time and therefore dramatically reduce the cost of
 *   enqueue/dequeue operations per packet.
 *
 * - It allows the rte_eth_rx_burst() function of the driver to take
 *   advantage of burst-oriented hardware features (CPU cache,
 *   prefetch instructions, and so on) to minimize the number of CPU
 *   cycles per packet.
 *
 * To summarize, the proposed receive API enables many
 * burst-oriented optimizations in both synchronous and asynchronous
 * packet processing environments with no overhead in both cases.
 *
 * The rte_eth_rx_burst() function does not provide any error
 * notification to avoid the corresponding overhead. As a hint, the
 * upper-level application might check the status of the device link once
 * being systematically returned a 0 value for a given number of tries.
 *
 * @param port_id
 *   The port identifier of the Ethernet device.
 * @param queue_id
 *   The index of the receive queue from which to retrieve input packets.
 *   The value must be in the range [0, nb_rx_queue - 1] previously supplied
 *   to rte_eth_dev_configure().
 * @param rx_pkts
 *   The address of an array of pointers to *rte_mbuf* structures that
 *   must be large enough to store *nb_pkts* pointers in it.
 * @param nb_pkts
 *   The maximum number of packets to retrieve.
 * @return
 *   The number of packets actually retrieved, which is the number
 *   of pointers to *rte_mbuf* structures effectively supplied to the
 *   *rx_pkts* array.
 */
static inline uint16_t
rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
         struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
{
    struct rte_eth_dev *dev = &rte_eth_devices[port_id];
    uint16_t nb_rx;

#ifdef RTE_LIBRTE_ETHDEV_DEBUG
    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
    RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);

    if (queue_id >= dev->data->nb_rx_queues) {
        RTE_ETHDEV_LOG(ERR, "Invalid RX queue_id=%u\n", queue_id);
        return 0;
    }
#endif
    nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
                     rx_pkts, nb_pkts);

#ifdef RTE_ETHDEV_RXTX_CALLBACKS
    struct rte_eth_rxtx_callback *cb;

    /* __ATOMIC_RELEASE memory order was used when the
     * call back was inserted into the list.
     * Since there is a clear dependency between loading
     * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
     * not required.
     */
    cb = __atomic_load_n(&dev->post_rx_burst_cbs[queue_id],
            __ATOMIC_RELAXED);

    if (unlikely(cb != NULL)) {
        do {
            nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
                    nb_pkts, cb->param);
            cb = cb->next;
        } while (cb != NULL);
    }
#endif

    return nb_rx;
}
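The doc comment's "retrieve as many packets as possible" policy translates into a simple drain loop: keep calling until the function returns less than the burst size. A sketch, reusing port and BURST_SIZE from the main loop above (process_burst() is a hypothetical handler):

struct rte_mbuf *bufs[BURST_SIZE];
uint16_t nb_rx;

do {
    nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);
    process_burst(bufs, nb_rx);   /* hypothetical per-burst handler */
} while (nb_rx == BURST_SIZE);    /* a short burst means the queue is drained */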
Now the transmit side, rte_eth_tx_burst():
/**
* Send a burst of output packets on a transmit queue of an Ethernet device.
*
* The rte_eth_tx_burst() function is invoked to transmit output packets
* on the output queue *queue_id* of the Ethernet device designated by its
* *port_id*.
* The *nb_pkts* parameter is the number of packets to send which are
* supplied in the *tx_pkts* array of *rte_mbuf* structures, each of them
* allocated from a pool created with rte_pktmbuf_pool_create().
* The rte_eth_tx_burst() function loops, sending *nb_pkts* packets,
* up to the number of transmit descriptors available in the TX ring of the
* transmit queue.
* For each packet to send, the rte_eth_tx_burst() function performs
* the following operations:
*
* - Pick up the next available descriptor in the transmit ring.
*
* - Free the network buffer previously sent with that descriptor, if any.
*
* - Initialize the transmit descriptor with the information provided
* in the *rte_mbuf data structure.
*
* In the case of a segmented packet composed of a list of *rte_mbuf* buffers,
* the rte_eth_tx_burst() function uses several transmit descriptors
* of the ring.
*
* The rte_eth_tx_burst() function returns the number of packets it
* actually sent. A return value equal to *nb_pkts* means that all packets
* have been sent, and this is likely to signify that other output packets
* could be immediately transmitted again. Applications that implement a
* "send as many packets to transmit as possible" policy can check this
* specific case and keep invoking the rte_eth_tx_burst() function until
* a value less than *nb_pkts* is returned.
*
* It is the responsibility of the rte_eth_tx_burst() function to
* transparently free the memory buffers of packets previously sent.
* This feature is driven by the *tx_free_thresh* value supplied to the
* rte_eth_dev_configure() function at device configuration time.
* When the number of free TX descriptors drops below this threshold, the
* rte_eth_tx_burst() function must [attempt to] free the *rte_mbuf* buffers
* of those packets whose transmission was effectively completed.
*
* If the PMD is DEV_TX_OFFLOAD_MT_LOCKFREE capable, multiple threads can
* invoke this function concurrently on the same tx queue without SW lock.
* @see rte_eth_dev_info_get, struct rte_eth_txconf::offloads
*
* @see rte_eth_tx_prepare to perform some prior checks or adjustments
* for offloads.
*
* @param port_id
* The port identifier of the Ethernet device.
* @param queue_id
* The index of the transmit queue through which output packets must be
* sent.
* The value must be in the range [0, nb_tx_queue - 1] previously supplied
* to rte_eth_dev_configure().
* @param tx_pkts
* The address of an array of *nb_pkts* pointers to *rte_mbuf* structures
* which contain the output packets.
* @param nb_pkts
* The maximum number of packets to transmit.
* @return
* The number of output packets actually stored in transmit descriptors of
* the transmit ring. The return value can be less than the value of the
* *tx_pkts* parameter when the transmit ring is full or has been filled up.
*/
static inline uint16_t
rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
         struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
{
    struct rte_eth_dev *dev = &rte_eth_devices[port_id];

#ifdef RTE_LIBRTE_ETHDEV_DEBUG
    RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
    RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);

    if (queue_id >= dev->data->nb_tx_queues) {
        RTE_ETHDEV_LOG(ERR, "Invalid TX queue_id=%u\n", queue_id);
        return 0;
    }
#endif

#ifdef RTE_ETHDEV_RXTX_CALLBACKS
    struct rte_eth_rxtx_callback *cb;

    /* __ATOMIC_RELEASE memory order was used when the
     * call back was inserted into the list.
     * Since there is a clear dependency between loading
     * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
     * not required.
     */
    cb = __atomic_load_n(&dev->pre_tx_burst_cbs[queue_id],
            __ATOMIC_RELAXED);

    if (unlikely(cb != NULL)) {
        do {
            nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
                    cb->param);
            cb = cb->next;
        } while (cb != NULL);
    }
#endif

    return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, nb_pkts);
}
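Symmetrically, the "send as many packets as possible" policy from the comment above becomes a retry loop, and whatever still cannot be enqueued must be freed by the caller, exactly as the main loop does. A sketch reusing the main loop's bufs, nb_rx and port:

uint16_t sent = 0;

while (sent < nb_rx) {
    uint16_t n = rte_eth_tx_burst(port ^ 1, 0, bufs + sent, nb_rx - sent);
    if (n == 0)
        break;              /* TX ring full; stop retrying and drop the rest */
    sent += n;
}
for (; sent < nb_rx; sent++)
    rte_pktmbuf_free(bufs[sent]);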
The driver-level RX and TX paths behind these two functions will be analyzed in detail later in the series.
3. Flow Summary
The code analysis above shows that packet RX/TX in DPDK follows these basic steps (condensed into a sketch after the list):
1. Environment initialization: rte_eal_init() parses the EAL arguments and sets up the runtime environment, e.g. memory management.
2. Network device configuration, in particular the memory configuration for the RX/TX queues: the rte_eth_dev_configure() call.
3. Allocation and management of the individual RX and TX queues: the rte_eth_rx_queue_setup() and rte_eth_tx_queue_setup() calls.
4. Once all preparation and the related sanity checks have passed, start the NIC and attach the relevant data structures and memory to it: rte_eth_dev_start().
5. Receive and transmit packets in a loop with rte_eth_rx_burst() and rte_eth_tx_burst(); the per-packet processing in this program is of course trivial.
6. Reclaim the environment's resources when the program exits: rte_eal_cleanup().
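Condensed, the whole flow fits in a dozen lines (a sketch with error handling omitted; the pool sizes follow the skeleton example's macros, and lcore_main() is the example's burst loop shown earlier):

int main(int argc, char *argv[])
{
    struct rte_mempool *mbuf_pool;
    uint16_t portid;

    rte_eal_init(argc, argv);                              /* step 1 */
    mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
            8191, 250, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
            rte_socket_id());
    RTE_ETH_FOREACH_DEV(portid)
        port_init(portid, mbuf_pool);                      /* steps 2-4 */
    lcore_main();                                          /* step 5: burst loop */
    rte_eal_cleanup();                                     /* step 6 */
    return 0;
}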
4. Summary
The source differs somewhat between DPDK versions, so mind which version you are reading. The important thing is to become familiar with DPDK at the application level first; only on that basis can the framework itself be understood well. Working from the application down into the internals is probably the best way to grasp DPDK's design philosophy and technical characteristics.