suricata
tmqh-packetpool.c
Go to the documentation of this file.
1/* Copyright (C) 2007-2022 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 *
23 * Packetpool queue handlers. Packet pool is implemented as a stack.
24 */
25
26#include "suricata-common.h"
27#include "tmqh-packetpool.h"
28#include "tm-queuehandlers.h"
29#include "tm-threads.h"
30#include "threads.h"
31#include "decode.h"
32#include "tm-modules.h"
33#include "packet.h"
34#include "util-profiling.h"
35#include "util-validate.h"
36#include "action-globals.h"
37
38extern uint32_t max_pending_packets;
39
40/* Number of freed packet to save for one pool before freeing them. */
41#define MAX_PENDING_RETURN_PACKETS 32
42static uint32_t max_pending_return_packets = MAX_PENDING_RETURN_PACKETS;
43
45
46static inline PktPool *GetThreadPacketPool(void)
47{
48 return &thread_pkt_pool;
49}
50
51/**
52 * \brief TmqhPacketpoolRegister
53 * \initonly
54 */
61
62static void UpdateReturnThreshold(PktPool *pool)
63{
64 const float perc = (float)pool->cnt / (float)max_pending_packets;
65 uint32_t threshold = (uint32_t)(perc * (float)max_pending_return_packets);
66 if (threshold != SC_ATOMIC_GET(pool->return_stack.return_threshold)) {
67 SC_ATOMIC_SET(pool->return_stack.return_threshold, threshold);
68 }
69}
70
72{
73 PktPool *my_pool = GetThreadPacketPool();
74
75 if (my_pool->head == NULL) {
76 SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 1);
77
78 SCMutexLock(&my_pool->return_stack.mutex);
79 int rc = 0;
80 while (my_pool->return_stack.cnt == 0 && rc == 0) {
81 rc = SCCondWait(&my_pool->return_stack.cond, &my_pool->return_stack.mutex);
82 }
83 SCMutexUnlock(&my_pool->return_stack.mutex);
84
85 UpdateReturnThreshold(my_pool);
86 }
87}
88
89/** \brief a initialized packet
90 *
91 * \warning Use *only* at init, not at packet runtime
92 */
93static void PacketPoolStorePacket(Packet *p)
94{
95 p->pool = GetThreadPacketPool();
98}
99
100static void PacketPoolGetReturnedPackets(PktPool *pool)
101{
102 SCMutexLock(&pool->return_stack.mutex);
103 /* Move all the packets from the locked return stack to the local stack. */
104 pool->head = pool->return_stack.head;
105 pool->return_stack.head = NULL;
106 pool->cnt += pool->return_stack.cnt;
107 pool->return_stack.cnt = 0;
108 SCMutexUnlock(&pool->return_stack.mutex);
109}
110
111/** \brief Get a new packet from the packet pool
112 *
113 * Only allocates from the thread's local stack, or mallocs new packets.
114 * If the local stack is empty, first move all the return stack packets to
115 * the local stack.
116 * \retval Packet pointer, or NULL on failure.
117 */
119{
120 PktPool *pool = GetThreadPacketPool();
121 DEBUG_VALIDATE_BUG_ON(pool->initialized == 0);
122 DEBUG_VALIDATE_BUG_ON(pool->destroyed == 1);
123 if (pool->head) {
124 /* Stack is not empty. */
125 Packet *p = pool->head;
126 pool->head = p->next;
127 pool->cnt--;
128 p->pool = pool;
129 PacketReinit(p);
130
131 UpdateReturnThreshold(pool);
132 SCLogDebug("pp: %0.2f cnt:%u max:%d threshold:%u",
133 ((float)pool->cnt / (float)max_pending_packets) * (float)100, pool->cnt,
134 max_pending_packets, SC_ATOMIC_GET(pool->return_stack.return_threshold));
135 return p;
136 }
137
138 /* Local Stack is empty, so check the return stack, which requires
139 * locking. */
140 PacketPoolGetReturnedPackets(pool);
141
142 /* Try to allocate again. Need to check for not empty again, since the
143 * return stack might have been empty too.
144 */
145 if (pool->head) {
146 /* Stack is not empty. */
147 Packet *p = pool->head;
148 pool->head = p->next;
149 pool->cnt--;
150 p->pool = pool;
151 PacketReinit(p);
152
153 UpdateReturnThreshold(pool);
154 SCLogDebug("pp: %0.2f cnt:%u max:%d threshold:%u",
155 ((float)pool->cnt / (float)max_pending_packets) * (float)100, pool->cnt,
156 max_pending_packets, SC_ATOMIC_GET(pool->return_stack.return_threshold));
157 return p;
158 }
159
160 /* Failed to allocate a packet, so return NULL. */
161 /* Optionally, could allocate a new packet here. */
162 return NULL;
163}
164
165/** \brief Return packet to Packet pool
166 *
167 */
169{
170 PktPool *my_pool = GetThreadPacketPool();
171 PktPool *pool = p->pool;
172 if (pool == NULL) {
173 PacketFree(p);
174 return;
175 }
176
178
179#ifdef DEBUG_VALIDATION
180 BUG_ON(pool->initialized == 0);
181 BUG_ON(pool->destroyed == 1);
182 BUG_ON(my_pool->initialized == 0);
183 BUG_ON(my_pool->destroyed == 1);
184#endif /* DEBUG_VALIDATION */
185
186 if (pool == my_pool) {
187 /* Push back onto this thread's own stack, so no locking. */
188 p->next = my_pool->head;
189 my_pool->head = p;
190 my_pool->cnt++;
191 } else {
192 PktPool *pending_pool = my_pool->pending_pool;
193 if (pending_pool == NULL || pending_pool == pool) {
194 if (pending_pool == NULL) {
195 /* No pending packet, so store the current packet. */
196 p->next = NULL;
197 my_pool->pending_pool = pool;
198 my_pool->pending_head = p;
199 my_pool->pending_tail = p;
200 my_pool->pending_count = 1;
201 } else if (pending_pool == pool) {
202 /* Another packet for the pending pool list. */
203 p->next = my_pool->pending_head;
204 my_pool->pending_head = p;
205 my_pool->pending_count++;
206 }
207
208 const uint32_t threshold = SC_ATOMIC_GET(pool->return_stack.return_threshold);
209 if (my_pool->pending_count >= threshold) {
210 /* Return the entire list of pending packets. */
211 SCMutexLock(&pool->return_stack.mutex);
212 my_pool->pending_tail->next = pool->return_stack.head;
213 pool->return_stack.head = my_pool->pending_head;
214 pool->return_stack.cnt += my_pool->pending_count;
215 SCCondSignal(&pool->return_stack.cond);
216 SCMutexUnlock(&pool->return_stack.mutex);
217 /* Clear the list of pending packets to return. */
218 my_pool->pending_pool = NULL;
219 my_pool->pending_head = NULL;
220 my_pool->pending_tail = NULL;
221 my_pool->pending_count = 0;
222 }
223 } else {
224 /* Push onto return stack for this pool */
225 SCMutexLock(&pool->return_stack.mutex);
226 p->next = pool->return_stack.head;
227 pool->return_stack.head = p;
228 pool->return_stack.cnt++;
229 SCCondSignal(&pool->return_stack.cond);
230 SCMutexUnlock(&pool->return_stack.mutex);
231 }
232 }
233}
234
236{
237 PktPool *my_pool = GetThreadPacketPool();
238
239#ifdef DEBUG_VALIDATION
240 BUG_ON(my_pool->initialized);
241 my_pool->initialized = 1;
242 my_pool->destroyed = 0;
243#endif /* DEBUG_VALIDATION */
244
245 SCMutexInit(&my_pool->return_stack.mutex, NULL);
246 SCCondInit(&my_pool->return_stack.cond, NULL);
247 SC_ATOMIC_INIT(my_pool->return_stack.return_threshold);
248 SC_ATOMIC_SET(my_pool->return_stack.return_threshold, 32);
249
250 /* pre allocate packets */
251 SCLogDebug("preallocating packets... packet size %" PRIuMAX "",
252 (uintmax_t)SIZE_OF_PACKET);
253 for (uint32_t i = 0; i < max_pending_packets; i++) {
255 if (unlikely(p == NULL)) {
256 FatalError("Fatal error encountered while allocating a packet. Exiting...");
257 }
258 PacketPoolStorePacket(p);
259 }
260
261 //SCLogInfo("preallocated %"PRIiMAX" packets. Total memory %"PRIuMAX"",
262 // max_pending_packets, (uintmax_t)(max_pending_packets*SIZE_OF_PACKET));
263}
264
266{
267 Packet *p = NULL;
268 PktPool *my_pool = GetThreadPacketPool();
269
270#ifdef DEBUG_VALIDATION
271 BUG_ON(my_pool && my_pool->destroyed);
272#endif /* DEBUG_VALIDATION */
273
274 if (my_pool && my_pool->pending_pool != NULL) {
275 p = my_pool->pending_head;
276 while (p) {
277 Packet *next_p = p->next;
278 PacketFree(p);
279 p = next_p;
280 my_pool->pending_count--;
281 }
282#ifdef DEBUG_VALIDATION
283 BUG_ON(my_pool->pending_count);
284#endif /* DEBUG_VALIDATION */
285 my_pool->pending_pool = NULL;
286 my_pool->pending_head = NULL;
287 my_pool->pending_tail = NULL;
288 }
289
290 while ((p = PacketPoolGetPacket()) != NULL) {
291 PacketFree(p);
292 }
293
294#ifdef DEBUG_VALIDATION
295 my_pool->initialized = 0;
296 my_pool->destroyed = 1;
297#endif /* DEBUG_VALIDATION */
298}
299
304
306{
307 bool proot = false;
308
309 SCEnter();
310 SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, BOOL2STR(p->pool == NULL));
311
312 if (PacketIsTunnel(p)) {
313 SCLogDebug("Packet %p is a tunnel packet: %s",
314 p,p->root ? "upper layer" : "tunnel root");
315
316 /* get a lock to access root packet fields */
319
320 if (PacketIsTunnelRoot(p)) {
321 SCLogDebug("IS_TUNNEL_ROOT_PKT == TRUE");
322 CaptureStatsUpdate(t, p); // TODO move out of lock
323
324 const uint16_t outstanding = TUNNEL_PKT_TPR(p) - TUNNEL_PKT_RTV(p);
325 SCLogDebug("root pkt: outstanding %u", outstanding);
326 if (outstanding == 0) {
327 SCLogDebug("no tunnel packets outstanding, no more tunnel "
328 "packet(s) depending on this root");
329 /* if this packet is the root and there are no
330 * more tunnel packets to consider
331 *
332 * return it to the pool */
333 } else {
334 SCLogDebug("tunnel root Packet %p: outstanding > 0, so "
335 "packets are still depending on this root, setting "
336 "SET_TUNNEL_PKT_VERDICTED", p);
337 /* if this is the root and there are more tunnel
338 * packets, return this to the pool. It's still referenced
339 * by the tunnel packets, and we will return it
340 * when we handle them */
341 PacketTunnelSetVerdicted(p);
342
345 SCReturn;
346 }
347 } else {
348 SCLogDebug("NOT IS_TUNNEL_ROOT_PKT, so tunnel pkt");
349
351 const uint16_t outstanding = TUNNEL_PKT_TPR(p) - TUNNEL_PKT_RTV(p);
352 SCLogDebug("tunnel pkt: outstanding %u", outstanding);
353 /* all tunnel packets are processed except us. Root already
354 * processed. So return tunnel pkt and root packet to the
355 * pool. */
356 if (outstanding == 0 && p->root && PacketTunnelIsVerdicted(p->root)) {
357 SCLogDebug("root verdicted == true && no outstanding");
358
359 /* handle freeing the root as well*/
360 SCLogDebug("setting proot = 1 for root pkt, p->root %p "
361 "(tunnel packet %p)", p->root, p);
362 proot = true;
363
364 /* fall through */
365
366 } else {
367 /* root not ready yet, or not the last tunnel packet,
368 * so get rid of the tunnel pkt only */
369
370 SCLogDebug("NOT IS_TUNNEL_PKT_VERDICTED (%s) || "
371 "outstanding > 0 (%u)",
372 (p->root && PacketTunnelIsVerdicted(p->root)) ? "true" : "false",
373 outstanding);
374
375 /* fall through */
376 }
377 }
379
380 SCLogDebug("tunnel stuff done, move on (proot %d)", proot);
381
382 } else {
383 CaptureStatsUpdate(t, p);
384 }
385
386 SCLogDebug("[packet %p][%s] %s", p,
387 PacketIsTunnel(p) ? PacketIsTunnelRoot(p) ? "tunnel::root" : "tunnel::leaf"
388 : "no tunnel",
389 (p->action & ACTION_DROP) ? "DROP" : "no drop");
390
391 /* we're done with the tunnel root now as well */
392 if (proot) {
393 SCLogDebug("getting rid of root pkt... alloc'd %s", BOOL2STR(p->root->pool == NULL));
394
396 p->root->ReleasePacket(p->root);
397 p->root = NULL;
398 }
399
401
403 p->ReleasePacket(p);
404
405 SCReturn;
406}
407
408/**
409 * \brief Release all the packets in the queue back to the packetpool. Mainly
410 * used by threads that have failed, and wants to return the packets back
411 * to the packetpool.
412 *
413 * \param pq Pointer to the packetqueue from which the packets have to be
414 * returned back to the packetpool
415 *
416 * \warning this function assumes that the pq does not use locking
417 */
419{
420 Packet *p = NULL;
421
422 if (pq == NULL)
423 return;
424
425 while ((p = PacketDequeue(pq)) != NULL) {
426 DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
427 TmqhOutputPacketpool(NULL, p);
428 }
429}
430
431/** number of packets to keep reserved when calculating the pending
432 * return packets count. This assumes we need at max 10 packets in one
433 * PacketPoolWaitForN call. The actual number is 9 now, so this has a
434 * bit of margin. */
435#define RESERVED_PACKETS 10
436
437/**
438 * \brief Set the max_pending_return_packets value
439 *
440 * Set it to the max pending packets value, divided by the number
441 * of lister threads. Normally, in autofp these are the stream/detect/log
442 * worker threads.
443 *
444 * The max_pending_return_packets value needs to stay below the packet
445 * pool size of the 'producers' (normally pkt capture threads but also
446 * flow timeout injection ) to avoid a deadlock where all the 'workers'
447 * keep packets in their return pools, while the capture thread can't
448 * continue because its pool is empty.
449 */
451{
452 extern uint32_t max_pending_packets;
453 uint32_t pending_packets = max_pending_packets;
454 if (pending_packets < RESERVED_PACKETS) {
455 FatalError("'max-pending-packets' setting "
456 "must be at least %d",
458 }
460 if (threads == 0)
461 return;
462
463 uint32_t packets = (pending_packets / threads) - 1;
464 if (packets < max_pending_return_packets)
465 max_pending_return_packets = packets;
466
467 /* make sure to have a margin in the return logic */
468 if (max_pending_return_packets >= RESERVED_PACKETS)
469 max_pending_return_packets -= RESERVED_PACKETS;
470
471 SCLogDebug("detect threads %u, max packets %u, max_pending_return_packets %u",
472 threads, packets, max_pending_return_packets);
473}
#define ACTION_DROP
#define TUNNEL_PKT_RTV(p)
Definition decode.h:1089
#define TUNNEL_PKT_TPR(p)
Definition decode.h:1090
#define TUNNEL_INCR_PKT_RTV_NOLOCK(p)
Definition decode.h:1077
#define SIZE_OF_PACKET
Definition decode.h:703
ThreadVars * tv
uint32_t max_pending_packets
Definition suricata.c:183
void CaptureStatsUpdate(ThreadVars *tv, const Packet *p)
Definition decode.c:1014
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition decode.c:258
void PacketFree(Packet *p)
Return a malloced packet.
Definition decode.c:219
HRLOCK_TYPE lock
Definition host.h:0
Packet * PacketDequeue(PacketQueue *q)
void PacketReleaseRefs(Packet *p)
Definition packet.c:70
void PacketReinit(Packet *p)
Recycle a packet structure for reuse.
Definition packet.c:80
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
struct PktPool_ * pool
Definition decode.h:670
uint8_t action
Definition decode.h:609
SCSpinlock tunnel_lock
Definition decode.h:683
struct Flow_ * flow
Definition decode.h:546
struct Packet_::@39 persistent
struct Packet_ * root
Definition decode.h:653
struct Packet_ * next
Definition decode.h:635
void(* ReleasePacket)(struct Packet_ *)
Definition decode.h:591
PktPoolLockedStack return_stack
uint32_t cnt
Packet * pending_tail
Packet * head
Packet * pending_head
struct PktPool_ * pending_pool
uint32_t pending_count
Per thread variable structure.
Definition threadvars.h:58
const char * name
Packet *(* InHandler)(ThreadVars *)
void(* OutHandler)(ThreadVars *, Packet *)
#define BUG_ON(x)
#define SCCondWait
#define SCCondInit
#define SCSpinlock
#define SCSpinUnlock
#define SCMutexUnlock(mut)
#define SCSpinLock
#define SCMutexInit(mut, mutattrs)
#define SCCondSignal
#define SCMutexLock(mut)
#define TM_FLAG_FLOWWORKER_TM
Definition tm-modules.h:34
Tmqh tmqh_table[TMQH_SIZE]
@ TMQH_PACKETPOOL
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
void PacketPoolWait(void)
void PacketPoolDestroy(void)
Packet * PacketPoolGetPacket(void)
Get a new packet from the packet pool.
thread_local PktPool thread_pkt_pool
Packet * TmqhInputPacketpool(ThreadVars *tv)
void PacketPoolInit(void)
void TmqhPacketpoolRegister(void)
TmqhPacketpoolRegister \initonly.
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
void TmqhReleasePacketsToPacketPool(PacketQueue *pq)
Release all the packets in the queue back to the packetpool. Mainly used by threads that have failed,...
#define RESERVED_PACKETS
void PacketPoolPostRunmodes(void)
Set the max_pending_return_packets value.
void PacketPoolReturnPacket(Packet *p)
Return packet to Packet pool.
#define MAX_PENDING_RETURN_PACKETS
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
#define SCEnter(...)
Definition util-debug.h:277
#define FatalError(...)
Definition util-debug.h:510
#define BOOL2STR(b)
Definition util-debug.h:535
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCReturn
Definition util-debug.h:279
#define unlikely(expr)
#define PACKET_PROFILING_END(p)
#define DEBUG_VALIDATE_BUG_ON(exp)