suricata
source-af-xdp.c
Go to the documentation of this file.
1/* Copyright (C) 2011-2022 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \defgroup afxdppacket AF_XDP running mode
20 *
21 * @{
22 */
23
24/**
25 * \file
26 *
27 * \author Richard McConnell <richard_mcconnell@rapid7.com>
28 *
29 * AF_XDP socket acquisition support
30 *
31 */
32#define SC_PCAP_DONT_INCLUDE_PCAP_H 1
33#include "suricata-common.h"
34#include "suricata.h"
35#include "decode.h"
36#include "packet-queue.h"
37#include "threads.h"
38#include "threadvars.h"
39#include "tm-queuehandlers.h"
40#include "tm-modules.h"
41#include "tm-threads.h"
42#include "tm-threads-common.h"
43#include "conf.h"
44#include "util-cpu.h"
45#include "util-datalink.h"
46#include "util-debug.h"
47#include "util-device-private.h"
48#include "util-ebpf.h"
49#include "util-error.h"
50#include "util-privs.h"
51#include "util-optimize.h"
52#include "util-checksum.h"
53#include "util-ioctl.h"
54#include "util-host-info.h"
55#include "util-sysfs.h"
56#include "tmqh-packetpool.h"
57#include "source-af-xdp.h"
58#include "runmodes.h"
59#include "flow-storage.h"
60#include "util-validate.h"
61
62#ifdef HAVE_AF_XDP
63#include <net/if.h>
64#include <bpf/libbpf.h>
65#include <xdp/xsk.h>
66#include <xdp/libxdp.h>
67#endif
68
69#if HAVE_LINUX_IF_ETHER_H
70#include <linux/if_ether.h>
71#endif
72
73#ifndef HAVE_AF_XDP
74
75TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **);
76
87
88/**
89 * \brief Registration Function for DecodeAFXDP.
90 */
101
102/**
103 * \brief this function prints an error message and exits.
104 */
105TmEcode NoAFXDPSupportExit(ThreadVars *tv, const void *initdata, void **data)
106{
107 SCLogError("Error creating thread %s: you do not have "
108 "support for AF_XDP enabled, on Linux host please recompile "
109 "with --enable-af-xdp",
110 tv->name);
111 exit(EXIT_FAILURE);
112}
113
114#else /* We have AF_XDP support */
115
/* Timeout handed to poll() in the receive loop (milliseconds, per poll(2)). */
116#define POLL_TIMEOUT 100
/* Ring sizes: the libxdp default descriptor counts are used as-is. */
117#define NUM_FRAMES_PROD XSK_RING_PROD__DEFAULT_NUM_DESCS
118#define NUM_FRAMES_CONS XSK_RING_CONS__DEFAULT_NUM_DESCS
119#define NUM_FRAMES NUM_FRAMES_PROD
120#define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
/* Size of the mmap'd umem buffer: room for NUM_FRAMES * 2 frames, the same
 * count the fill ring is initialised with in AFXDPSocketCreation(). */
121#define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2)
/* Delay handed to usleep() between reconnect attempts (microseconds). */
122#define RECONNECT_TIMEOUT 500000
123
124/* Interface state */
/* Stored in AFXDPThreadVars.afxdp_state; DOWN is 0 and therefore the state
 * of a freshly calloc'ed thread context. */
125enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };
126
/* File-wide state used to hand a distinct queue number to every capture
 * thread while the sockets are being opened. */
127struct XskInitProtect {
/* Held by OpenXSKSocket() while a queue is assigned and the socket created. */
128 SCMutex queue_protect;
/* Next queue number to hand out; uint8_t, so it wraps after 255. */
129 SC_ATOMIC_DECLARE(uint8_t, queue_num);
130} xsk_protect;
131
/* Per-thread umem: the packet buffer plus the rings and handle created
 * around it by xsk_umem__create(). */
