suricata
tm-threads.h
Go to the documentation of this file.
1/* Copyright (C) 2007-2024 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23 */
24
25#ifndef SURICATA_TM_THREADS_H
26#define SURICATA_TM_THREADS_H
27
28#include "tmqh-packetpool.h"
29#include "tm-threads-common.h"
30#include "tm-modules.h"
31#include "flow.h" // for the FlowQueue
32
33#ifdef OS_WIN32
/**
 * \brief Sleep for roughly \p usec microseconds on Windows.
 *
 * Sleep() only has millisecond granularity, so the request is converted
 * to whole milliseconds. Anything up to 1000 usec (including 0) sleeps
 * for 1 ms; larger values are divided by 1000, rounding down.
 *
 * \param usec time to sleep in microseconds
 */
static inline void SleepUsec(uint64_t usec)
{
    uint64_t msec = 1;
    if (usec > 1000) {
        msec = usec / 1000;
    }
    /* Sleep() takes a 32 bit millisecond count in which 0xFFFFFFFF means
     * INFINITE. Clamp instead of letting the implicit narrowing wrap the
     * value or turn a long, finite sleep into an endless one. */
    if (msec >= UINT32_MAX) {
        msec = UINT32_MAX - 1;
    }
    Sleep((uint32_t)msec);
}
42#define SleepMsec(msec) Sleep((msec))
43#else
/* Non-Windows branch: both helpers expand to usleep(). SleepUsec passes
 * its argument through unchanged, SleepMsec multiplies its millisecond
 * argument by 1000 first. */
44#define SleepUsec(usec) usleep((usec))
45#define SleepMsec(msec) usleep((msec) * 1000)
46#endif
47
48#define TM_QUEUE_NAME_MAX 16
49#define TM_THREAD_NAME_MAX 16
50
51typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *);
52
53typedef struct TmSlot_ {
54 /* function pointers */
55 union {
57 TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
59 };
60 /** linked list of slots, used when a pipeline has multiple slots
61 * in a single thread. */
63
64 SC_ATOMIC_DECLARE(void *, slot_data);
65
66 /** copy of the TmModule::flags */
67 uint8_t tm_flags;
68
69 /* store the thread module id */
70 int tm_id;
71
72 TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
75
76 /* data storage */
77 const void *slot_initdata;
78
80
82
84
/* Appends a new entry to the slots. */
85void TmSlotSetFuncAppend(ThreadVars *, TmModule *, const void *);
86
/* Thread creation helpers; each returns a ThreadVars pointer. */
87ThreadVars *TmThreadCreate(const char *, const char *, const char *, const char *, const char *, const char *,
88 void *(fn_p)(void *), int);
/* Creates and returns a TV instance for a Packet Processing Thread. */
89ThreadVars *TmThreadCreatePacketHandler(const char *, const char *, const char *, const char *, const char *,
90 const char *);
/* Creates and returns the TV instance for a Management thread (MGMT),
 * given the thread function. */
91ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int);
/* Creates and returns the TV instance for a Management thread (MGMT),
 * given a module name. */
92ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
93 int mucond);
/* Creates and returns the TV instance for a Command thread (CMD). */
94ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
95 int mucond);
99void TmThreadKillThreadsFamily(int family);
100void TmThreadKillThreads(void);
101void TmThreadClearThreadsFamily(int family);
/* Appends this TV to tv_root based on its type. */
102void TmThreadAppend(ThreadVars *, int);
103void TmThreadSetGroupName(ThreadVars *tv, const char *name);
104
110int TmThreadGetNbThreads(uint8_t type);
111
/* Unpauses all threads present in tv_root. */
114void TmThreadContinueThreads(void);
/* Used to check the thread for certain conditions of failure. */
115void TmThreadCheckThreadState(void);
117
/* Thread flag helpers: check, set, unset and wait for thread flag(s). */
118int TmThreadsCheckFlag(ThreadVars *, uint32_t);
119void TmThreadsSetFlag(ThreadVars *, uint32_t);
120void TmThreadsUnsetFlag(ThreadVars *, uint32_t);
/* Waits till the specified flag(s) is(are) set. */
121void TmThreadWaitForFlag(ThreadVars *, uint32_t);
122
124
126 const uint16_t set, const uint16_t check, const uint8_t module_flags);
128
129uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);
130
132
134 ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot);
135
/**
 * \brief Hand packets to TmqhOutputPacketpool() until no packet is left.
 *
 * Loops until 'p' is NULL; every non-NULL packet is passed to
 * TmqhOutputPacketpool() with a NULL ThreadVars.
 *
 * NOTE(review): the statement that obtains 'p' (listing line 139) is not
 * visible in this extract; presumably it dequeues from 'pq' -- confirm
 * against the real header.
 *
 * \param pq packet queue (no-lock variant) being cleaned
 */
