suricata
source-erf-dag.c
Go to the documentation of this file.
1/* Copyright (C) 2010-2020 Open Information Security Foundation
2 *
3 * You can copy, redistribute or modify this Program under the terms of
4 * the GNU General Public License version 2 as published by the Free
5 * Software Foundation.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * version 2 along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15 * 02110-1301, USA.
16 */
17
18/**
19 * \file
20 *
21 * \author Endace Technology Limited.
22 * \author Jason MacLulich <jason.maclulich@endace.com>
23 *
24 * Support for reading ERF records from a DAG card.
25 *
26 * Only ethernet supported at this time.
27 */
28
29#include "suricata-common.h"
30#include "suricata.h"
31#include "tm-threads.h"
32
33#include "util-privs.h"
34#include "util-datalink.h"
35#include "util-device-private.h"
36#include "tmqh-packetpool.h"
37#include "source-erf-dag.h"
38
39#ifndef HAVE_DAG
40
41TmEcode NoErfDagSupportExit(ThreadVars *, const void *, void **);
42
43void
45{
46 tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag";
47 tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = NoErfDagSupportExit;
53}
54
55void
57{
58 tmm_modules[TMM_DECODEERFDAG].name = "DecodeErfDag";
59 tmm_modules[TMM_DECODEERFDAG].ThreadInit = NoErfDagSupportExit;
65}
66
68NoErfDagSupportExit(ThreadVars *tv, const void *initdata, void **data)
69{
70 SCLogError("Error creating thread %s: you do not have support for DAG cards "
71 "enabled please recompile with --enable-dag",
72 tv->name);
73 exit(EXIT_FAILURE);
74}
75
76#else /* Implied we do have DAG support */
77
78#include <dagapi.h>
79
/* Minimum amount of data (in bytes) to read from the DAG at a time. */
#define MINDATA 32768

/* Maximum time (us) to wait for MINDATA to be read. */
#define MAXWAIT 20000

/* Poll interval in microseconds.
 * No trailing semicolon: the macro must expand to a plain expression so it
 * can be used anywhere a value is expected, not only as a full statement. */
#define POLL_INTERVAL 1000

/* Number of bytes per loop to process before fetching more data. */
#define BYTES_PER_LOOP (4 * 1024 * 1024) /* 4 MB */
91
92extern uint32_t max_pending_packets;
93
94typedef struct ErfDagThreadVars_ {
97
98 int dagfd;
100 char dagname[DAGNAME_BUFSIZE];
101
102 struct timeval maxwait, poll; /* Could possibly be made static */
103
105
106 uint64_t bytes;
107 uint16_t packets;
108 uint16_t drops;
109
110 /* Current location in the DAG stream input buffer.
111 */
112 uint8_t *top;
113 uint8_t *btm;
114
116
117static inline TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top,
118 uint32_t *pkts_read);
119static inline TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec);
120TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot);
121TmEcode ReceiveErfDagThreadInit(ThreadVars *, const void *, void **);
124TmEcode DecodeErfDagThreadInit(ThreadVars *, const void *, void **);
127void ReceiveErfDagCloseStream(int dagfd, int stream);
128
/**
 * \brief Register the ERF DAG receiver (reader) module.
 */
132void
146
/**
 * \brief Register the ERF DAG decoder module.
 */
150void
161
/**
 * \brief   Initialize the ERF receiver thread, generate a single
 *          ErfDagThreadVars structure for each thread, this will
 *          contain a DAG file descriptor which is read when the
 *          thread executes.
 *
 * \param tv        Pointer to the ThreadVars of this thread.
 * \param initdata  Name of the DAG interface as supplied by the user;
 *                  it is parsed into a DAG device name and stream number.
 *
 *                  We assume that we have only a single name for the DAG
 *                  interface.
 *
 * \param data      Output pointer; on success it is set to the newly
 *                  allocated ErfDagThreadVars.
 *
 */