132struct UmemInfo {
/* Packet buffer of MEM_BYTES obtained with mmap() in AcquireBuffer(). */
133 void *buf;
/* umem handle; NULL when no umem is currently created. */
134 struct xsk_umem *umem;
/* Fill ring (producer side) and completion ring (consumer side). */
135 struct xsk_ring_prod fq;
136 struct xsk_ring_cons cq;
/* Configuration passed to xsk_umem__create(). */
137 struct xsk_umem_config cfg;
/* Extra flag OR'ed into the mmap() flags; set to MAP_HUGETLB when the umem
 * is configured for unaligned chunk mode. */
138 int mmap_alignment_flag;
139};
140
/* Queue number a capture thread binds its socket to. */
141struct QueueAssignment {
/* Queue number taken from xsk_protect.queue_num. */
142 uint32_t queue_num;
/* Set once queue_num has been assigned so reconnects reuse the same queue. */
143 bool assigned;
144};
145
/* Per-thread AF_XDP socket: rings, handle, configuration and poll descriptor. */
146struct XskSockInfo {
/* RX and TX rings filled in by xsk_socket__create(). */
147 struct xsk_ring_cons rx;
148 struct xsk_ring_prod tx;
/* Socket handle; NULL when no socket is currently open. */
149 struct xsk_socket *xsk;
150
151 /* Queue assignment structure */
152 struct QueueAssignment queue;
153
154 /* Configuration items */
155 struct xsk_socket_config cfg;
156 bool enable_busy_poll;
/* Values written to SO_BUSY_POLL and SO_BUSY_POLL_BUDGET respectively;
 * busy_poll_budget is also the batch size used when peeking the RX ring. */
157 uint32_t busy_poll_time;
158 uint32_t busy_poll_budget;
159
/* Descriptor (socket fd, POLLIN) used with poll() when busy polling is off. */
160 struct pollfd fd;
161};
162
163/**
164 * \brief Structure to hold thread specific variables.
165 */
166typedef struct AFXDPThreadVars_ {
167 ThreadVars *tv;
/* Next slot in the pipeline; packets are handed to it from the receive loop. */
168 TmSlot *slot;
169 LiveDevice *livedev;
170
171 /* thread specific socket */
/* Non-zero: the interface is switched to promiscuous mode at thread init. */
172 int promisc;
/* Number of capture threads, copied from the interface configuration. */
173 int threads;
174
175 char iface[AFXDP_IFACE_NAME_LENGTH];
/* Result of if_nametoindex(iface). */
176 uint32_t ifindex;
177
178 /* AF_XDP structure */
179 struct UmemInfo umem;
180 struct XskSockInfo xsk;
/* Values written to the interface's sysfs knobs of the same name when busy
 * polling is configured. */
181 uint32_t gro_flush_timeout;
182 uint32_t napi_defer_hard_irqs;
/* XDP program id queried after the socket has been created. */
183 uint32_t prog_id;
184
185 /* Handle state */
/* One of enum state (AFXDP_STATE_DOWN / AFXDP_STATE_UP). */
186 uint8_t afxdp_state;
187
188 /* Stats parameters */
/* pkts is reset after every successful counter dump; bytes is never reset. */
189 uint64_t pkts;
190 uint64_t bytes;
/* Counter ids returned by StatsRegisterCounter(). */
191 uint16_t capture_afxdp_packets;
192 uint16_t capture_kernel_drops;
193 uint16_t capture_afxdp_poll;
194 uint16_t capture_afxdp_poll_timeout;
195 uint16_t capture_afxdp_poll_failed;
196 uint16_t capture_afxdp_empty_reads;
197 uint16_t capture_afxdp_failed_reads;
198 uint16_t capture_afxdp_acquire_pkt_failed;
199} AFXDPThreadVars;
200
201static TmEcode ReceiveAFXDPThreadInit(ThreadVars *, const void *, void **);
202static void ReceiveAFXDPThreadExitStats(ThreadVars *, void *);
203static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *, void *);
204static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot);
205
206static TmEcode DecodeAFXDPThreadInit(ThreadVars *, const void *, void **);
207static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data);
208static TmEcode DecodeAFXDP(ThreadVars *, Packet *, void *);
209
210/**
211 * \brief Registration Function for ReceiveAFXDP.
212 * \todo Unit tests are needed for this module.
213 */
215{
/* Fill in the TMM_RECEIVEAFXDP entry of the module table. */
216 tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
217 tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = ReceiveAFXDPThreadInit;
/* Packet acquisition is driven by this module's own loop. */
219 tmm_modules[TMM_RECEIVEAFXDP].PktAcqLoop = ReceiveAFXDPLoop;
221 tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = ReceiveAFXDPThreadExitStats;
222 tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = ReceiveAFXDPThreadDeinit;
225}
226
227/**
228 * \brief Registration Function for DecodeAFXDP.
229 * \todo Unit tests are needed for this module.
230 */
232{
/* Fill in the TMM_DECODEAFXDP entry of the module table. */
233 tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
234 tmm_modules[TMM_DECODEAFXDP].ThreadInit = DecodeAFXDPThreadInit;
/* Decoder entry point; it is handed a Packet. */
235 tmm_modules[TMM_DECODEAFXDP].Func = DecodeAFXDP;
237 tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = DecodeAFXDPThreadDeinit;
240}
241
242static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
243{
244 struct xdp_statistics stats;
245 socklen_t len = sizeof(struct xdp_statistics);
246 int fd = xsk_socket__fd(ptv->xsk.xsk);
247
248 if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &len) >= 0) {
249 uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full;
250
251 StatsAddUI64(ptv->tv, ptv->capture_kernel_drops,
252 rx_dropped - StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops));
253 StatsAddUI64(ptv->tv, ptv->capture_afxdp_packets, ptv->pkts);
254
255 (void)SC_ATOMIC_SET(ptv->livedev->drop, rx_dropped);
256 (void)SC_ATOMIC_ADD(ptv->livedev->pkts, ptv->pkts);
257
258 SCLogDebug("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "",
259 ptv->tv->name, StatsGetLocalCounterValue(ptv->tv, ptv->capture_afxdp_packets),
260 ptv->bytes, StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops));
261
262 ptv->pkts = 0;
263 }
264}
265
266/**
267 * \brief Init function for socket creation.
268 *
269 * Mutex used to synchronise initialisation - each socket opens a
270 * different queue. The specific order in which each queue is
271 * opened is not important, but it is vital the queue_num's
272 * are different.
273 *
274 * Takes no parameters; it only resets the file-wide xsk_protect state.
275 */
277{
278 SCEnter();
279
/* Initialise the mutex and restart queue numbering from 0. */
280 SCMutexInit(&xsk_protect.queue_protect, NULL);
281 SC_ATOMIC_SET(xsk_protect.queue_num, 0);
283}
284
/**
 * \brief Give this thread the next free queue number, once.
 *
 * The read and the increment of xsk_protect.queue_num are two separate
 * atomic operations; OpenXSKSocket() calls this with
 * xsk_protect.queue_protect held, which keeps the pair consistent.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
285static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
286{
287 if (!ptv->xsk.queue.assigned) {
288 ptv->xsk.queue.queue_num = SC_ATOMIC_GET(xsk_protect.queue_num);
289 SC_ATOMIC_ADD(xsk_protect.queue_num, 1);
290
291 /* Queue only needs assigned once, on startup */
292 ptv->xsk.queue.assigned = true;
293 }
295}
296
297static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
298{
299 SCMutexLock(&xsk_protect.queue_protect);
300 if ((ptv->threads - 1) == (int)ptv->xsk.queue.queue_num) {
301 SCLogDebug("All AF_XDP capture threads are running.");
302 }
303 SCMutexUnlock(&xsk_protect.queue_protect);
304}
305
/**
 * \brief Map the anonymous memory region that backs the umem.
 *
 * MEM_BYTES are mapped private and anonymous, with the thread's
 * mmap_alignment_flag OR'ed into the flags; the result is stored in
 * ptv->umem.buf.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
306static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
307{
308 int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;
309 ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
310
/* mmap() signals failure with MAP_FAILED, not NULL. */
311 if (ptv->umem.buf == MAP_FAILED) {
312 SCLogError("mmap: failed to acquire memory");
314 }
315
317}
318
/**
 * \brief Create the umem, with its fill and completion rings, over the
 *        buffer previously mapped into ptv->umem.buf.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
319static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
320{
/* A non-zero return from xsk_umem__create() is treated as failure. */
321 if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq,
322 &ptv->umem.cfg)) {
323 SCLogError("failed to create umem: %s", strerror(errno));
325 }
326
328}
329
/**
 * \brief Hand the first cnt frames of the umem to the kernel via the fill ring.
 *
 * \param ptv pointer to AFXDPThreadVars
 * \param cnt number of fill ring entries to reserve and populate
 */