136static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
137{
138 while (1) {
140 if (unlikely(p == NULL))
141 break;
142 TmqhOutputPacketpool(NULL, p);
143 }
144}
145
/**
 * \brief Cleanup path used when processing a packet failed.
 *
 * Always cleans tv->decode_pq through TmThreadsCleanDecodePQ(). It also
 * acts on 'p' when it is not NULL and on tv->stream_pq_local when it is
 * set.
 *
 * NOTE(review): the bodies of both conditionals (listing lines 149 and
 * 153-155) and the final statement (line 157) are not visible in this
 * extract -- confirm what they do against the real header.
 *
 * \param tv thread the failure happened on
 * \param p packet that was being processed, may be NULL
 */
146static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, Packet *p)
147{
148 if (p != NULL) {
150 }
151 TmThreadsCleanDecodePQ(&tv->decode_pq);
152 if (tv->stream_pq_local) {
156 }
158}
159
160/**
161 * \brief Handle timeout from the capture layer. Checks
162 * stream_pq which may have been filled by the flow
163 * manager.
164 * \param tv thread whose injected packets are processed.
165 */
/* Returns true when 'pq' was non-NULL and reported a non-zero length on
 * entry, false otherwise. The result does not depend on how many packets
 * were actually handled.
 * NOTE(review): the declaration of 'pq' (listing line 168), the statement
 * after the dequeue (line 173, presumably the matching unlock) and the
 * assignment of 'r' (line 179) are not visible in this extract. */
166static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
167{
/* pq->len is read here before pq->mutex_q is taken */
169 if (pq && pq->len > 0) {
170 while (1) {
/* dequeue one packet at a time under the queue mutex */
171 SCMutexLock(&pq->mutex_q);
172 Packet *extra_p = PacketDequeue(pq);
/* stop once no packet was dequeued */
174 if (extra_p == NULL)
175 break;
176#ifdef DEBUG_VALIDATION
177 BUG_ON(extra_p->flow != NULL);
178#endif
/* on failure run the failure cleanup and stop handling packets */
180 if (r == TM_ECODE_FAILED) {
181 TmThreadsSlotProcessPktFail(tv, extra_p);
182 break;
183 }
/* success: pass the packet to the thread's output handler */
184 tv->tmqh_out(tv, extra_p);
185 }
186 return true;
187 } else {
188 return false;
189 }
190}
191
192/**
193 * \brief Process the rest of the functions (if any) and queue.
194 */
195static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
196{
197 if (s == NULL) {
198 tv->tmqh_out(tv, p);
199 return TM_ECODE_OK;
200 }
201
202 TmEcode r = TmThreadsSlotVarRun(tv, p, s);
203 if (unlikely(r == TM_ECODE_FAILED)) {
204 TmThreadsSlotProcessPktFail(tv, p);
205 return TM_ECODE_FAILED;
206 }
207
208 tv->tmqh_out(tv, p);
209
210 TmThreadsHandleInjectedPackets(tv);
211
212 return TM_ECODE_OK;
213}
214
215/** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
216 * Allow caller to supply their own packet
217 *
218 * Meant for the detect reload process, which interrupts a sleeping capture thread
219 * to force a packet through the engine to complete a reload */
/* NOTE(review): listing lines 222, 224, 226-227 and 229 are not visible
 * in this extract, so the two 'p' checks below are not really adjacent
 * and the failure branch body is unknown -- confirm against the real
 * header. */