178TmEcode ReceiveErfDagThreadInit(ThreadVars *tv, const void *initdata, void **data)
179{
180 SCEnter();
181 int stream_count = 0;
182
183 if (initdata == NULL) {
184 SCLogError("Error: No DAG interface provided.");
186 }
187
188 ErfDagThreadVars *ewtn = SCCalloc(1, sizeof(ErfDagThreadVars));
189 if (unlikely(ewtn == NULL)) {
190 FatalError("Failed to allocate memory for ERF DAG thread vars.");
191 }
192
193 /* dag_parse_name will return a DAG device name and stream number
194 * to open for this thread.
195 */
196 if (dag_parse_name(initdata, ewtn->dagname, DAGNAME_BUFSIZE,
197 &ewtn->dagstream) < 0) {
198 SCLogError("Failed to parse DAG interface: %s", (const char *)initdata);
199 SCFree(ewtn);
200 exit(EXIT_FAILURE);
201 }
202
203 ewtn->livedev = LiveGetDevice(initdata);
204 if (ewtn->livedev == NULL) {
205 SCLogError("Unable to get %s live device", (const char *)initdata);
206 SCFree(ewtn);
208 }
209
210 SCLogInfo("Opening DAG: %s on stream: %d for processing",
211 ewtn->dagname, ewtn->dagstream);
212
213 if ((ewtn->dagfd = dag_open(ewtn->dagname)) < 0) {
214 SCLogError("Failed to open DAG: %s", ewtn->dagname);
215 SCFree(ewtn);
217 }
218
219 /* Check to make sure the card has enough available streams to
220 * support reading from the one specified.
221 */
222 if ((stream_count = dag_rx_get_stream_count(ewtn->dagfd)) < 0) {
223 SCLogError("Failed to open stream: %d, DAG: %s, could not query stream count",
224 ewtn->dagstream, ewtn->dagname);
225 SCFree(ewtn);
227 }
228
229 /* Check to make sure we have enough rx streams to open the stream
230 * the user is asking for.
231 */
232 if (ewtn->dagstream > stream_count * 2) {
233 SCLogError("Failed to open stream: %d, DAG: %s, insufficient streams: %d", ewtn->dagstream,
234 ewtn->dagname, stream_count);
235 SCFree(ewtn);
237 }
238
239 /* If we are transmitting into a soft DAG card then set the stream
240 * to act in reverse mode.
241 */
242 if (0 != (ewtn->dagstream & 0x01)) {
243 /* Setting reverse mode for using with soft dag from daemon side */
244 if (dag_set_mode(ewtn->dagfd, ewtn->dagstream, DAG_REVERSE_MODE)) {
245 SCLogError("Failed to set mode to DAG_REVERSE_MODE on stream: %d, DAG: %s",
246 ewtn->dagstream, ewtn->dagname);
247 SCFree(ewtn);
249 }
250 }
251
252 if (dag_attach_stream(ewtn->dagfd, ewtn->dagstream, 0, 0) < 0) {
253 SCLogError("Failed to open DAG stream: %d, DAG: %s", ewtn->dagstream, ewtn->dagname);
254 SCFree(ewtn);
256 }
257
258 if (dag_start_stream(ewtn->dagfd, ewtn->dagstream) < 0) {
259 SCLogError("Failed to start DAG stream: %d, DAG: %s", ewtn->dagstream, ewtn->dagname);
260 SCFree(ewtn);
262 }
263
264 SCLogInfo("Attached and started stream: %d on DAG: %s",
265 ewtn->dagstream, ewtn->dagname);
266
267 /*
268 * Initialise DAG Polling parameters.
269 */
270 timerclear(&ewtn->maxwait);
271 ewtn->maxwait.tv_usec = MAXWAIT;
272 timerclear(&ewtn->poll);
273 ewtn->poll.tv_usec = POLL_INTERVAL;
274
    /* 32kB minimum data to return -- the amount of data processed per
     * pass is still capped separately by BYTES_PER_LOOP.
     */
278 if (dag_set_stream_poll(ewtn->dagfd, ewtn->dagstream, MINDATA,
279 &(ewtn->maxwait), &(ewtn->poll)) < 0) {
280 SCLogError("Failed to set poll parameters for stream: %d, DAG: %s", ewtn->dagstream,
281 ewtn->dagname);
282 SCFree(ewtn);
284 }
285
286 ewtn->packets = StatsRegisterCounter("capture.dag_packets", tv);
287 ewtn->drops = StatsRegisterCounter("capture.dag_drops", tv);
288
289 ewtn->tv = tv;
290 *data = (void *)ewtn;
291
293
294 SCLogInfo("Starting processing packets from stream: %d on DAG: %s",
295 ewtn->dagstream, ewtn->dagname);
296
298}
299
300/**
301 * \brief Receives packets from a DAG interface.
302 *
303 * \param tv pointer to ThreadVars
304 * \param data pointer to ErfDagThreadVars
305 * \param slot slot containing task information
306 *
307 * \retval TM_ECODE_OK on success
308 * \retval TM_ECODE_FAILED on failure
309 */
311ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
312{
313 SCEnter();
314
316 uint32_t diff = 0;
317 int err;
318 uint8_t *top = NULL;
319 uint32_t pkts_read = 0;
320 TmSlot *s = (TmSlot *)slot;
321
322 dtv->slot = s->slot_next;
323
324 // Indicate that the thread is actually running its application level code (i.e., it can poll
325 // packets)
327
328 while (1) {
331 }
332
333 top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
334 if (top == NULL) {
335 if (errno == EAGAIN) {
336 if (dtv->dagstream & 0x1) {
337 TmThreadsCaptureHandleTimeout(tv, NULL);
338 usleep(10 * 1000);
339 dtv->btm = dtv->top;
340 }
341 continue;
342 } else {
343 SCLogError("Failed to read from stream: %d, DAG: %s when "
344 "using dag_advance_stream",
345 dtv->dagstream, dtv->dagname);
347 }
348 }
349
350 diff = top - dtv->btm;
351 if (diff == 0) {
352 continue;
353 }
354
355 assert(diff >= dag_record_size);
356
357 err = ProcessErfDagRecords(dtv, top, &pkts_read);
358
359 if (err == TM_ECODE_FAILED) {
360 SCLogError("Failed to read from stream: %d, DAG: %s", dtv->dagstream, dtv->dagname);
361 ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
363 }
364
366
367 SCLogDebug("Read %d records from stream: %d, DAG: %s",
368 pkts_read, dtv->dagstream, dtv->dagname);
369 }
370
372}
373
/**
 * \brief Process a chunk of records read from a DAG interface.
 *
 * This function takes a pointer to the buffer read from the DAG interface
 * and processes its individual records.
 */