330static TmEcode InitFillRing(AFXDPThreadVars *ptv, const uint32_t cnt)
331{
332 uint32_t idx_fq = 0;
333
/* Anything short of a full reservation is treated as failure. */
334 uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq, cnt, &idx_fq);
335 if (ret != cnt) {
336 SCLogError("Failed to initialise the fill ring.");
338 }
339
/* Entry i receives the umem offset of frame i. */
340 for (uint32_t i = 0; i < cnt; i++) {
341 *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE;
342 }
343
344 xsk_ring_prod__submit(&ptv->umem.fq, cnt);
346}
347
348/**
349 * \brief Linux knobs are tuned to enable a NAPI polling context
350 *
 * Writes gro_flush_timeout and napi_defer_hard_irqs for the capture
 * interface through SysFsWriteValue().
 *
351 * \param ptv pointer to AFXDPThreadVars
352 */
353static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
354{
355 char fname[SYSFS_MAX_FILENAME_SIZE];
356
/* Only a negative snprintf() result is checked; truncation is not. */
357 if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/gro_flush_timeout", ptv->iface) <
358 0) {
360 }
361
362 if (SysFsWriteValue(fname, ptv->gro_flush_timeout) != TM_ECODE_OK) {
364 }
365
366 if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/napi_defer_hard_irqs", ptv->iface) <
367 0) {
369 }
370
371 if (SysFsWriteValue(fname, ptv->napi_defer_hard_irqs) != TM_ECODE_OK) {
373 }
374
376}
377
/**
 * \brief Apply the busy-poll settings to the AF_XDP socket, if enabled.
 *
 * When busy polling is requested and the kernel is at least 5.11, the sysfs
 * tunables are written and SO_PREFER_BUSY_POLL, SO_BUSY_POLL and
 * SO_BUSY_POLL_BUDGET are set on the socket. The socket-option part is only
 * compiled when all three options are defined by the system headers.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
378static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
379{
380 if (!ptv->xsk.enable_busy_poll) {
382 }
383
384 /* Kernel version must be >= 5.11 to avail of SO_PREFER_BUSY_POLL
385 * see linux commit: 7fd3253a7de6a317a0683f83739479fb880bffc8
386 */
387 if (!SCKernelVersionIsAtLeast(5, 11)) {
388 SCLogWarning("Kernel version older than required: v5.11,"
389 " upgrade kernel version to use 'enable-busy-poll' option.");
391 }
392
393#if defined SO_PREFER_BUSY_POLL && defined SO_BUSY_POLL && defined SO_BUSY_POLL_BUDGET
394 const int fd = xsk_socket__fd(ptv->xsk.xsk);
/* sock_opt is reused: first as the boolean for SO_PREFER_BUSY_POLL, then
 * for the busy-poll time and the busy-poll budget. */
