suricata
output-streaming.c
Go to the documentation of this file.
1/* Copyright (C) 2007-2022 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Victor Julien <victor@inliniac.net>
22 *
23 * Logger for streaming data
24 */
25
26#include "suricata-common.h"
27#include "output.h"
28#include "output-streaming.h"
29#include "app-layer.h"
30#include "app-layer-parser.h"
31#include "app-layer-htp.h"
32#include "util-print.h"
33#include "conf.h"
34#include "util-profiling.h"
35#include "stream-tcp.h"
36#include "stream-tcp-inline.h"
38#include "util-validate.h"
39
40/** per thread data for this module, contains a list of per thread
41 * data for the packet loggers. */
46
/* logger instance, a module + an output ctx,
 * it's perfectly valid to have multiple instances of the same
 * log module (e.g. http.log) with different output ctx'. */
60
61static OutputStreamingLogger *list = NULL;
62
66{
67 OutputStreamingLogger *op = SCCalloc(1, sizeof(*op));
68 if (op == NULL)
69 return -1;
70
71 op->LogFunc = LogFunc;
72 op->initdata = initdata;
73 op->name = name;
74 op->logger_id = id;
75 op->type = type;
78
79 if (list == NULL)
80 list = op;
81 else {
82 OutputStreamingLogger *t = list;
83 while (t->next)
84 t = t->next;
85 t->next = op;
86 }
87
88 if (op->type == STREAMING_TCP_DATA) {
90 }
91
92 SCLogDebug("OutputRegisterStreamingLogger happy");
93 return 0;
94}
95
103
104static int Streamer(void *cbdata, Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
105{
106 StreamerCallbackData *streamer_cbdata = (StreamerCallbackData *)cbdata;
107 DEBUG_VALIDATE_BUG_ON(streamer_cbdata == NULL);
108 OutputStreamingLogger *logger = streamer_cbdata->logger;
109 OutputLoggerThreadStore *store = streamer_cbdata->store;
110 ThreadVars *tv = streamer_cbdata->tv;
111#ifdef PROFILING
112 Packet *p = streamer_cbdata->p;
113#endif
114 DEBUG_VALIDATE_BUG_ON(logger == NULL);
115 DEBUG_VALIDATE_BUG_ON(store == NULL);
116
117 while (logger && store) {
118 DEBUG_VALIDATE_BUG_ON(logger->LogFunc == NULL);
119
120 if (logger->type == streamer_cbdata->type) {
121 SCLogDebug("logger %p", logger);
123 logger->LogFunc(tv, store->thread_data, (const Flow *)f, data, data_len, tx_id, flags);
125 }
126
127 logger = logger->next;
128 store = store->next;
129
130 DEBUG_VALIDATE_BUG_ON(logger == NULL && store != NULL);
131 DEBUG_VALIDATE_BUG_ON(logger != NULL && store == NULL);
132 }
133
134 return 0;
135}
136
137/** \brief Http Body Iterator for logging
138 *
139 * Global logic:
140 *
141 * - For each tx
142 * - For each body chunk
143 * - Invoke Streamer
144 */
145
146static int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags)
147{
148 SCLogDebug("called with %p, %d, %p, %02x", f, close, cbdata, iflags);
149
150 HtpState *s = f->alstate;
151 if (s == NULL || s->conn == NULL) {
152 return 0;
153 }
154
155 const int tx_progress_done_value_ts =
157 const int tx_progress_done_value_tc =
159 const uint64_t total_txs = AppLayerParserGetTxCnt(f, f->alstate);
160
161 uint64_t tx_id = 0;
162 for (tx_id = 0; tx_id < total_txs; tx_id++) { // TODO optimization store log tx
163 htp_tx_t *tx = AppLayerParserGetTx(f->proto, f->alproto, f->alstate, tx_id);
164 if (tx == NULL) {
165 continue;
166 }
167
168 int tx_done = 0;
169 int tx_logged = 0;
170 int tx_progress_ts = AppLayerParserGetStateProgress(
171 IPPROTO_TCP, ALPROTO_HTTP1, tx, FlowGetDisruptionFlags(f, STREAM_TOSERVER));
172 if (tx_progress_ts >= tx_progress_done_value_ts) {
173 int tx_progress_tc = AppLayerParserGetStateProgress(
174 IPPROTO_TCP, ALPROTO_HTTP1, tx, FlowGetDisruptionFlags(f, STREAM_TOCLIENT));
175 if (tx_progress_tc >= tx_progress_done_value_tc) {
176 tx_done = 1;
177 }
178 }
179
180 SCLogDebug("tx %p", tx);
181 HtpTxUserData *htud = (HtpTxUserData *)htp_tx_get_user_data(tx);
182 SCLogDebug("htud %p", htud);
183 HtpBody *body = NULL;
185 body = &htud->request_body;
186 else if (iflags & OUTPUT_STREAMING_FLAG_TOCLIENT)
187 body = &htud->response_body;
188
189 if (body == NULL) {
190 SCLogDebug("no body");
191 goto next;
192 }
193 if (body->first == NULL) {
194 SCLogDebug("no body chunks");
195 goto next;
196 }
197 if (body->last->logged == 1) {
198 SCLogDebug("all logged already");
199 goto next;
200 }
201
202 // for each chunk
203 HtpBodyChunk *chunk = body->first;
204 for ( ; chunk != NULL; chunk = chunk->next) {
205 if (chunk->logged) {
206 SCLogDebug("logged %d", chunk->logged);
207 continue;
208 }
209
210 uint8_t flags = iflags | OUTPUT_STREAMING_FLAG_TRANSACTION;
211 if (chunk->sbseg.stream_offset == 0)
213 /* if we need to close and we're at the last segment in the list
214 * we add the 'close' flag so the logger can close up. */
215 if ((tx_done || close) && chunk->next == NULL) {
217 }
218
219 const uint8_t *data = NULL;
220 uint32_t data_len = 0;
221 StreamingBufferSegmentGetData(body->sb, &chunk->sbseg, &data, &data_len);
222
223 // invoke Streamer
224 Streamer(cbdata, f, data, data_len, tx_id, flags);
225 //PrintRawDataFp(stdout, data, data_len);
226 chunk->logged = 1;
227 tx_logged = 1;
228 }
229
230 next:
231 /* if we need to close we need to invoke the Streamer for sure. If we
232 * logged no chunks, we call the Streamer with NULL data so it can
233 * close up. */
234 if (tx_logged == 0 && (close||tx_done)) {
235 Streamer(cbdata, f, NULL, 0, tx_id,
237 }
238 }
239 return 0;
240}
241
243 uint8_t flags;
246};
247
248static int StreamLogFunc(
249 void *cb_data, const uint8_t *data, const uint32_t data_len, const uint64_t _offset)
250{
251 struct StreamLogData *log = cb_data;
252
253 Streamer(log->streamer_cbdata, log->f, data, data_len, 0, log->flags);
254
255 /* hack: unset open flag after first run */
256 log->flags &= ~OUTPUT_STREAMING_FLAG_OPEN;
257
258 return 0;
259}
260
261static int TcpDataLogger (Flow *f, TcpSession *ssn, TcpStream *stream,
262 bool eof, uint8_t iflags, void *streamer_cbdata)
263{
264 uint8_t flags = iflags;
265 uint64_t progress = STREAM_LOG_PROGRESS(stream);
266
267 if (progress == 0)
269
270 struct StreamLogData log_data = { flags, streamer_cbdata, f };
271 StreamReassembleLog(ssn, stream,
272 StreamLogFunc, &log_data,
273 progress, &progress, eof);
274
275 if (progress > STREAM_LOG_PROGRESS(stream)) {
276 DEBUG_VALIDATE_BUG_ON(progress - STREAM_LOG_PROGRESS(stream) > UINT32_MAX);
277 uint32_t slide = (uint32_t)(progress - STREAM_LOG_PROGRESS(stream));
278 stream->log_progress_rel += slide;
279 }
280
281 if (eof) {
282 Streamer(streamer_cbdata, f, NULL, 0, 0, flags|OUTPUT_STREAMING_FLAG_CLOSE);
283 }
284 return 0;
285}
286
287static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data)
288{
289 DEBUG_VALIDATE_BUG_ON(thread_data == NULL);
290
291 if (list == NULL) {
292 /* No child loggers. */
293 return TM_ECODE_OK;
294 }
295
296 OutputStreamingLoggerThreadData *op_thread_data =
297 (OutputStreamingLoggerThreadData *)thread_data;
298 OutputStreamingLogger *logger = list;
299 OutputLoggerThreadStore *store = op_thread_data->store;
300
301 StreamerCallbackData streamer_cbdata = { logger, store, tv, p , 0};
302
303 DEBUG_VALIDATE_BUG_ON(logger == NULL && store != NULL);
304 DEBUG_VALIDATE_BUG_ON(logger != NULL && store == NULL);
305 DEBUG_VALIDATE_BUG_ON(logger == NULL && store == NULL);
306
307 uint8_t flags = 0;
308 Flow * const f = p->flow;
309
310 /* no flow, no streaming */
311 if (f == NULL) {
313 }
314
315 if (!(StreamTcpInlineMode())) {
316 if (PKT_IS_TOCLIENT(p)) {
318 } else {
320 }
321 } else {
322 if (PKT_IS_TOSERVER(p)) {
324 } else {
326 }
327 }
328
329 if (op_thread_data->loggers & (1<<STREAMING_TCP_DATA)) {
330 TcpSession *ssn = f->protoctx;
331 if (ssn) {
332 int close = (ssn->state >= TCP_CLOSED);
333 close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
334 SCLogDebug("close ? %s", close ? "yes" : "no");
335
336 TcpStream *stream = flags & OUTPUT_STREAMING_FLAG_TOSERVER ? &ssn->client : &ssn->server;
338 TcpDataLogger(f, ssn, stream, close, flags, (void *)&streamer_cbdata);
339 }
340 }
341 if (op_thread_data->loggers & (1<<STREAMING_HTTP_BODIES)) {
342 if (f->alproto == ALPROTO_HTTP1 && f->alstate != NULL) {
343 int close = 0;
344 TcpSession *ssn = f->protoctx;
345 if (ssn) {
346 close = (ssn->state >= TCP_CLOSED);
347 close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
348 }
349 SCLogDebug("close ? %s", close ? "yes" : "no");
351 HttpBodyIterator(f, close, (void *)&streamer_cbdata, flags);
352 }
353 }
354
355 return TM_ECODE_OK;
356}
357
358/** \brief thread init for the tx logger
359 * This will run the thread init functions for the individual registered
360 * loggers */
361static TmEcode OutputStreamingLogThreadInit(ThreadVars *tv, const void *initdata, void **data) {
362 OutputStreamingLoggerThreadData *td = SCCalloc(1, sizeof(*td));
363 if (td == NULL)
364 return TM_ECODE_FAILED;
365
366 *data = (void *)td;
367
368 SCLogDebug("OutputStreamingLogThreadInit happy (*data %p)", *data);
369
370 OutputStreamingLogger *logger = list;
371 while (logger) {
372 if (logger->ThreadInit) {
373 void *retptr = NULL;
374 if (logger->ThreadInit(tv, logger->initdata, &retptr) == TM_ECODE_OK) {
375 OutputLoggerThreadStore *ts = SCCalloc(1, sizeof(*ts));
376 /* todo */ BUG_ON(ts == NULL);
377
378 /* store thread handle */
379 ts->thread_data = retptr;
380
381 if (td->store == NULL) {
382 td->store = ts;
383 } else {
385 while (tmp->next != NULL)
386 tmp = tmp->next;
387 tmp->next = ts;
388 }
389
390 SCLogDebug("%s is now set up", logger->name);
391 }
392 }
393
394 td->loggers |= (1<<logger->type);
395
396 logger = logger->next;
397 }
398
399 return TM_ECODE_OK;
400}
401
402static TmEcode OutputStreamingLogThreadDeinit(ThreadVars *tv, void *thread_data) {
403 OutputStreamingLoggerThreadData *op_thread_data =
404 (OutputStreamingLoggerThreadData *)thread_data;
405 OutputLoggerThreadStore *store = op_thread_data->store;
406 OutputStreamingLogger *logger = list;
407
408 while (logger && store) {
409 if (logger->ThreadDeinit) {
410 logger->ThreadDeinit(tv, store->thread_data);
411 }
412
413 OutputLoggerThreadStore *next_store = store->next;
414 SCFree(store);
415 logger = logger->next;
416 store = next_store;
417 }
418
419 SCFree(op_thread_data);
420 return TM_ECODE_OK;
421}
422
423static uint32_t OutputStreamingLoggerGetActiveCount(void)
424{
425 uint32_t cnt = 0;
426 for (OutputStreamingLogger *p = list; p != NULL; p = p->next) {
427 cnt++;
428 }
429 return cnt;
430}
431
433 OutputRegisterRootLogger(OutputStreamingLogThreadInit, OutputStreamingLogThreadDeinit,
434 OutputStreamingLog, OutputStreamingLoggerGetActiveCount);
435}
436
438{
439 OutputStreamingLogger *logger = list;
440 while (logger) {
441 OutputStreamingLogger *next_logger = logger->next;
442 SCFree(logger);
443 logger = next_logger;
444 }
445 list = NULL;
446}
struct HtpBodyChunk_ * next
int AppLayerParserGetStateProgressCompletionStatus(AppProto alproto, uint8_t direction)
uint64_t AppLayerParserGetTxCnt(const Flow *f, void *alstate)
void * AppLayerParserGetTx(uint8_t ipproto, AppProto alproto, void *alstate, uint64_t tx_id)
int AppLayerParserGetStateProgress(uint8_t ipproto, AppProto alproto, void *alstate, uint8_t flags)
get the progress value for a tx/protocol
@ ALPROTO_HTTP1
uint8_t flags
Definition decode-gre.h:0
uint16_t type
#define PKT_PSEUDO_STREAM_END
Definition decode.h:1268
#define PKT_IS_TOCLIENT(p)
Definition decode.h:239
#define PKT_IS_TOSERVER(p)
Definition decode.h:238
uint32_t id
TcpStreamCnf stream_config
Definition stream-tcp.c:219
uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags)
get 'disruption' flags: GAP/DEPTH/PASS
Definition flow.c:1141
ThreadVars * tv
void OutputStreamingLoggerRegister(void)
struct StreamerCallbackData_ StreamerCallbackData
struct OutputStreamingLogger_ OutputStreamingLogger
int SCOutputRegisterStreamingLogger(LoggerId id, const char *name, SCStreamingLogger LogFunc, void *initdata, enum SCOutputStreamingType type, ThreadInitFunc ThreadInit, ThreadDeinitFunc ThreadDeinit)
Register a streaming logger.
void OutputStreamingShutdown(void)
struct OutputStreamingLoggerThreadData_ OutputStreamingLoggerThreadData
#define OUTPUT_STREAMING_FLAG_TRANSACTION
#define OUTPUT_STREAMING_FLAG_TOCLIENT
int(* SCStreamingLogger)(ThreadVars *, void *thread_data, const Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
#define OUTPUT_STREAMING_FLAG_CLOSE
SCOutputStreamingType
@ STREAMING_TCP_DATA
@ STREAMING_HTTP_BODIES
#define OUTPUT_STREAMING_FLAG_TOSERVER
#define OUTPUT_STREAMING_FLAG_OPEN
void OutputRegisterRootLogger(ThreadInitFunc ThreadInit, ThreadDeinitFunc ThreadDeinit, OutputLogFunc LogFunc, OutputGetActiveCountFunc ActiveCntFunc)
Definition output.c:874
uint64_t ts
@ TCP_CLOSED
#define STREAM_LOG_PROGRESS(stream)
int StreamReassembleLog(const TcpSession *ssn, const TcpStream *stream, StreamReassembleRawFunc Callback, void *cb_data, const uint64_t progress_in, uint64_t *progress_out, const bool eof)
bool StreamTcpInlineMode(void)
See if stream engine is operating in inline mode.
Flow data structure.
Definition flow.h:356
uint8_t proto
Definition flow.h:378
AppProto alproto
application level protocol
Definition flow.h:450
void * alstate
Definition flow.h:479
void * protoctx
Definition flow.h:441
StreamingBufferSegment sbseg
struct HtpBodyChunk_ * next
HtpBodyChunk * last
HtpBodyChunk * first
StreamingBuffer * sb
htp_conn_t * conn
HtpBody response_body
struct OutputLoggerThreadStore_ * next
Definition output.h:35
OutputLoggerThreadStore * store
struct OutputStreamingLogger_ * next
ThreadDeinitFunc ThreadDeinit
enum SCOutputStreamingType type
SCStreamingLogger LogFunc
struct Flow_ * flow
Definition decode.h:546
struct Packet_ * next
Definition decode.h:635
uint32_t flags
Definition decode.h:544
OutputStreamingLogger * logger
OutputLoggerThreadStore * store
enum SCOutputStreamingType type
bool streaming_log_api
Definition stream-tcp.h:72
uint32_t log_progress_rel
Per thread variable structure.
Definition threadvars.h:58
#define BUG_ON(x)
TmEcode(* ThreadDeinitFunc)(ThreadVars *, void *)
Definition tm-modules.h:44
TmEcode(* ThreadInitFunc)(ThreadVars *, const void *, void **)
Definition tm-modules.h:43
@ TM_ECODE_FAILED
@ TM_ECODE_OK
const char * name
uint32_t cnt
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCReturnInt(x)
Definition util-debug.h:281
#define SCFree(p)
Definition util-mem.h:61
#define SCCalloc(nm, sz)
Definition util-mem.h:53
#define PACKET_PROFILING_LOGGER_END(p, id)
#define PACKET_PROFILING_LOGGER_START(p, id)
void StreamingBufferSegmentGetData(const StreamingBuffer *sb, const StreamingBufferSegment *seg, const uint8_t **data, uint32_t *data_len)
#define DEBUG_VALIDATE_BUG_ON(exp)