380static inline TmEcode
381ProcessErfDagRecords(ErfDagThreadVars *ewtn, uint8_t *top, uint32_t *pkts_read)
382{
383 SCEnter();
384
385 int err = 0;
386 dag_record_t *dr = NULL;
387 char *prec = NULL;
388 int rlen;
389 char hdr_type = 0;
390 int processed = 0;
391
392 *pkts_read = 0;
393
394 while (((top - ewtn->btm) >= dag_record_size) &&
395 ((processed + dag_record_size) < BYTES_PER_LOOP)) {
396
397 /* Make sure we have at least one packet in the packet pool,
398 * to prevent us from alloc'ing packets at line rate. */
400
401 prec = (char *)ewtn->btm;
402 dr = (dag_record_t*)prec;
403 rlen = SCNtohs(dr->rlen);
404 hdr_type = dr->type;
405
406 /* If we don't have enough data to finish processing this ERF
407 * record return and maybe next time we will.
408 */
409 if ((top - ewtn->btm) < rlen)
411
412 ewtn->btm += rlen;
413 processed += rlen;
414
415 /* Only support ethernet at this time. */
416 switch (hdr_type & 0x7f) {
417 case ERF_TYPE_PAD:
418 case ERF_TYPE_META:
419 /* Skip. */
420 continue;
421 case ERF_TYPE_DSM_COLOR_ETH:
422 case ERF_TYPE_COLOR_ETH:
423 case ERF_TYPE_COLOR_HASH_ETH:
424 /* In these types the color value overwrites the lctr
425 * (drop count). */
426 break;
427 case ERF_TYPE_ETH:
428 if (dr->lctr) {
429 StatsAddUI64(ewtn->tv, ewtn->drops, SCNtohs(dr->lctr));
430 }
431 break;
432 default:
433 SCLogError("Processing of DAG record type: %d not implemented.", dr->type);
435 }
436
437 err = ProcessErfDagRecord(ewtn, prec);
438 if (err != TM_ECODE_OK) {
440 }
441
442 (*pkts_read)++;
443 }
444
446}
447
/**
 * \brief   Process a DAG record into a TM packet buffer.
 * \param   ewtn pointer to the ErfDagThreadVars of the receiving thread.
 * \param   prec pointer to a DAG record.
 */