395 int sock_opt = 1;
396
397 if (WriteLinuxTunables(ptv) != TM_ECODE_OK) {
399 }
400
401 if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
403 }
404
405 sock_opt = ptv->xsk.busy_poll_time;
406 if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
408 }
409
410 sock_opt = ptv->xsk.busy_poll_budget;
411 if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
413 }
414
416#else
418 "Kernel does not support busy poll, upgrade kernel or disable \"enable-busy-poll\".");
420#endif
421}
422
423static void AFXDPSwitchState(AFXDPThreadVars *ptv, int state)
424{
425 ptv->afxdp_state = state;
426}
427
/**
 * \brief Assign a queue to this thread and create its AF_XDP socket.
 *
 * Runs with xsk_protect.queue_protect held so that queue assignment and
 * socket creation of different threads do not interleave. On success the
 * poll descriptor is set up and the state switched to AFXDP_STATE_UP.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
428static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
429{
430 int ret;
431
432 SCMutexLock(&xsk_protect.queue_protect);
433
434 if (AFXDPAssignQueueID(ptv) != TM_ECODE_OK) {
435 SCLogError("Failed to assign queue ID");
437 }
438
/* xsk_socket__create() reports failure as a non-zero value that is logged
 * through strerror(-ret), i.e. it is treated as a negated errno. */
439 if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
440 ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
441 SCLogError("Failed to create socket: %s", strerror(-ret));
443 }
444 SCLogDebug("bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);
445
446 /* For polling and socket options */
447 ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
448 ptv->xsk.fd.events = POLLIN;
449
450 /* Set state */
451 AFXDPSwitchState(ptv, AFXDP_STATE_UP);
452
453 SCMutexUnlock(&xsk_protect.queue_protect);
455}
456
457static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
458{
459 if (ptv->xsk.xsk) {
460 xsk_socket__delete(ptv->xsk.xsk);
461 ptv->xsk.xsk = NULL;
462 }
463
464 if (ptv->umem.umem) {
465 xsk_umem__delete(ptv->umem.umem);
466 ptv->umem.umem = NULL;
467 }
468
469 memset(&ptv->umem.fq, 0, sizeof(struct xsk_ring_prod));
470 memset(&ptv->umem.cq, 0, sizeof(struct xsk_ring_cons));
471}
472
/**
 * \brief Bring up everything needed to receive on the AF_XDP socket.
 *
 * Order: create the umem, populate the fill ring with NUM_FRAMES * 2
 * entries, open the socket, configure busy polling, then query the XDP
 * program id on the interface. A busy-polling failure only produces a
 * warning.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
473static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
474{
475 if (ConfigureXSKUmem(ptv) != TM_ECODE_OK) {
477 }
478
479 if (InitFillRing(ptv, NUM_FRAMES * 2) != TM_ECODE_OK) {
481 }
482
483 /* Open AF_XDP socket */
484 if (OpenXSKSocket(ptv) != TM_ECODE_OK) {
486 }
487
488 if (ConfigureBusyPolling(ptv) != TM_ECODE_OK) {
489 SCLogWarning("Failed to configure busy polling"
490 " performance may be reduced.");
491 }
492
493 /* Has the eBPF program successfully bound? */
/* Two libbpf spellings of the same query, chosen at build time. */
494#ifdef HAVE_BPF_XDP_QUERY_ID
495 if (bpf_xdp_query_id(ptv->ifindex, ptv->xsk.cfg.xdp_flags, &ptv->prog_id)) {
496 SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
498 }
499#else
500 if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) {
501 SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
503 }
504#endif
505
507}
508
509/**
510 * \brief Try to reopen AF_XDP socket
511 *
 * Closes the current socket and umem, waits RECONNECT_TIMEOUT, and only
 * recreates them when the interface flags can be read and the interface is
 * not down.
 *
512 * \retval: TM_ECODE_OK in case of success
513 * TM_ECODE_FAILED if error occurs or a condition is not met.
514 */
515static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
516{
517 AFXDPCloseSocket(ptv);
/* The receive loop also sleeps RECONNECT_TIMEOUT before each call, so a
 * retry cycle waits twice that long in total. */
518 usleep(RECONNECT_TIMEOUT);
519
520 int if_flags = GetIfaceFlags(ptv->iface);
521 if (if_flags == -1) {
522 SCLogDebug("Couldn't get flags for interface '%s'", ptv->iface);
523 goto sock_err;
/* Counts as down only when both IFF_UP and IFF_RUNNING are clear. */
524 } else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
525 SCLogDebug("Interface '%s' is down", ptv->iface);
526 goto sock_err;
527 }
528
529 if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
531 }
532
533 SCLogInfo("Interface '%s' is back", ptv->iface);
535
536sock_err:
538}
539
540/**
541 * \brief Write packet entry to the fill ring, freeing
542 * this slot for re/fill with inbound packet descriptor
 *
 * The frame address saved in p->afxdp_v.orig is written to the fill ring
 * entry p->afxdp_v.fq_idx that the receive loop reserved for this packet.
 *
543 * \param p pointer to Packet
544 * \retval: None
545 */
546static void AFXDPReleasePacket(Packet *p)
547{
548 *xsk_ring_prod__fill_addr((struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) =
549 p->afxdp_v.orig;
550
552}
553
/**
 * \brief Dump the counters when the wall-clock second has changed.
 *
 * \param ptv pointer to AFXDPThreadVars
 * \param last_dump in/out: time of the previous dump, updated on a dump
 * \retval 1 if the counters were dumped, 0 otherwise
 */
554static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
555{
556 int stats_dumped = 0;
557 time_t current_time = time(NULL);
558
/* Inequality rather than "greater than": any change of the second counts. */
559 if (current_time != *last_dump) {
560 AFXDPDumpCounters(ptv);
561 *last_dump = current_time;
562 stats_dumped = 1;
563 }
564
566
567 return stats_dumped;
568}
569
570static inline ssize_t WakeupSocket(void *data)
571{
572 ssize_t res = 0;
573 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
574
575 /* Assuming kernel >= 5.11 in use if xdp_busy_poll is enabled */
576 if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
577 // cppcheck-suppress nullPointer
578 res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
579 }
580
581 return res;
582}
583
584/**
585 * \brief Init function for ReceiveAFXDP.
586 *
 * Allocates the thread context, copies the interface configuration into
 * it, registers the stats counters, maps the umem buffer and creates the
 * AF_XDP socket.
 *
587 * \param tv pointer to ThreadVars
588 * \param initdata pointer to the interface passed from the user
589 * \param data pointer gets populated with AFXDPThreadVars
590 *
591 * \todo Create a general AFP setup function.
592 */
593static TmEcode ReceiveAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
594{
595 SCEnter();
596
597 AFXDPIfaceConfig *afxdpconfig = (AFXDPIfaceConfig *)initdata;
598
599 if (initdata == NULL) {
600 SCLogError("initdata == NULL");
602 }
603
/* SCCalloc zeroes the context, so afxdp_state starts as AFXDP_STATE_DOWN
 * and the queue is unassigned. */
604 AFXDPThreadVars *ptv = SCCalloc(1, sizeof(AFXDPThreadVars));
605 if (unlikely(ptv == NULL)) {
606 afxdpconfig->DerefFunc(afxdpconfig);
608 }
609
610 ptv->tv = tv;
611
612 strlcpy(ptv->iface, afxdpconfig->iface, AFXDP_IFACE_NAME_LENGTH);
613 ptv->iface[AFXDP_IFACE_NAME_LENGTH - 1] = '\0';
614 ptv->ifindex = if_nametoindex(ptv->iface);
615
616 ptv->livedev = LiveGetDevice(ptv->iface);
617 if (ptv->livedev == NULL) {
618 SCLogError("Unable to find Live device");
/* NOTE(review): afxdpconfig->DerefFunc() is not called before this free,
 * unlike the allocation-failure path above - confirm the config reference
 * is released elsewhere. */
619 SCFree(ptv);
621 }
622
623 ptv->promisc = afxdpconfig->promisc;
624 if (ptv->promisc != 0) {
625 /* Force promiscuous mode */
626 if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
627 SCLogError("Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,
628 strerror(errno));
629 SCFree(ptv);
631 }
632 }
633
634 ptv->threads = afxdpconfig->threads;
635
636 /* Socket configuration */
637 ptv->xsk.cfg.rx_size = NUM_FRAMES_CONS;
638 ptv->xsk.cfg.tx_size = NUM_FRAMES_PROD;
639 ptv->xsk.cfg.xdp_flags = afxdpconfig->mode;
640 ptv->xsk.cfg.bind_flags = afxdpconfig->bind_flags;
641
642 /* UMEM configuration */
/* The fill ring is twice the producer default, matching the NUM_FRAMES * 2
 * entries populated by AFXDPSocketCreation(). */