220static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
221{
/* a NULL 'p' is handled by a statement that is not visible here */
223 if (p == NULL)
225 if (p != NULL) {
/* the packet is run through the pipeline starting at tv->tm_flowworker */
228 if (TmThreadsSlotProcessPkt(tv, tv->tm_flowworker, p) != TM_ECODE_OK) {
230 }
231 }
232}
233
234/** \brief handle capture timeout
235 * When a capture method times out we check for house keeping
236 * tasks in the capture thread.
237 *
238 * \param p packet. Capture method may have taken a packet from
239 * the pool prior to the timing out call. We will then
240 * use that packet. Otherwise we can get our own.
241 */
242static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
243{
/* NOTE(review): the condition opening this if/else (listing line 244) is
 * not visible in this extract. When it holds, 'p' is handed to
 * TmThreadsCaptureInjectPacket() and we are done. */
245 TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
246 return;
247
248 } else {
/* only when no injected packets were handled do we look at the
 * flow queue's non_empty flag */
249 if (!TmThreadsHandleInjectedPackets(tv)) {
250 /* see if we have to do some house keeping */
251 if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
252 TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
253 return;
254 }
255 }
256 }
257
258 /* packet could have been passed to us that we won't use
259 * return it to the pool. */
/* reached only when 'p' was not consumed above; it goes out through the
 * thread's output handler */
260 if (p != NULL)
261 tv->tmqh_out(tv, p);
262}
263
/**
 * \brief Ask the capture method of a receive thread to break out of its
 *        packet acquisition loop, if it supports that.
 *
 * Does nothing for threads without TM_FLAG_RECEIVE_TM in tmm_flags.
 *
 * NOTE(review): the statement that obtains 'tm' (listing line 271) and
 * line 278 are not visible in this extract; presumably 'tm' is the module
 * belonging to slot 's' -- confirm against the real header.
 *
 * \param tv thread to act on
 */