453static inline TmEcode
454ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec)
455{
456 SCEnter();
457
458 int wlen = 0;
459 int rlen = 0;
460 int hdr_num = 0;
461 char hdr_type = 0;
462 dag_record_t *dr = (dag_record_t*)prec;
463 erf_payload_t *pload;
464 Packet *p;
465
466 hdr_type = dr->type;
467 wlen = SCNtohs(dr->wlen);
468 rlen = SCNtohs(dr->rlen);
469
470 /* count extension headers */
471 while (hdr_type & 0x80) {
472 if (rlen < (dag_record_size + (hdr_num * 8))) {
473 SCLogError("Insufficient captured packet length.");
475 }
476 hdr_type = prec[(dag_record_size + (hdr_num * 8))];
477 hdr_num++;
478 }
479
480 /* Check that the whole frame was captured */
481 if (rlen < (dag_record_size + (8 * hdr_num) + 2 + wlen)) {
482 SCLogInfo("Incomplete frame captured.");
484 }
485
486 /* skip over extension headers */
487 pload = (erf_payload_t *)(prec + dag_record_size + (8 * hdr_num));
488
490 if (p == NULL) {
491 SCLogError("Failed to allocate a Packet on stream: %d, DAG: %s", ewtn->dagstream,
492 ewtn->dagname);
494 }
496
497 SET_PKT_LEN(p, wlen);
499
500 /* Take into account for link type Ethernet ETH frame starts
501 * after the ERF header + pad.
502 */
503 if (unlikely(PacketCopyData(p, pload->eth.dst, GET_PKT_LEN(p)))) {
504 TmqhOutputPacketpool(ewtn->tv, p);
506 }
507
508 /* Convert ERF time to SCTime_t */
509 uint64_t ts = dr->ts;
510 p->ts = SCTIME_FROM_SECS(ts >> 32);
511 ts = (ts & 0xffffffffULL) * 1000000;
512 ts += 0x80000000; /* rounding */
513 uint64_t usecs = ts >> 32;
514 p->ts = SCTIME_ADD_USECS(p->ts, usecs);
515
516 StatsIncr(ewtn->tv, ewtn->packets);
517 ewtn->bytes += wlen;
518
519 if (TmThreadsSlotProcessPkt(ewtn->tv, ewtn->slot, p) != TM_ECODE_OK) {
521 }
522
524}
525
/**
 * \brief   Print some stats to the log at program exit.
 *
 * \param tv Pointer to ThreadVars.
 * \param data Pointer to data, ErfDagThreadVars.
 */