643 ptv->umem.cfg.fill_size = NUM_FRAMES_PROD * 2;
644 ptv->umem.cfg.comp_size = NUM_FRAMES_CONS;
645 ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
646 ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
647 ptv->umem.cfg.flags = afxdpconfig->mem_alignment;
648
649 /* Use hugepages if unaligned chunk mode */
650 if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
651 ptv->umem.mmap_alignment_flag = MAP_HUGETLB;
652 }
653
654 /* Busy polling configuration */
655 ptv->xsk.enable_busy_poll = afxdpconfig->enable_busy_poll;
656 ptv->xsk.busy_poll_budget = afxdpconfig->busy_poll_budget;
657 ptv->xsk.busy_poll_time = afxdpconfig->busy_poll_time;
658 ptv->gro_flush_timeout = afxdpconfig->gro_flush_timeout;
659 ptv->napi_defer_hard_irqs = afxdpconfig->napi_defer_hard_irqs;
660
661 /* Stats registration */
662 ptv->capture_afxdp_packets = StatsRegisterCounter("capture.afxdp_packets", ptv->tv);
663 ptv->capture_kernel_drops = StatsRegisterCounter("capture.kernel_drops", ptv->tv);
664 ptv->capture_afxdp_poll = StatsRegisterCounter("capture.afxdp.poll", ptv->tv);
665 ptv->capture_afxdp_poll_timeout = StatsRegisterCounter("capture.afxdp.poll_timeout", ptv->tv);
666 ptv->capture_afxdp_poll_failed = StatsRegisterCounter("capture.afxdp.poll_failed", ptv->tv);
667 ptv->capture_afxdp_empty_reads = StatsRegisterCounter("capture.afxdp.empty_reads", ptv->tv);
668 ptv->capture_afxdp_failed_reads = StatsRegisterCounter("capture.afxdp.failed_reads", ptv->tv);
669 ptv->capture_afxdp_acquire_pkt_failed =
670 StatsRegisterCounter("capture.afxdp.acquire_pkt_failed", ptv->tv);
671
672 /* Reserve memory for umem */
673 if (AcquireBuffer(ptv) != TM_ECODE_OK) {
674 SCFree(ptv);
676 }
677
/* From here on the buffer is mapped, so the full deinit path is used for
 * cleanup; it also frees ptv. */