264static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
265{
/* only threads flagged as receive threads are handled */
266 if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
267 return;
268 }
269 /* find the correct slot */
270 TmSlot *s = tv->tm_slots;
272 if (tm->flags & TM_FLAG_RECEIVE_TM) {
273 /* if the method supports it, BreakLoop. Otherwise we rely on
274 * the capture method's recv timeout */
275 if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
/* the callback gets the slot's atomically read slot_data */
276 tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
277 }
279 }
280}
281
282void TmThreadsSealThreads(void);
283void TmThreadsUnsealThreads(void);
284void TmThreadsListThreads(void);
286void TmThreadsUnregisterThread(const int id);
287void TmThreadsInjectFlowById(Flow *f, const int id);
288
290void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts);
291void TmThreadsGetMinimalTimestamp(struct timeval *ts);
292SCTime_t TmThreadsGetThreadTime(const int idx);
293uint16_t TmThreadsGetWorkerThreadMax(void);
296
297/** \brief Wait for a thread to become unpaused.
298 *
299 * Check if a thread should wait to be unpaused and wait if so, or
300 * until the thread kill flag is set.
301 *
302 * \returns true if the thread was unpaused, false if killed.
303 */
305
306#endif /* SURICATA_TM_THREADS_H */
uint8_t flags
Definition decode-gre.h:0
uint16_t type
#define PKT_PSEUDO_STREAM_END
Definition decode.h:1268
#define PKT_SET_SRC(p, src_val)
Definition decode.h:1325
@ PKT_SRC_CAPTURE_TIMEOUT
Definition decode.h:62
ThreadVars * tv
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition decode.c:293
Packet * PacketDequeue(PacketQueue *q)
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
uint64_t ts
Flow data structure.
Definition flow.h:356
simple fifo queue for packets
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
uint32_t len
SCMutex mutex_q
struct Flow_ * flow
Definition decode.h:546
uint32_t flags
Definition decode.h:544
Per thread variable structure.
Definition threadvars.h:58
struct PacketQueue_ * stream_pq_local
Definition threadvars.h:117
struct TmSlot_ * tm_slots
Definition threadvars.h:96
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition threadvars.h:106
PacketQueueNoLock decode_pq
Definition threadvars.h:112
uint8_t tmm_flags
Definition threadvars.h:79
struct TmSlot_ * tm_flowworker
Definition threadvars.h:101
struct FlowQueue_ * flow_queue
Definition threadvars.h:135
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition tm-modules.h:61
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition tm-modules.h:58
uint8_t flags
Definition tm-modules.h:80
SC_ATOMIC_DECLARE(void *, slot_data)
const void * slot_initdata
Definition tm-threads.h:77
TmSlotFunc SlotFunc
Definition tm-threads.h:56
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition tm-threads.h:57
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition tm-threads.h:74
struct TmSlot_ * slot_next
Definition tm-threads.h:62
uint8_t tm_flags
Definition tm-threads.h:67
TmEcode(* Management)(ThreadVars *, void *)
Definition tm-threads.h:58
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition tm-threads.h:72
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition tm-threads.h:73
int tm_id
Definition tm-threads.h:70
#define BUG_ON(x)
#define SCMutex
#define SCMutexUnlock(mut)
#define SCMutexLock(mut)
#define THV_CAPTURE_INJECT_PKT
Definition threadvars.h:53
#define THV_FAILED
Definition threadvars.h:41
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition tm-modules.c:69
#define TM_FLAG_RECEIVE_TM
Definition tm-modules.h:32
@ TVT_MAX
@ TM_ECODE_FAILED
@ TM_ECODE_OK
const char * name
void TmThreadKillThreads(void)
TmEcode TmThreadSetupOptions(ThreadVars *)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition tm-threads.c:863
void TmThreadClearThreadsFamily(int family)
void TmSlotSetFuncAppend(ThreadVars *, TmModule *, const void *)
Appends a new entry to the slots.
Definition tm-threads.c:658
SCTime_t TmThreadsGetThreadTime(const int idx)
void TmThreadInitMC(ThreadVars *)
Initializes the mutex and condition variables for this TV.
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
Definition tm-threads.c:273
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
void TmThreadAppend(ThreadVars *, int)
Appends this TV to tv_root based on its type.
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition tm-threads.c:133
ThreadVars * TmThreadCreate(const char *, const char *, const char *, const char *, const char *, const char *, void *(fn_p)(void *), int)
struct TmSlot_ TmSlot
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
int TmThreadGetNbThreads(uint8_t type)
Definition tm-threads.c:847
void TmThreadsUnsealThreads(void)
void TmThreadsSetFlag(ThreadVars *, uint32_t)
Set a thread flag.
Definition tm-threads.c:101
TmEcode TmThreadSpawn(ThreadVars *)
Spawns a thread associated with the ThreadVars instance tv.
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
uint16_t TmThreadsGetWorkerThreadMax(void)
void TmThreadsUnsetFlag(ThreadVars *, uint32_t)
Unset a thread flag.
Definition tm-threads.c:109
TmEcode(* TmSlotFunc)(ThreadVars *, Packet *, void *)
Definition tm-threads.h:51
void TmThreadContinue(ThreadVars *)
Unpauses a thread.
TmEcode TmThreadSetThreadPriority(ThreadVars *, int)
Set the thread options (thread priority).
Definition tm-threads.c:774
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
TmEcode TmThreadSetCPU(ThreadVars *, uint8_t)
Definition tm-threads.c:831
void TmThreadsListThreads(void)
ThreadVars * tv_root[TVT_MAX]
Definition tm-threads.c:82
TmEcode TmThreadLibSpawn(ThreadVars *)
Spawns a "fake" lib thread associated with the ThreadVars instance tv.
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
ThreadVars * TmThreadCreatePacketHandler(const char *, const char *, const char *, const char *, const char *, const char *)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
void TmThreadsUnregisterThread(const int id)
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
Definition tm-threads.c:166
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
#define SleepUsec(usec)
Definition tm-threads.h:44
void TmThreadKillThreadsFamily(int family)
int TmThreadsCheckFlag(ThreadVars *, uint32_t)
Check if a thread flag is set.
Definition tm-threads.c:93
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
void TmThreadSetPrio(ThreadVars *)
Adjusting nice value for threads.
Definition tm-threads.c:785
SCMutex tv_root_lock
Definition tm-threads.c:85
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition tm-threads.c:363
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check, const uint8_t module_flags)
Disable all packet threads.
void TmThreadWaitForFlag(ThreadVars *, uint32_t)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t)
Set the thread options (cpu affinity).
Definition tm-threads.c:822
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
bool TmThreadsTimeSubsysIsReady(void)
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
void TmThreadsSealThreads(void)
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition tm-threads.c:114
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
void TmqhReleasePacketsToPacketPool(PacketQueue *pq)
Release all the packets in the queue back to the packetpool. Mainly used by threads that have failed,...
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
#define unlikely(expr)