532void
534{
535 ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
536
537 (void)SC_ATOMIC_SET(ewtn->livedev->pkts,
539 (void)SC_ATOMIC_SET(ewtn->livedev->drop,
541
542 SCLogInfo("Stream: %d; Bytes: %"PRIu64"; Packets: %"PRIu64
543 "; Drops: %"PRIu64,
544 ewtn->dagstream,
545 ewtn->bytes,
548}
549
/**
 * \brief   Deinitializes the DAG card.
 * \param   tv pointer to ThreadVars
 * \param   data pointer that gets cast into ErfDagThreadVars
 */
557{
558 SCEnter();
559
560 ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
561
563
565}
566
/**
 * \brief Shut down a DAG stream and release the card handle.
 *
 * Stops the stream, detaches from it and finally closes the DAG file
 * descriptor, in that order. The results of the DAG calls are ignored.
 *
 * \param dagfd  DAG file descriptor passed to every DAG call
 * \param stream stream number to stop and detach
 */
void ReceiveErfDagCloseStream(int dagfd, int stream)
{
    (void)dag_stop_stream(dagfd, stream);
    (void)dag_detach_stream(dagfd, stream);
    (void)dag_close(dagfd);
}
574
575/** Decode ErfDag */
576
/**
 * \brief   This function passes off to link type decoders.
 *
 * DecodeErfDag decodes packets from DAG and passes
 * them off to the proper link type decoder.
 *
 * \param t pointer to ThreadVars
 * \param p pointer to the current packet
 * \param data pointer that gets cast into DecodeThreadVars
 */
589{
590 SCEnter();
592
594
595 /* update counters */
597
598 /* call the decoder */
599 switch(p->datalink) {
602 break;
603 default:
604 SCLogError("Error: datalink type %" PRId32 " not yet supported in module DecodeErfDag",
605 p->datalink);
606 break;
607 }
608
610
612}
613
614TmEcode DecodeErfDagThreadInit(ThreadVars *tv, const void *initdata, void **data)
615{
616 SCEnter();
617 DecodeThreadVars *dtv = NULL;
618
620
621 if (dtv == NULL)
623
625
626 *data = (void *)dtv;
627
629}
630
633{
634 if (data != NULL)
637}
638
639#endif /* HAVE_DAG */
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition counters.c:952
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition counters.c:450
uint64_t StatsGetLocalCounterValue(ThreadVars *tv, uint16_t id)
Get the value of the local copy of the counter that hold this id.
Definition counters.c:1255
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition counters.c:166
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition counters.c:146
int DecodeEthernet(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, const uint8_t *pkt, uint32_t len)
#define SET_PKT_LEN(p, len)
Definition decode.h:213
#define PKT_SET_SRC(p, src_val)
Definition decode.h:1325
#define GET_PKT_DATA(p)
Definition decode.h:209
#define GET_PKT_LEN(p)
Definition decode.h:208
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition decode.h:1321
@ PKT_SRC_WIRE
Definition decode.h:52
DecodeThreadVars * dtv
ThreadVars * tv
uint32_t max_pending_packets
Definition suricata.c:183
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition decode.c:293
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition decode.c:628
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition decode.c:232
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition decode.c:804
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition decode.c:822
int PacketCopyData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Copy data to Packet payload and set packet length.
Definition decode.c:377
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition decode.c:770
#define MAXWAIT
TmEcode DecodeErfDag(ThreadVars *, Packet *, void *)
This function passes off to link type decoders.
TmEcode DecodeErfDagThreadDeinit(ThreadVars *tv, void *data)
#define BYTES_PER_LOOP
TmEcode ReceiveErfDagThreadInit(ThreadVars *, const void *, void **)
Initialize the ERF receiver thread, generate a single ErfDagThreadVar structure for each thread,...
TmEcode ReceiveErfDagLoop(ThreadVars *, void *data, void *slot)
Receives packets from a DAG interface.
void TmModuleReceiveErfDagRegister(void)
Register the ERF file receiver (reader) module.
void TmModuleDecodeErfDagRegister(void)
Register the ERF file decoder module.
TmEcode DecodeErfDagThreadInit(ThreadVars *, const void *, void **)
void ReceiveErfDagCloseStream(int dagfd, int stream)
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *)
Deinitializes the DAG card.
#define POLL_INTERVAL
struct ErfDagThreadVars_ ErfDagThreadVars
void ReceiveErfDagThreadExitStats(ThreadVars *, void *)
Print some stats to the log at program exit.
#define MINDATA
uint64_t ts
uint16_t rlen
uint16_t wlen
Structure to hold thread specific data for all decode modules.
Definition decode.h:963
LiveDevice * livedev
struct timeval maxwait poll
char dagname[DAGNAME_BUFSIZE]
SCTime_t ts
Definition decode.h:555
uint8_t type
Definition decode.h:511
int datalink
Definition decode.h:639
Per thread variable structure.
Definition threadvars.h:58
char name[16]
Definition threadvars.h:65
const char * name
Definition tm-modules.h:48
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition tm-modules.h:53
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition tm-modules.h:52
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition tm-modules.h:61
uint8_t cap_flags
Definition tm-modules.h:77
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition tm-modules.h:56
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition tm-modules.h:58
uint8_t flags
Definition tm-modules.h:80
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition tm-modules.h:51
struct TmSlot_ * slot_next
Definition tm-threads.h:62
#define SCNtohs(x)
#define BUG_ON(x)
volatile uint8_t suricata_ctl_flags
Definition suricata.c:172
#define SURICATA_STOP
Definition suricata.h:94
#define THV_RUNNING
Definition threadvars.h:55
TmModule tmm_modules[TMM_SIZE]
Definition tm-modules.c:29
#define TM_FLAG_RECEIVE_TM
Definition tm-modules.h:32
#define TM_FLAG_DECODE_TM
Definition tm-modules.h:33
@ TMM_DECODEERFDAG
@ TMM_RECEIVEERFDAG
@ TM_ECODE_FAILED
@ TM_ECODE_OK
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition tm-threads.c:101
void PacketPoolWait(void)
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
#define SCEnter(...)
Definition util-debug.h:277
#define FatalError(...)
Definition util-debug.h:510
#define SCLogDebug(...)
Definition util-debug.h:275
#define SCReturnInt(x)
Definition util-debug.h:281
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition util-debug.h:225
#define SCLogError(...)
Macro used to log ERROR messages.
Definition util-debug.h:267
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
#define SCFree(p)
Definition util-mem.h:61
#define SCCalloc(nm, sz)
Definition util-mem.h:53
#define unlikely(expr)
#define SC_CAP_NET_ADMIN
Definition util-privs.h:31
#define SCTIME_FROM_SECS(s)
Definition util-time.h:69
#define SCTIME_ADD_USECS(ts, us)
Definition util-time.h:59