678 if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
679 ReceiveAFXDPThreadDeinit(tv, ptv);
681 }
682
683 *data = (void *)ptv;
684 afxdpconfig->DerefFunc(afxdpconfig);
686}
687
688/**
689 * \brief Main AF_XDP reading Loop function
 *
 * Each iteration: reconnect while the interface is marked down, stop when
 * suricata_ctl_flags is set, optionally wait in poll(), peek a batch from
 * the RX ring, reserve the same number of fill ring entries, wrap every
 * descriptor in a Packet and pass it to the next slot, then submit the
 * fill ring and release the RX ring entries.
 *
 * \param tv pointer to ThreadVars
 * \param data pointer that gets cast into AFXDPThreadVars for ptv
 * \param slot pointer that gets cast into TmSlot; its slot_next receives the packets
690 */
691static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot)
692{
693 SCEnter();
694
695 Packet *p;
696 time_t last_dump = 0;
697 struct timeval ts;
698 uint32_t idx_rx = 0, idx_fq = 0, rcvd;
699 int r;
700 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
701 TmSlot *s = (TmSlot *)slot;
702
703 ptv->slot = s->slot_next;
704
705 AFXDPAllThreadsRunning(ptv);
706
707 // Indicate that the thread is actually running its application level code (i.e., it can poll
708 // packets)
710
712 while (1) {
713 /* Start by checking the state of our interface */
714 if (unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {
715 do {
716 usleep(RECONNECT_TIMEOUT);
717 if (unlikely(suricata_ctl_flags != 0)) {
718 break;
719 }
720 r = AFXDPTryReopen(ptv);
721 } while (r != TM_ECODE_OK);
722 }
723
724 if (unlikely(suricata_ctl_flags != 0)) {
725 SCLogDebug("Stopping Suricata!");
726 AFXDPDumpCounters(ptv);
727 break;
728 }
729
730 /* Busy polling is not set, using poll() to maintain (relatively) decent
731 * performance. xdp_busy_poll must be disabled for kernels < 5.11
732 */
733 if (!ptv->xsk.enable_busy_poll) {
734 StatsIncr(ptv->tv, ptv->capture_afxdp_poll);
735
736 r = poll(&ptv->xsk.fd, 1, POLL_TIMEOUT);
737
738 /* Report poll results */
739 if (r <= 0) {
740 if (r == 0) {
741 StatsIncr(ptv->tv, ptv->capture_afxdp_poll_timeout);
742 } else if (r < 0) {
743 StatsIncr(ptv->tv, ptv->capture_afxdp_poll_failed);
744 SCLogWarning("poll failed with retval %d", r);
/* Any poll() error marks the interface down and triggers the reconnect
 * path at the top of the loop. */
745 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
746 }
747
748 DumpStatsEverySecond(ptv, &last_dump);
749 continue;
750 }
751 }
752
/* busy_poll_budget doubles as the maximum batch size taken from the RX ring. */
753 rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);
754 if (!rcvd) {
755 StatsIncr(ptv->tv, ptv->capture_afxdp_empty_reads);
756 ssize_t ret = WakeupSocket(ptv);
757 if (ret < 0) {
758 SCLogWarning("recv failed with retval %ld", ret);
759 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
760 }
761 DumpStatsEverySecond(ptv, &last_dump);
762 continue;
763 }
764
/* One fill ring entry is reserved per received descriptor; the entries are
 * written later by AFXDPReleasePacket(). */
765 uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
766 while (res != rcvd) {
767 StatsIncr(ptv->tv, ptv->capture_afxdp_failed_reads);
768 ssize_t ret = WakeupSocket(ptv);
769 if (ret < 0) {
770 SCLogWarning("recv failed with retval %ld", ret);
771 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
/* NOTE(review): this continue targets the inner while loop, so res is
 * re-tested without a new reserve attempt and the wakeup is retried until
 * it succeeds; the DOWN state set above is not acted on until then -
 * confirm this is intended. */
772 continue;
773 }
774 res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
775 }
776
777 gettimeofday(&ts, NULL);
778 ptv->pkts += rcvd;
779 for (uint32_t i = 0; i < rcvd; i++) {
781 if (unlikely(p == NULL)) {
782 StatsIncr(ptv->tv, ptv->capture_afxdp_acquire_pkt_failed);
/* NOTE(review): idx_rx and idx_fq are not advanced on this path, so the
 * next iteration reads the same RX descriptor - confirm this is intended. */
783 continue;
784 }
785
788 p->livedev = ptv->livedev;
789 p->ReleasePacket = AFXDPReleasePacket;
791
793
794 uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
795 uint32_t len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
/* orig is the frame address handed back to the fill ring on release; addr
 * (with its offset applied) locates the packet bytes in the buffer. */
796 uint64_t orig = xsk_umem__extract_addr(addr);
797 addr = xsk_umem__add_offset_to_addr(addr);
798
799 uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);
800
801 ptv->bytes += len;
802
803 p->afxdp_v.fq_idx = idx_fq++;
804 p->afxdp_v.orig = orig;
805 p->afxdp_v.fq = &ptv->umem.fq;
806
/* The packet points into the umem buffer; no copy is made here. */
807 PacketSetData(p, pkt_data, len);
808
809 if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
810 TmqhOutputPacketpool(ptv->tv, p);
/* NOTE(review): EXIT_FAILURE is returned from a function typed TmEcode -
 * confirm it maps to the intended error code. */
811 SCReturnInt(EXIT_FAILURE);
812 }
813 }
814
815 xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
816 xsk_ring_cons__release(&ptv->xsk.rx, rcvd);
817
818 /* Trigger one dump of stats every second */
819 DumpStatsEverySecond(ptv, &last_dump);
820 }
821
823}
824
825/**
826 * \brief function to unload an AF_XDP program
827 *
828 */
829static void RunModeAFXDPRemoveProg(char *iface_name)
830{
831 unsigned int ifindex = if_nametoindex(iface_name);
832
833 struct xdp_multiprog *progs = xdp_multiprog__get_from_ifindex(ifindex);
834 if (progs == NULL) {
835 return;
836 }
837 enum xdp_attach_mode mode = xdp_multiprog__attach_mode(progs);
838
839 struct xdp_program *prog = NULL;
840
841 // loop through the multiprogram struct, removing all the programs
842 for (prog = xdp_multiprog__next_prog(NULL, progs); prog;
843 prog = xdp_multiprog__next_prog(prog, progs)) {
844 int ret = xdp_program__detach(prog, ifindex, mode, 0);
845 if (ret) {
846 SCLogDebug("Error: cannot detatch XDP program: %s\n", strerror(errno));
847 }
848 }
849
850 prog = xdp_multiprog__main_prog(progs);
851 if (xdp_program__is_attached(prog, ifindex) != XDP_MODE_UNSPEC) {
852 int ret = xdp_program__detach(prog, ifindex, mode, 0);
853 if (ret) {
854 SCLogDebug("Error: cannot detatch XDP program: %s\n", strerror(errno));
855 }
856 }
857}
858
/* Serialises XDP program removal between capture threads during deinit. */
864static SCMutex sync_deinit = SCMUTEX_INITIALIZER;
865
859/**
860 * \brief DeInit function closes af-xdp socket at exit.
 *
 * Detaches the XDP program, deletes the socket and the umem, unmaps the
 * packet buffer and frees the thread context.
 *
861 * \param tv pointer to ThreadVars
862 * \param data pointer that gets cast into AFXDPThreadVars for ptv
863 */
866static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *tv, void *data)
867{
868 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
869
870 /*
871 * If AF_XDP is enabled, the program must be detached before the AF_XDP sockets
872 * are closed to mitigate a bug that causes an IO_PAGEFAULT in linux kernel
873 * version 5.19, unknown as of now what other versions this affects.
874 */
875 SCMutexLock(&sync_deinit);
876 RunModeAFXDPRemoveProg(ptv->iface);
877 SCMutexUnlock(&sync_deinit);
878
879 if (ptv->xsk.xsk) {
880 xsk_socket__delete(ptv->xsk.xsk);
881 ptv->xsk.xsk = NULL;
882 }
883
884 if (ptv->umem.umem) {
885 xsk_umem__delete(ptv->umem.umem);
886 ptv->umem.umem = NULL;
887 }
/* Releases the buffer mapped by AcquireBuffer(). */
888 munmap(ptv->umem.buf, MEM_BYTES);
889
890 SCFree(ptv);
892}
893
894/**
895 * \brief This function prints stats to the screen at exit.
896 * \param tv pointer to ThreadVars
897 * \param data pointer that gets cast into AFXDPThreadVars for ptv
898 */
899static void ReceiveAFXDPThreadExitStats(ThreadVars *tv, void *data)
900{
901 SCEnter();
902 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
903
904 AFXDPDumpCounters(ptv);
905
906 SCLogPerf("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "", tv->name,
907 StatsGetLocalCounterValue(tv, ptv->capture_afxdp_packets), ptv->bytes,
908 StatsGetLocalCounterValue(tv, ptv->capture_kernel_drops));
909}
910
911/**
912 * \brief This function passes off to link type decoders.
913 *
914 * DecodeAFXDP decodes packets from AF_XDP and passes
915 * them off to the proper link type decoder.
916 *
917 * \param tv pointer to ThreadVars
918 * \param p pointer to the current packet
919 * \param data per-thread decode data (presumably the DecodeThreadVars that
 *        DecodeAFXDPThreadInit stores - confirm)
920 */
921static TmEcode DecodeAFXDP(ThreadVars *tv, Packet *p, void *data)
922{
923 SCEnter();
924
926
928
929 /* update counters */
931
932 /* If suri has set vlan during reading, we increase vlan counter */
933 if (p->vlan_idx) {
935 }
936
937 /* call the decoder */
/* The decoder is selected by the packet's datalink type. */
938 DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
939
941
943}
944
/**
 * \brief Init function for DecodeAFXDP.
 *
 * \param tv pointer to ThreadVars
 * \param initdata not referenced in the code shown here
 * \param data pointer gets populated with the decode thread data (dtv)
 */
