suricata
source-dpdk.c
Go to the documentation of this file.
1/* Copyright (C) 2021-2025 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \defgroup dpdk DPDK running mode
20 *
21 * @{
22 */
23
24/**
25 * \file
26 *
27 * \author Lukas Sismis <lukas.sismis@gmail.com>
28 *
29 * DPDK capture interface
30 *
31 */
32
33#include "suricata-common.h"
34#include "runmodes.h"
35#include "decode.h"
36#include "packet.h"
37#include "source-dpdk.h"
38#include "suricata.h"
39#include "threads.h"
40#include "threadvars.h"
41#include "tm-threads.h"
42#include "tmqh-packetpool.h"
43#include "util-privs.h"
44#include "util-device-private.h"
45#include "action-globals.h"
46
47#ifndef HAVE_DPDK
48
49TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
50
61
62/**
63 * \brief Registration Function for DecodeDPDK.
64 */
75
76/**
77 * \brief this function prints an error message and exits.
78 */
79TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
80{
81 FatalError("Error creating thread %s: you do not have "
82 "support for DPDK enabled, on Linux host please recompile "
83 "with --enable-dpdk",
84 tv->name);
85}
86
87#else /* We have DPDK support */
88
89#include "util-affinity.h"
90#include "util-dpdk.h"
91#include "util-dpdk-i40e.h"
92#include "util-dpdk-ice.h"
93#include "util-dpdk-ixgbe.h"
94#include "util-dpdk-mlx5.h"
95#include "util-dpdk-bonding.h"
96#include <numa.h>
97
98#define BURST_SIZE 32
99// interrupt mode constants
100#define MIN_ZERO_POLL_COUNT 10U
101#define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
102#define MINIMUM_SLEEP_TIME_US 1U
103#define STANDARD_SLEEP_TIME_US 100U
104#define MAX_EPOLL_TIMEOUT_MS 500U
105static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
106
107/**
108 * \brief Structure to hold thread specific variables.
109 */
110typedef struct DPDKThreadVars_ {
111 /* counters */
112 uint64_t pkts;
113 ThreadVars *tv;
114 TmSlot *slot;
115 LiveDevice *livedev;
116 ChecksumValidationMode checksum_mode;
117 bool intr_enabled;
118 /* references to packet and drop counters */
119 uint16_t capture_dpdk_packets;
120 uint16_t capture_dpdk_rx_errs;
121 uint16_t capture_dpdk_imissed;
122 uint16_t capture_dpdk_rx_no_mbufs;
123 uint16_t capture_dpdk_ierrors;
124 uint16_t capture_dpdk_tx_errs;
125 unsigned int flags;
126 uint16_t threads;
127 /* for IPS */
128 DpdkCopyModeEnum copy_mode;
129 uint16_t out_port_id;
130 /* Entry in the peers_list */
131
132 uint64_t bytes;
133 uint64_t accepted;
134 uint64_t dropped;
135 uint16_t port_id;
136 uint16_t queue_id;
137 int32_t port_socket_id;
138 struct rte_mbuf *received_mbufs[BURST_SIZE];
139 DPDKWorkerSync *workers_sync;
140} DPDKThreadVars;
141
142static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
143static void ReceiveDPDKThreadExitStats(ThreadVars *, void *);
144static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
145static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
146
147static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
148static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
149static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
150
151static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
152static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
153{
154 uint32_t event_data = (uint32_t)port_id << UINT16_WIDTH | queue_id;
155 int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
156 RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
157
158 if (ret != 0) {
159 SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
160 queue_id, rte_strerror(-ret));
161 return false;
162 }
163 return true;
164}
165
166static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
167{
168 if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
169 return MINIMUM_SLEEP_TIME_US;
170
171 return STANDARD_SLEEP_TIME_US;
172}
173
174static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
175{
176 rte_spinlock_lock(&(intr_lock[port_id]));
177
178 if (on)
179 rte_eth_dev_rx_intr_enable(port_id, queue_id);
180 else
181 rte_eth_dev_rx_intr_disable(port_id, queue_id);
182
183 rte_spinlock_unlock(&(intr_lock[port_id]));
184}
185
/**
 * \brief Frees a run of mbufs stored in an mbuf array.
 *
 * \param mbuf_array array of mbuf pointers
 * \param mbuf_cnt number of mbufs to free
 * \param offset index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    /* The receive loop calls this with the count of not yet processed mbufs
     * and the index of the first one. Treating mbuf_cnt as an end index (as
     * done previously) freed too few mbufs and leaked the rest. */
    for (uint32_t i = 0; i < mbuf_cnt; i++) {
        rte_pktmbuf_free(mbuf_array[(uint32_t)offset + i]);
    }
}
193
194static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
195{
196 if (strcmp(driver_name, "net_bonding") == 0)
197 driver_name = BondingDeviceDriverGet(ptv->port_id);
198 if (strcmp(driver_name, "net_i40e") == 0)
199 i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
200 else if (strcmp(driver_name, "net_ixgbe") == 0)
201 ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
202 else if (strcmp(driver_name, "net_ice") == 0)
203 iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
204 else if (strcmp(driver_name, "mlx5_pci") == 0)
205 mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
206}
207
208static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
209{
210 if (strcmp(driver_name, "net_bonding") == 0) {
211 driver_name = BondingDeviceDriverGet(ptv->port_id);
212 }
213
214 if (
215#if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
216 strcmp(driver_name, "net_i40e") == 0 ||
217#endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
218 strcmp(driver_name, "net_ixgbe") == 0 || strcmp(driver_name, "net_ice") == 0 ||
219 strcmp(driver_name, "mlx5_pci") == 0) {
220 // Flush the RSS rules that have been inserted in the post start section
221 struct rte_flow_error flush_error = { 0 };
222 int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
223 if (retval != 0) {
224 SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
225 ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
226 }
227 }
228}
229
/**
 * \brief Attempts to retrieve the NUMA node id on which the caller runs.
 *
 * On Linux the CPU reported by sched_getcpu() is passed to
 * numa_node_of_cpu() and its result is returned. On other systems a warning
 * is logged.
 *
 * \return NUMA node id on success, -1 otherwise
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    const int current_cpu = sched_getcpu();
    return numa_node_of_cpu(current_cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
248
249/**
250 * \brief Registration Function for ReceiveDPDK.
251 * \todo Unit tests are needed for this module.
252 */
254{
255 tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
256 tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
258 tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
260 tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = ReceiveDPDKThreadExitStats;
261 tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
264}
265
266/**
267 * \brief Registration Function for DecodeDPDK.
268 * \todo Unit tests are needed for this module.
269 */
271{
272 tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
273 tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
274 tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
276 tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
279}
280
281static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
282{
283 /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
284 * the port level. Therefore setting it to the first worker to have at least continuous update
285 * on the dropped packets. */
286 if (ptv->queue_id == 0) {
287 struct rte_eth_stats eth_stats;
288 int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
289 if (unlikely(retval != 0)) {
290 SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
291 return;
292 }
293
294 StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets,
295 ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
296 SC_ATOMIC_SET(ptv->livedev->pkts,
297 eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
298 StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_errs,
299 eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
300 StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
301 StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
302 StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
303 StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
305 ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
306 } else {
307 StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
308 }
309}
310
311static void DPDKReleasePacket(Packet *p)
312{
313 int retval;
314 /* Need to be in copy mode and need to detect early release
315 where Ethernet header could not be set (and pseudo packet)
316 When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
317 These get into the infinite cycle between the NIC and the switch in some cases */
318 if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
319 (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
320#if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
321 && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
322#endif
323 ) {
325 retval =
326 rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
327 // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
328 // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
329 // successfully sent packets.
330 if (unlikely(retval < 1)) {
331 // sometimes a repeated transmit can help to send out the packet
332 rte_delay_us(DPDK_BURST_TX_WAIT_US);
333 retval = rte_eth_tx_burst(
334 p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
335 if (unlikely(retval < 1)) {
336 SCLogDebug("Unable to transmit the packet on port %u queue %u",
337 p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
338 rte_pktmbuf_free(p->dpdk_v.mbuf);
339 p->dpdk_v.mbuf = NULL;
340 }
341 }
342 } else {
343 rte_pktmbuf_free(p->dpdk_v.mbuf);
344 p->dpdk_v.mbuf = NULL;
345 }
346
348}
349
350static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
351{
352 SCEnter();
353 // Indicate that the thread is actually running its application level
354 // code (i.e., it can poll packets)
357
358 rte_eth_stats_reset(ptv->port_id);
359 rte_eth_xstats_reset(ptv->port_id);
360
361 if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
363
365}
366
367static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
368{
369 static thread_local uint64_t last_timeout_msec = 0;
370 SCTime_t t = TimeGet();
371 uint64_t msecs = SCTIME_MSECS(t);
372 if (msecs > last_timeout_msec + 100) {
373 TmThreadsCaptureHandleTimeout(tv, NULL);
374 last_timeout_msec = msecs;
375 }
376}
377
378/**
379 * \brief Decides if it should retry the packet poll or continue with the packet processing
380 * \return true if the poll should be retried, false otherwise
381 */
382static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
383{
384 static thread_local uint32_t zero_pkt_polls_cnt = 0;
385
386 if (nb_rx > 0) {
387 zero_pkt_polls_cnt = 0;
388 return false;
389 }
390
391 LoopHandleTimeoutOnIdle(tv);
392 if (!ptv->intr_enabled)
393 return true;
394
395 zero_pkt_polls_cnt++;
396 if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
397 return true;
398
399 uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
400 if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
401 rte_delay_us(pwd_idle_hint);
402 } else {
403 InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
404 struct rte_epoll_event event;
405 rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
406 InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
407 return true;
408 }
409
410 return false;
411}
412
413/**
414 * \brief Initializes a packet from an mbuf
415 * \return true if the packet was initialized successfully, false otherwise
416 */
417static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
418{
420 if (unlikely(p == NULL)) {
421 return NULL;
422 }
425 if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
427 }
428
429 p->ts = TimeGet();
430 p->dpdk_v.mbuf = mbuf;
431 p->ReleasePacket = DPDKReleasePacket;
432 p->dpdk_v.copy_mode = ptv->copy_mode;
433 p->dpdk_v.out_port_id = ptv->out_port_id;
434 p->dpdk_v.out_queue_id = ptv->queue_id;
435 p->livedev = ptv->livedev;
436
437 if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
439 } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
440 uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
441 if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
442 (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
443 SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
445 } else {
446 if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
447 SCLogDebug("HW detected BAD IP checksum");
448 // chsum recalc will not be triggered but rule keyword check will be
449 p->l3.csum_set = true;
450 p->l3.csum = 0;
451 }
452 if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
453 SCLogDebug("HW detected BAD L4 chsum");
454 p->l4.csum_set = true;
455 p->l4.csum = 0;
456 }
457 }
458 }
459
460 return p;
461}
462
463static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
464{
465 static thread_local bool segmented_mbufs_warned = false;
466 if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
467 char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
468 "Check your configuration or report the issue";
469 enum rte_proc_type_t eal_t = rte_eal_process_type();
470 if (eal_t == RTE_PROC_SECONDARY) {
471 SCLogWarning("%s. To avoid segmented mbufs, "
472 "try to increase mbuf size in your primary application",
473 warn_s);
474 } else if (eal_t == RTE_PROC_PRIMARY) {
475 SCLogWarning("%s. To avoid segmented mbufs, "
476 "try to increase MTU in your suricata.yaml",
477 warn_s);
478 }
479
480 segmented_mbufs_warned = true;
481 }
482}
483
484static void HandleShutdown(DPDKThreadVars *ptv)
485{
486 SCLogDebug("Stopping Suricata!");
487 SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
488 while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
489 rte_delay_us(10);
490 }
491 if (ptv->queue_id == 0) {
492 rte_delay_us(20); // wait for all threads to get out of the sync loop
493 SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
494 // If Suricata runs in peered mode, the peer threads might still want to send
495 // packets to our port. Instead, we know, that we are done with the peered port, so
496 // we stop it. The peered threads will stop our port.
497 if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
498 rte_eth_dev_stop(ptv->out_port_id);
499 } else {
500 // in IDS we stop our port - no peer threads are running
501 rte_eth_dev_stop(ptv->port_id);
502 }
503 }
504 DPDKDumpCounters(ptv);
505}
506
507static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
508{
509 static thread_local SCTime_t last_dump = { 0 };
510 SCTime_t current_time = TimeGet();
511 /* Trigger one dump of stats every second */
512 if (current_time.secs != last_dump.secs) {
513 DPDKDumpCounters(ptv);
514 last_dump = current_time;
515 }
516}
517
518/**
519 * \brief Main DPDK reading Loop function
520 */
521static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
522{
523 SCEnter();
524 DPDKThreadVars *ptv = (DPDKThreadVars *)data;
525 ptv->slot = ((TmSlot *)slot)->slot_next;
526 TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
527 if (ret != TM_ECODE_OK) {
528 SCReturnInt(ret);
529 }
530 while (true) {
531 if (unlikely(suricata_ctl_flags != 0)) {
532 HandleShutdown(ptv);
533 break;
534 }
535
536 uint16_t nb_rx =
537 rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
538 if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
539 continue;
540 }
541
542 ptv->pkts += (uint64_t)nb_rx;
543 for (uint16_t i = 0; i < nb_rx; i++) {
544 Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
545 if (p == NULL) {
546 rte_pktmbuf_free(ptv->received_mbufs[i]);
547 continue;
548 }
549 DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
550 PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
551 rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
552 if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
553 TmqhOutputPacketpool(ptv->tv, p);
554 DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
555 SCReturnInt(EXIT_FAILURE);
556 }
557 }
558
559 PeriodicDPDKDumpCounters(ptv);
561 }
562
564}
565
566/**
567 * \brief Init function for ReceiveDPDK.
568 *
569 * \param tv pointer to ThreadVars
570 * \param initdata pointer to the interface passed from the user
571 * \param data pointer gets populated with DPDKThreadVars
572 *
573 */
574static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
575{
576 SCEnter();
577 int retval, thread_numa;
578 DPDKThreadVars *ptv = NULL;
579 DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
580
581 if (initdata == NULL) {
582 SCLogError("DPDK configuration is NULL in thread initialization");
583 goto fail;
584 }
585
586 ptv = SCCalloc(1, sizeof(DPDKThreadVars));
587 if (unlikely(ptv == NULL)) {
588 SCLogError("Unable to allocate memory");
589 goto fail;
590 }
591
592 ptv->tv = tv;
593 ptv->pkts = 0;
594 ptv->bytes = 0;
595 ptv->livedev = LiveGetDevice(dpdk_config->iface);
596
597 ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", ptv->tv);
598 ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", ptv->tv);
599 ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", ptv->tv);
600 ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", ptv->tv);
601 ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", ptv->tv);
602 ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", ptv->tv);
603
604 ptv->copy_mode = dpdk_config->copy_mode;
605 ptv->checksum_mode = dpdk_config->checksum_mode;
606
607 ptv->threads = dpdk_config->threads;
608 ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
609 ptv->port_id = dpdk_config->port_id;
610 ptv->out_port_id = dpdk_config->out_port_id;
611 ptv->port_socket_id = dpdk_config->socket_id;
612
613 thread_numa = GetNumaNode();
614 if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
615 thread_numa != ptv->port_socket_id) {
616 SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
617 SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
618 ptv->port_socket_id, thread_numa);
619 }
620
621 ptv->workers_sync = dpdk_config->workers_sync;
622 uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
623 ptv->queue_id = queue_id;
624
625 // the last thread starts the device
626 if (queue_id == dpdk_config->threads - 1) {
627 retval = rte_eth_dev_start(ptv->port_id);
628 if (retval < 0) {
629 SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
630 rte_strerror(-retval));
631 goto fail;
632 }
633
634 struct rte_eth_dev_info dev_info;
635 retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
636 if (retval != 0) {
637 SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
638 rte_strerror(-retval));
639 goto fail;
640 }
641
642 uint32_t timeout = dpdk_config->linkup_timeout * 10;
643 while (timeout > 0) {
644 struct rte_eth_link link = { 0 };
645 retval = rte_eth_link_get_nowait(ptv->port_id, &link);
646 if (retval != 0) {
647 if (retval == -ENOTSUP) {
648 SCLogInfo("%s: link status not supported, skipping", dpdk_config->iface);
649 } else {
650 SCLogInfo("%s: error (%s) when getting link status, skipping",
651 dpdk_config->iface, rte_strerror(-retval));
652 }
653 break;
654 }
655 if (link.link_status) {
656 char link_status_str[RTE_ETH_LINK_MAX_STR_LEN];
657#if RTE_VERSION >= RTE_VERSION_NUM(20, 11, 0, 0)
658#pragma GCC diagnostic push
659#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
660 rte_eth_link_to_str(link_status_str, sizeof(link_status_str), &link);
661#pragma GCC diagnostic pop
662#else
663 snprintf(link_status_str, sizeof(link_status_str),
664 "Link Up, speed %u Mbps, %s", // 22 chars + 10 for digits + 11 for duplex
665 link.link_speed,
666 (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? "full-duplex" : "half-duplex");
667#endif
668
669 SCLogInfo("%s: %s", dpdk_config->iface, link_status_str);
670 break;
671 }
672
673 rte_delay_ms(100);
674 timeout--;
675 }
676
677 if (dpdk_config->linkup_timeout && timeout == 0) {
678 SCLogWarning("%s: link is down, trying to continue anyway", dpdk_config->iface);
679 }
680
681 // some PMDs requires additional actions only after the device has started
682 DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
683
684 uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
685 if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
686 SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
687 dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
688 } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
690 "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
691 dpdk_config->iface);
692 }
693 if (ptv->intr_enabled) {
694 rte_spinlock_init(&intr_lock[ptv->port_id]);
695 }
696 }
697
698 *data = (void *)ptv;
699 dpdk_config->DerefFunc(dpdk_config);
701
702fail:
703 if (dpdk_config != NULL)
704 dpdk_config->DerefFunc(dpdk_config);
705 if (ptv != NULL)
706 SCFree(ptv);
708}
709
710static void PrintDPDKPortXstats(uint16_t port_id, const char *port_name)
711{
712 struct rte_eth_xstat *xstats;
713 struct rte_eth_xstat_name *xstats_names;
714
715 int32_t ret = rte_eth_xstats_get(port_id, NULL, 0);
716 if (ret < 0) {
717 FatalError("Error (%s) getting count of rte_eth_xstats failed on port %s",
718 rte_strerror(-ret), port_name);
719 }
720 uint16_t len = (uint16_t)ret;
721
722 xstats = SCCalloc(len, sizeof(*xstats));
723 if (xstats == NULL)
724 FatalError("Failed to allocate memory for the rte_eth_xstat structure");
725
726 ret = rte_eth_xstats_get(port_id, xstats, len);
727 if (ret < 0 || ret > len) {
728 SCFree(xstats);
729 FatalError("Error (%s) getting rte_eth_xstats failed on port %s", rte_strerror(-ret),
730 port_name);
731 }
732 xstats_names = SCCalloc(len, sizeof(*xstats_names));
733 if (xstats_names == NULL) {
734 SCFree(xstats);
735 FatalError("Failed to allocate memory for the rte_eth_xstat_name array");
736 }
737 ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
738 if (ret < 0 || ret > len) {
739 SCFree(xstats);
740 SCFree(xstats_names);
741 FatalError("Error (%s) getting names of rte_eth_xstats failed on port %s",
742 rte_strerror(-ret), port_name);
743 }
744 for (int32_t i = 0; i < len; i++) {
745 if (xstats[i].value > 0)
746 SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
747 xstats[i].value);
748 }
749
750 SCFree(xstats);
751 SCFree(xstats_names);
752}
753
754/**
755 * \brief This function prints stats to the screen at exit.
756 * \param tv pointer to ThreadVars
757 * \param data pointer that gets cast into DPDKThreadVars for ptv
758 */
759static void ReceiveDPDKThreadExitStats(ThreadVars *tv, void *data)
760{
761 SCEnter();
762 int retval;
763 DPDKThreadVars *ptv = (DPDKThreadVars *)data;
764
765 if (ptv->queue_id == 0) {
766 struct rte_eth_stats eth_stats;
767 PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
768 retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
769 if (unlikely(retval != 0)) {
770 SCLogError("%s: failed to get stats (%s)", ptv->livedev->dev, strerror(-retval));
771 SCReturn;
772 }
773 SCLogPerf("%s: total RX stats: packets %" PRIu64 " bytes: %" PRIu64 " missed: %" PRIu64
774 " errors: %" PRIu64 " nombufs: %" PRIu64,
775 ptv->livedev->dev, eth_stats.ipackets, eth_stats.ibytes, eth_stats.imissed,
776 eth_stats.ierrors, eth_stats.rx_nombuf);
777 if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS)
778 SCLogPerf("%s: total TX stats: packets %" PRIu64 " bytes: %" PRIu64 " errors: %" PRIu64,
779 ptv->livedev->dev, eth_stats.opackets, eth_stats.obytes, eth_stats.oerrors);
780 }
781
782 DPDKDumpCounters(ptv);
783 SCLogPerf("(%s) received packets %" PRIu64, tv->name, ptv->pkts);
784}
785
786/**
787 * \brief DeInit function closes dpdk at exit.
788 * \param tv pointer to ThreadVars
789 * \param data pointer that gets cast into DPDKThreadVars for ptv
790 */
791static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
792{
793 SCEnter();
794 DPDKThreadVars *ptv = (DPDKThreadVars *)data;
795
796 if (ptv->queue_id == 0) {
797 struct rte_eth_dev_info dev_info;
798 int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
799 if (retval != 0) {
800 SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
801 rte_strerror(-retval));
803 }
804
805 DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
806
807 if (ptv->workers_sync) {
808 SCFree(ptv->workers_sync);
809 }
810 }
811
812 SCFree(ptv);
814}
815
816/**
817 * \brief This function passes off to link type decoders.
818 *
819 * DecodeDPDK decodes packets from DPDK and passes
820 * them off to the proper link type decoder.
821 *
822 * \param t pointer to ThreadVars
823 * \param p pointer to the current packet
824 * \param data pointer that gets cast into DPDKThreadVars for ptv
825 */
826static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
827{
828 SCEnter();
830
832
833 /* update counters */
835
836 /* If suri has set vlan during reading, we increase vlan counter */
837 if (p->vlan_idx) {
839 }
840
841 /* call the decoder */
842 DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
843
845
847}
848
849static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
850{
851 SCEnter();
852 DecodeThreadVars *dtv = NULL;
853
855
856 if (dtv == NULL)
858
860
861 *data = (void *)dtv;
862
864}
865
866static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
867{
868 SCEnter();
869 if (data != NULL)
872}
873
874#endif /* HAVE_DPDK */
875/* eof */
876/**
877 * @}
878 */
#define ACTION_DROP
uint8_t len
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition counters.c:952
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition counters.c:207
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition counters.c:450
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition counters.c:166
uint8_t flags
Definition decode-gre.h:0
uint16_t type
ChecksumValidationMode
Definition decode.h:42
@ CHECKSUM_VALIDATION_OFFLOAD
Definition decode.h:48
@ CHECKSUM_VALIDATION_DISABLE
Definition decode.h:43
#define PKT_SET_SRC(p, src_val)
Definition decode.h:1325
#define GET_PKT_DATA(p)
Definition decode.h:209
#define GET_PKT_LEN(p)
Definition decode.h:208
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition decode.h:1321
@ PKT_SRC_WIRE
Definition decode.h:52
#define PKT_IGNORE_CHECKSUM
Definition decode.h:1282
DecodeThreadVars * dtv
ThreadVars * tv
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition decode.c:293
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition decode.c:628
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition decode.c:232
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition decode.c:804
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition decode.c:822
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition decode.c:770
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition decode.c:842
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition decode.c:276
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition source-dpdk.c:65
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition source-dpdk.c:79
void TmModuleReceiveDPDKRegister(void)
Definition source-dpdk.c:51
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition packet.c:49
#define DPDK_BURST_TX_WAIT_US
Definition source-dpdk.h:36
DpdkCopyModeEnum
Definition source-dpdk.h:34
@ DPDK_COPY_MODE_IPS
Definition source-dpdk.h:34
@ DPDK_COPY_MODE_TAP
Definition source-dpdk.h:34
#define DPDK_IRQ_MODE
Definition source-dpdk.h:42
Structure to hold thread specific data for all decode modules.
Definition decode.h:963
uint16_t counter_vlan
Definition decode.h:1001
bool csum_set
Definition decode.h:436
uint16_t csum
Definition decode.h:437
bool csum_set
Definition decode.h:466
uint16_t csum
Definition decode.h:467
struct PacketL4 l4
Definition decode.h:601
SCTime_t ts
Definition decode.h:555
struct PacketL3 l3
Definition decode.h:600
int datalink
Definition decode.h:639
struct LiveDevice_ * livedev
Definition decode.h:618
void(* ReleasePacket)(struct Packet_ *)
Definition decode.h:591
uint32_t flags
Definition decode.h:544
uint8_t vlan_idx
Definition decode.h:529
uint64_t secs
Definition util-time.h:41
Per thread variable structure.
Definition threadvars.h:58
char name[16]
Definition threadvars.h:65
const char * name
Definition tm-modules.h:48
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition tm-modules.h:53
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition tm-modules.h:52
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition tm-modules.h:61
uint8_t cap_flags
Definition tm-modules.h:77
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition tm-modules.h:56
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition tm-modules.h:58
uint8_t flags
Definition tm-modules.h:80
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition tm-modules.h:51
#define BUG_ON(x)
volatile uint8_t suricata_ctl_flags
Definition suricata.c:172
#define THV_RUNNING
Definition threadvars.h:55
TmModule tmm_modules[TMM_SIZE]
Definition tm-modules.c:29
#define TM_FLAG_RECEIVE_TM
Definition tm-modules.h:32
#define TM_FLAG_DECODE_TM
Definition tm-modules.h:33
@ TMM_RECEIVEDPDK
@ TMM_DECODEDPDK
@ TM_ECODE_FAILED
@ TM_ECODE_OK
const char * name
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition tm-threads.c:101
void PacketPoolWait(void)
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
#define SCEnter(...)
Definition util-debug.h:277
#define FatalError(...)
Definition util-debug.h:510
#define SCLogPerf(...)
Definition util-debug.h:234
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCReturnInt(x)
Definition util-debug.h:281
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition util-debug.h:243
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition util-debug.h:255
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition util-debug.h:225
#define SCLogError(...)
Macro used to log ERROR messages.
Definition util-debug.h:267
#define SCReturn
Definition util-debug.h:279
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
#define SCFree(p)
Definition util-mem.h:61
#define SCCalloc(nm, sz)
Definition util-mem.h:53
#define unlikely(expr)
#define SC_CAP_NET_RAW
Definition util-privs.h:32
uint64_t offset
SCTime_t TimeGet(void)
Definition util-time.c:152
#define SCTIME_MSECS(t)
Definition util-time.h:58