diff --git a/ChangeLog b/ChangeLog index f0d6e02..0b472f9 100755 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ +2017-12-17 +- Add ipfix sampling. Process option template/record with sampling elements 34 and 35 + 2017-12-10 - Add lz4 compression - Remove old xstat legancy code, not needed diff --git a/bin/ipfix.c b/bin/ipfix.c index 9d61253..111acba 100644 --- a/bin/ipfix.c +++ b/bin/ipfix.c @@ -289,10 +289,16 @@ static uint32_t processed_records; extern int verbose; extern uint32_t Max_num_extensions; extern extension_descriptor_t extension_descriptor[]; +extern uint32_t default_sampling; extern uint32_t overwrite_sampling; extern uint32_t exporter_sysid; // prototypes +static void InsertStdSamplerOffset(FlowSource_t *fs, uint16_t id, uint16_t offset_std_sampler_interval, + uint16_t offset_std_sampler_algorithm); + +static void InsertSampler(FlowSource_t *fs, exporter_ipfix_domain_t *exporter, int32_t id, uint16_t mode, uint32_t interval); + static input_translation_t *add_translation_table(exporter_ipfix_domain_t *exporter, uint16_t id); static void remove_translation_table(FlowSource_t *fs, exporter_ipfix_domain_t *exporter, uint16_t id); @@ -311,6 +317,7 @@ static inline void Process_ipfix_template_add(exporter_ipfix_domain_t *exporter, static inline void Process_ipfix_template_withdraw(exporter_ipfix_domain_t *exporter, void *DataPtr, uint32_t size_left, FlowSource_t *fs); +static inline void Process_ipfix_option_data(exporter_ipfix_domain_t *exporter, void *data_flowset, FlowSource_t *fs); #include "inline.c" #include "nffile_inline.c" @@ -878,6 +885,126 @@ size_t size_required; } // End of setup_translation_table +static void InsertStdSamplerOffset( FlowSource_t *fs, uint16_t id, uint16_t offset_std_sampler_interval, uint16_t offset_std_sampler_algorithm) { +option_offset_t **t; + + t = &(fs->option_offset_table); + while ( *t ) { + if ( (*t)->id == id ) { // table already known to us - update data + dbg_printf("Found existing std sampling info in template %i\n", id); + break; + } + + t = &((*t)->next); + } + + if ( *t == NULL ) { // new table + dbg_printf("Allocate new std sampling info from template %i\n", id); + *t = (option_offset_t *)calloc(1, sizeof(option_offset_t)); + if ( !*t ) { + LogError("malloc() allocation error at %s line %u: %s" , __FILE__, __LINE__, strerror (errno)); + return ; + } + LogInfo("Process_v9: New std sampler: interval: %i, algorithm: %i", + offset_std_sampler_interval, offset_std_sampler_algorithm); + } // else existing table + + dbg_printf("Insert/Update sampling info from template %i\n", id); + SetFlag((*t)->flags, HAS_STD_SAMPLER_DATA); + (*t)->id = id; + (*t)->offset_id = 0; + (*t)->offset_mode = 0; + (*t)->offset_interval = 0; + (*t)->offset_std_sampler_interval = offset_std_sampler_interval; + (*t)->offset_std_sampler_algorithm = offset_std_sampler_algorithm; + +} // End of InsertStdSamplerOffset + +static void InsertSampler(FlowSource_t *fs, exporter_ipfix_domain_t *exporter, int32_t id, uint16_t mode, uint32_t interval) { +generic_sampler_t *sampler; + + dbg_printf("[%u] Insert Sampler: Exporter is 0x%llu\n", exporter->info.id, (long long unsigned)exporter); + if ( !exporter->sampler ) { + // no samplers so far + sampler = (generic_sampler_t *)malloc(sizeof(generic_sampler_t)); + if ( !sampler ) { + LogError( "Process_v9: Panic! malloc(): %s line %d: %s", __FILE__, __LINE__, strerror (errno)); + return; + } + + sampler->info.header.type = SamplerInfoRecordype; + sampler->info.header.size = sizeof(sampler_info_record_t); + sampler->info.exporter_sysid = exporter->info.sysid; + sampler->info.id = id; + sampler->info.mode = mode; + sampler->info.interval = interval; + sampler->next = NULL; + exporter->sampler = sampler; + + FlushInfoSampler(fs, &(sampler->info)); + LogInfo( "Add new sampler: ID: %i, mode: %u, interval: %u\n", + id, mode, interval); + dbg_printf("Add new sampler: ID: %i, mode: %u, interval: %u\n", + id, mode, interval); + + } else { + sampler = exporter->sampler; + while ( sampler ) { + // test for update of existing sampler + if ( sampler->info.id == id ) { + // found same sampler id - update record + dbg_printf("Update existing sampler id: %i, mode: %u, interval: %u\n", + id, mode, interval); + + // we update only on changes + if ( mode != sampler->info.mode || interval != sampler->info.interval ) { + FlushInfoSampler(fs, &(sampler->info)); + sampler->info.mode = mode; + sampler->info.interval = interval; + LogInfo( "Update existing sampler id: %i, mode: %u, interval: %u\n", + id, mode, interval); + } else { + dbg_printf("Sampler unchanged!\n"); + } + + break; + } + + // test for end of chain + if ( sampler->next == NULL ) { + // end of sampler chain - insert new sampler + sampler->next = (generic_sampler_t *)malloc(sizeof(generic_sampler_t)); + if ( !sampler->next ) { + LogError( "Process_v9: Panic! malloc(): %s line %d: %s", __FILE__, __LINE__, strerror (errno)); + return; + } + sampler = sampler->next; + + sampler->info.header.type = SamplerInfoRecordype; + sampler->info.header.size = sizeof(sampler_info_record_t); + sampler->info.exporter_sysid = exporter->info.sysid; + sampler->info.id = id; + sampler->info.mode = mode; + sampler->info.interval = interval; + sampler->next = NULL; + + FlushInfoSampler(fs, &(sampler->info)); + + LogInfo( "Append new sampler: ID: %u, mode: %u, interval: %u\n", + id, mode, interval); + dbg_printf("Append new sampler: ID: %u, mode: %u, interval: %u\n", + id, mode, interval); + break; + } + + // advance + sampler = sampler->next; + } + + } + +} // End of InsertSampler + static inline void Process_ipfix_templates(exporter_ipfix_domain_t *exporter, void *flowset_header, uint32_t size_left, FlowSource_t *fs) { ipfix_template_record_t *ipfix_template_record; void *DataPtr; @@ -1115,6 +1242,9 @@ uint16_t offset_std_sampler_interval, offset_std_sampler_algorithm, found_std_sa DataPtr += 6; size_left -= 6; + dbg_printf("Decode Option Template. id: %u, field count: %u, scope field count: %u\n", + id, field_count, scope_field_count); + if ( scope_field_count == 0 ) { LogError("Process_ipfx: [%u] scope field count error: length must not be zero", exporter->info.id); @@ -1122,7 +1252,7 @@ uint16_t offset_std_sampler_interval, offset_std_sampler_algorithm, found_std_sa return; } - size_required = (field_count + scope_field_count) * 2 * sizeof(uint16_t); + size_required = 2 * field_count * sizeof(uint16_t); dbg_printf("Size left: %u, size required: %u\n", size_left, size_required); if ( size_left < size_required ) { LogError("Process_ipfix: [%u] option template length error: size left %u too small for %u scopes length and %u options length", @@ -1132,15 +1262,22 @@ uint16_t offset_std_sampler_interval, offset_std_sampler_algorithm, found_std_sa return; } - dbg_printf("Decode Option Template. id: %u, field count: %u, scope field count: %u\n", - id, field_count, scope_field_count); - if ( scope_field_count == 0 ) { LogError("Process_ipfxi: [%u] scope field count error: length must not be zero", exporter->info.id); return; } + sampler_id_length = 0; + offset_sampler_id = 0; + offset_sampler_mode = 0; + offset_sampler_interval = 0; + offset_std_sampler_interval = 0; + offset_std_sampler_algorithm = 0; + found_sampler = 0; + found_std_sampling = 0; + offset = 0; + for ( i=0; iinfo.id, length); + } break; - case NF9_SAMPLING_ALGORITHM: - offset_std_sampler_algorithm = offset; - found_std_sampling++; - break; - - // individual samplers - case NF9_FLOW_SAMPLER_ID: - offset_sampler_id = offset; - sampler_id_length = length; - found_sampler++; - break; - case FLOW_SAMPLER_MODE: - offset_sampler_mode = offset; - found_sampler++; - break; - case NF9_FLOW_SAMPLER_RANDOM_INTERVAL: - offset_sampler_interval = offset; - found_sampler++; + case IPFIX_samplingAlgorithm: + if ( length == 1 ) { + offset_std_sampler_algorithm = offset; + dbg_printf(" 1 byte sampling algorithm option at offset: %u\n", offset); + found_std_sampling++; + } else { + LogError("Process_ipfix: [%u] option template error: algorithm option lenth != 1 byte: %u", + exporter->info.id, length); + } break; } + offset += length; - if ( found_sampler == 3 ) { // need all three tags - dbg_printf("[%u] Sampling information found\n", exporter->info.id); - InsertSamplerOffset(fs, id, offset_sampler_id, sampler_id_length, offset_sampler_mode, offset_sampler_interval); - } else if ( found_std_sampling == 2 ) { // need all two tags - dbg_printf("[%u] Std sampling information found\n", exporter->info.id); - InsertStdSamplerOffset(fs, id, offset_std_sampler_interval, offset_std_sampler_algorithm); - } else { - dbg_printf("[%u] No Sampling information found\n", exporter->info.id); } -*/ + + if ( found_std_sampling == 2 ) { // need all two tags + dbg_printf("[%u] Std sampling information found\n", exporter->info.id); + InsertStdSamplerOffset(fs, id, offset_std_sampler_interval, offset_std_sampler_algorithm); + } + dbg_printf("\n"); processed_records++; @@ -1275,33 +1397,19 @@ char *string; // Check if sampling is announced sampling_rate = 1; -/* ### - if ( table->sampler_offset && fs->sampler ) { - uint32_t sampler_id; - if ( table->sampler_size == 2 ) { - sampler_id = Get_val16((void *)&in[table->sampler_offset]); - } else { - sampler_id = in[table->sampler_offset]; - } - if ( fs->sampler[sampler_id] ) { - sampling_rate = fs->sampler[sampler_id]->interval; - dbg_printf("[%u] Sampling ID %u available\n", exporter->info.id, sampler_id); - dbg_printf("[%u] Sampler_offset : %u\n", exporter->info.id, table->sampler_offset); - dbg_printf("[%u] Sampler Data : %s\n", exporter->info.id, fs->sampler == NULL ? "not available" : "available"); - dbg_printf("[%u] Sampling rate: %llu\n", exporter->info.id, (long long unsigned)sampling_rate); - } else { - sampling_rate = default_sampling; - dbg_printf("[%u] Sampling ID %u not (yet) available\n", exporter->info.id, sampler_id); - } - } else if ( fs->std_sampling.interval > 0 ) { - sampling_rate = fs->std_sampling.interval; + generic_sampler_t *sampler = exporter->sampler; + while ( sampler && sampler->info.id != -1 ) + sampler = sampler->next; + + if ( sampler ) { + sampling_rate = sampler->info.interval; dbg_printf("[%u] Std sampling available for this flow source: Rate: %llu\n", exporter->info.id, (long long unsigned)sampling_rate); } else { sampling_rate = default_sampling; dbg_printf("[%u] No Sampling record found\n", exporter->info.id); } -### */ + if ( overwrite_sampling > 0 ) { sampling_rate = overwrite_sampling; dbg_printf("[%u] Hard overwrite sampling rate: %llu\n", exporter->info.id, (long long unsigned)sampling_rate); @@ -1594,6 +1702,70 @@ char *string; } // End of Process_ipfix_data +static inline void Process_ipfix_option_data(exporter_ipfix_domain_t *exporter, void *data_flowset, FlowSource_t *fs) { +option_offset_t *offset_table; +uint32_t id; +uint8_t *in; + + id = GET_FLOWSET_ID(data_flowset); + + offset_table = fs->option_offset_table; + while ( offset_table && offset_table->id != id ) + offset_table = offset_table->next; + + if ( !offset_table ) { + // should never happen - catch it anyway + LogError( "Process_ipfix: Panic! - No Offset table found! : %s line %d", __FILE__, __LINE__); + return; + } + +#ifdef DEVEL + uint32_t size_left = GET_FLOWSET_LENGTH(data_flowset) - 4; // -4 for data flowset header -> id and length + dbg_printf("[%u] Process option data flowset size: %u\n", exporter->info.id, size_left); +#endif + + // map input buffer as a byte array + in = (uint8_t *)(data_flowset + 4); // skip flowset header + + if ( TestFlag(offset_table->flags, HAS_SAMPLER_DATA) ) { + int32_t id; + uint16_t mode; + uint32_t interval; + if (offset_table->sampler_id_length == 2) { + id = Get_val16((void *)&in[offset_table->offset_id]); + } else { + id = in[offset_table->offset_id]; + } + mode = in[offset_table->offset_mode]; + interval = Get_val32((void *)&in[offset_table->offset_interval]); + + InsertSampler(fs, exporter, id, mode, interval); + + dbg_printf("Extracted Sampler data:\n"); + dbg_printf("Sampler ID : %u\n", id); + dbg_printf("Sampler mode : %u\n", mode); + dbg_printf("Sampler interval: %u\n", interval); + } + + if ( TestFlag(offset_table->flags, HAS_STD_SAMPLER_DATA) ) { + int32_t id = -1; + uint16_t mode = in[offset_table->offset_std_sampler_algorithm]; + uint32_t interval = Get_val32((void *)&in[offset_table->offset_std_sampler_interval]); + + InsertSampler(fs, exporter, id, mode, interval); + + dbg_printf("Extracted Std Sampler data:\n"); + dbg_printf("Sampler ID : %i\n", id); + dbg_printf("Sampler algorithm: %u\n", mode); + dbg_printf("Sampler interval : %u\n", interval); + + dbg_printf("Set std sampler: algorithm: %u, interval: %u\n", + mode, interval); + } + processed_records++; + +} // End of Process_ipfix_option_data + void Process_IPFIX(void *in_buff, ssize_t in_buff_cnt, FlowSource_t *fs) { exporter_ipfix_domain_t *exporter; ssize_t size_left; @@ -1723,7 +1895,7 @@ static uint32_t packet_cntr = 0; Process_ipfix_data(exporter, flowset_header, fs, table); exporter->DataRecords++; } else if ( HasOptionTable(fs, flowset_id) ) { - // Process_ipfix_option_data(exporter, flowset_header, fs); + Process_ipfix_option_data(exporter, flowset_header, fs); } else { // maybe a flowset with option data dbg_printf("Process ipfix: [%u] No table for id %u -> Skip record\n", diff --git a/bin/ipfix.h b/bin/ipfix.h index 71b3c6a..6c1feed 100644 --- a/bin/ipfix.h +++ b/bin/ipfix.h @@ -224,7 +224,9 @@ typedef struct ipfix_template_elements_e_s { #define IPFIX_DestinationIPv6PrefixLength 30 #define IPFIX_flowLabelIPv6 31 #define IPFIX_icmpTypeCodeIPv4 32 -// reserved 34, 35 +// 33 igmpTYpe +#define IPFIX_samplingInterval 34 +#define IPFIX_samplingAlgorithm 35 // reserved 38, 39 // reserved 48, 49, 50, 51