945static TmEcode DecodeAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
946{
947 SCEnter();
949 if (dtv == NULL)
951
953
/* Hand the decode thread data back to the caller. */
954 *data = (void *)dtv;
955
957}
958
/**
 * \brief DeInit function for DecodeAFXDP.
 *
 * \param tv pointer to ThreadVars
 * \param data thread data set up by DecodeAFXDPThreadInit; NULL is tolerated
 */
959static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data)
960{
961 if (data != NULL)
964}
965
966#endif /* HAVE_AF_XDP */
967/* eof */
968/**
969 * @}
970 */
uint8_t len
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition counters.c:952
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition counters.c:450
uint64_t StatsGetLocalCounterValue(ThreadVars *tv, uint16_t id)
Get the value of the local copy of the counter that hold this id.
Definition counters.c:1255
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition counters.c:166
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition counters.c:146
#define PKT_SET_SRC(p, src_val)
Definition decode.h:1325
#define GET_PKT_DATA(p)
Definition decode.h:209
#define GET_PKT_LEN(p)
Definition decode.h:208
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition decode.h:1321
@ PKT_SRC_WIRE
Definition decode.h:52
#define PKT_IGNORE_CHECKSUM
Definition decode.h:1282
DecodeThreadVars * dtv
ThreadVars * tv
#define POLL_TIMEOUT
void TmModuleReceiveAFXDPRegister(void)
TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
void TmModuleDecodeAFXDPRegister(void)
Registration Function for DecodeAFXDP.
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition decode.c:293
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition decode.c:628
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition decode.c:232
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition decode.c:804
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition decode.c:822
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition decode.c:770
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition decode.c:842
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition decode.c:276
TmEcode AFXDPQueueProtectionInit(void)
#define AFXDP_IFACE_NAME_LENGTH
uint64_t ts
void(* DerefFunc)(void *)
uint32_t busy_poll_budget
uint32_t busy_poll_time
uint32_t gro_flush_timeout
uint32_t napi_defer_hard_irqs
char iface[AFXDP_IFACE_NAME_LENGTH]
Structure to hold thread specific data for all decode modules.
Definition decode.h:963
uint16_t counter_vlan
Definition decode.h:1001
SCTime_t ts
Definition decode.h:555
int datalink
Definition decode.h:639
struct LiveDevice_ * livedev
Definition decode.h:618
void(* ReleasePacket)(struct Packet_ *)
Definition decode.h:591
uint32_t flags
Definition decode.h:544
uint8_t vlan_idx
Definition decode.h:529
Per thread variable structure.
Definition threadvars.h:58
char name[16]
Definition threadvars.h:65
const char * name
Definition tm-modules.h:48
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition tm-modules.h:53
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition tm-modules.h:52
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition tm-modules.h:61
uint8_t cap_flags
Definition tm-modules.h:77
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition tm-modules.h:56
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition tm-modules.h:58
uint8_t flags
Definition tm-modules.h:80
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition tm-modules.h:51
struct TmSlot_ * slot_next
Definition tm-threads.h:62
size_t strlcpy(char *dst, const char *src, size_t siz)
volatile uint8_t suricata_ctl_flags
Definition suricata.c:172
#define SCMUTEX_INITIALIZER
#define SCMutex
#define SCMutexUnlock(mut)
#define SCMutexInit(mut, mutattrs)
#define SCMutexLock(mut)
#define THV_RUNNING
Definition threadvars.h:55
TmModule tmm_modules[TMM_SIZE]
Definition tm-modules.c:29
#define TM_FLAG_RECEIVE_TM
Definition tm-modules.h:32
#define TM_FLAG_DECODE_TM
Definition tm-modules.h:33
@ TMM_RECEIVEAFXDP
@ TMM_DECODEAFXDP
@ TM_ECODE_FAILED
@ TM_ECODE_OK
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition tm-threads.c:101
void PacketPoolWait(void)
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
uint32_t cnt
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
#define SC_ATOMIC_DECLARE(type, name)
wrapper for declaring atomic variables.
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
#define SCEnter(...)
Definition util-debug.h:277
#define SCLogPerf(...)
Definition util-debug.h:234
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCReturnInt(x)
Definition util-debug.h:281
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition util-debug.h:255
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition util-debug.h:225
#define SCLogError(...)
Macro used to log ERROR messages.
Definition util-debug.h:267
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
int SCKernelVersionIsAtLeast(int major, int minor)
#define SCFree(p)
Definition util-mem.h:61
#define SCCalloc(nm, sz)
Definition util-mem.h:53
#define unlikely(expr)
#define SC_CAP_NET_RAW
Definition util-privs.h:32
TmEcode SysFsWriteValue(const char *path, int64_t value)
Definition util-sysfs.c:28
#define SYSFS_MAX_FILENAME_SIZE
Definition util-sysfs.h:32
#define SCTIME_FROM_TIMEVAL(tv)
Definition util-time.h:79
#define DEBUG_VALIDATE_BUG_ON(exp)