suricata
log-flush.c
Go to the documentation of this file.
1/* Copyright (C) 2024 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Jeff Lucovsky <jlucovsky@oisf.net>
22 */
23
24#include "suricata-common.h"
25#include "suricata.h"
26#include "detect.h"
27#include "detect-engine.h"
28#include "flow-worker.h"
29#include "log-flush.h"
30#include "tm-threads.h"
31#include "conf.h"
32#include "conf-yaml-loader.h"
33#include "util-privs.h"
34
35/**
36 * \brief Trigger detect threads to flush their output logs
37 *
38 * This function is intended to be called at regular intervals to force
39 * buffered log data to be persisted
40 */
41static void WorkerFlushLogs(void)
42{
43 SCEnter();
44
45 /* count detect threads in use */
46 uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_FLOWWORKER_TM);
47 /* can be zero in unix socket mode */
48 if (no_of_detect_tvs == 0) {
49 return;
50 }
51
52 /* prepare swap structures */
53 void *fw_threads[no_of_detect_tvs];
54 ThreadVars *detect_tvs[no_of_detect_tvs];
55 memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *)));
56 memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));
57
58 /* start by initiating the log flushes */
59
60 uint32_t i = 0;
62 /* get reference to tv's and setup fw_threads array */
63 for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
64 if ((tv->tmm_flags & TM_FLAG_FLOWWORKER_TM) == 0) {
65 continue;
66 }
67 for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
68 TmModule *tm = TmModuleGetById(s->tm_id);
69 if (!(tm->flags & TM_FLAG_FLOWWORKER_TM)) {
70 continue;
71 }
72
73 if (suricata_ctl_flags != 0) {
75 goto error;
76 }
77
78 fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data));
79 if (fw_threads[i]) {
80 FlowWorkerSetFlushAck(fw_threads[i]);
81 SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i);
82 detect_tvs[i] = tv;
83 }
84
85 i++;
86 break;
87 }
88 }
89 BUG_ON(i != no_of_detect_tvs);
90
92
93 SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs);
94 InjectPacketsForFlush(detect_tvs, no_of_detect_tvs);
95
96 uint32_t threads_done = 0;
97retry:
98 for (i = 0; i < no_of_detect_tvs; i++) {
99 if (suricata_ctl_flags != 0) {
100 threads_done = no_of_detect_tvs;
101 break;
102 }
103 SleepMsec(1);
104 if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) {
105 SCLogDebug("thread slot %d has ack'd flush request", i);
106 threads_done++;
107 } else if (detect_tvs[i]) {
108 SCLogDebug("thread slot %d not yet ack'd flush request", i);
109 TmThreadsCaptureBreakLoop(detect_tvs[i]);
110 }
111 }
112 if (threads_done < no_of_detect_tvs) {
113 threads_done = 0;
114 SleepMsec(250);
115 goto retry;
116 }
117
118error:
119 return;
120}
121
122static int OutputFlushInterval(void)
123{
124 intmax_t output_flush_interval = 0;
125 if (SCConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) {
126 output_flush_interval = 0;
127 }
128 if (output_flush_interval < 0 || output_flush_interval > 60) {
129 SCLogConfig("flush_interval must be 0 or less than 60; using 0");
130 output_flush_interval = 0;
131 }
132
133 return (int)output_flush_interval;
134}
135
136static void *LogFlusherWakeupThread(void *arg)
137{
138 int output_flush_interval = OutputFlushInterval();
139 /* This was checked by the logic creating this thread */
140 BUG_ON(output_flush_interval == 0);
141
142 SCLogConfig("Using output-flush-interval of %d seconds", output_flush_interval);
143 /*
144 * Calculate the number of sleep intervals based on the output flush interval. This is necessary
145 * because this thread pauses a fixed amount of time to react to shutdown situations more
146 * quickly.
147 */
148 const int log_flush_sleep_time = 500; /* milliseconds */
149 const int flush_wait_count = (1000 * output_flush_interval) / log_flush_sleep_time;
150
151 ThreadVars *tv_local = (ThreadVars *)arg;
152 SCSetThreadName(tv_local->name);
153
154 if (tv_local->thread_setup_flags != 0)
155 TmThreadSetupOptions(tv_local);
156
157 /* Set the threads capability */
158 tv_local->cap_flags = 0;
159 SCDropCaps(tv_local);
160
162
163 int wait_count = 0;
164 uint64_t worker_flush_count = 0;
165 bool run = TmThreadsWaitForUnpause(tv_local);
166 while (run) {
167 SleepMsec(log_flush_sleep_time);
168
169 if (++wait_count == flush_wait_count) {
170 worker_flush_count++;
171 WorkerFlushLogs();
172 wait_count = 0;
173 }
174
175 if (TmThreadsCheckFlag(tv_local, THV_KILL)) {
176 break;
177 }
178 }
179
182 TmThreadsSetFlag(tv_local, THV_CLOSED);
183 SCLogInfo("%s: initiated %" PRIu64 " flushes", tv_local->name, worker_flush_count);
184 return NULL;
185}
186
188{
189 if (0 == OutputFlushInterval()) {
190 SCLogConfig("log flusher thread not used with heartbeat.output-flush-interval of 0");
191 return;
192 }
193
194 ThreadVars *tv_log_flush =
195 TmThreadCreateMgmtThread(thread_name_heartbeat, LogFlusherWakeupThread, 1);
196 if (!tv_log_flush || (TmThreadSpawn(tv_log_flush) != 0)) {
197 FatalError("Unable to create and start log flush thread");
198 }
199}
int SCConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition conf.c:414
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs)
void * FlowWorkerGetThreadData(void *flow_worker)
void FlowWorkerSetFlushAck(void *flow_worker)
bool FlowWorkerGetFlushAck(void *flow_worker)
ThreadVars * tv
void LogFlushThreads(void)
Definition log-flush.c:187
const char * thread_name_heartbeat
Definition runmodes.c:77
Per thread variable structure.
Definition threadvars.h:58
uint8_t cap_flags
Definition threadvars.h:81
struct TmSlot_ * tm_slots
Definition threadvars.h:96
char name[16]
Definition threadvars.h:65
uint8_t tmm_flags
Definition threadvars.h:79
struct ThreadVars_ * next
Definition threadvars.h:125
char * printable_name
Definition threadvars.h:66
uint8_t thread_setup_flags
Definition threadvars.h:69
uint8_t flags
Definition tm-modules.h:80
#define BUG_ON(x)
volatile uint8_t suricata_ctl_flags
Definition suricata.c:172
#define SCMutexUnlock(mut)
#define SCMutexLock(mut)
#define SCSetThreadName(n)
Definition threads.h:304
#define THV_RUNNING_DONE
Definition threadvars.h:46
#define THV_CLOSED
Definition threadvars.h:42
#define THV_DEINIT
Definition threadvars.h:45
#define THV_KILL
Definition threadvars.h:40
#define THV_RUNNING
Definition threadvars.h:55
#define THV_INIT_DONE
Definition threadvars.h:37
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition tm-modules.c:69
#define TM_FLAG_FLOWWORKER_TM
Definition tm-modules.h:34
@ TVT_PPT
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition tm-threads.c:93
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
ThreadVars * tv_root[TVT_MAX]
Definition tm-threads.c:82
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
SCMutex tv_root_lock
Definition tm-threads.c:85
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition tm-threads.c:363
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition tm-threads.c:101
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition tm-threads.c:863
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
#define SleepMsec(msec)
Definition tm-threads.h:45
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
#define SCEnter(...)
Definition util-debug.h:277
#define FatalError(...)
Definition util-debug.h:510
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition util-debug.h:225
#define SCLogConfig(...)
Definition util-debug.h:229
#define SCDropCaps(...)
Definition util-privs.h:89