scapy.layers.netflow

Cisco NetFlow protocol v1, v5, v9 and v10 (IPFix)

HowTo dissect NetflowV9/10 (IPFix) packets

# From a pcap / list of packets

Using sniff and sessions:

>>> sniff(offline=open("my_great_pcap.pcap", "rb"), session=NetflowSession)

Using the netflowv9_defragment/ipfix_defragment commands:

  • get a list of packets containing NetflowV9/10 packets

  • call netflowv9_defragment(plist) to defragment the list

(ipfix_defragment is an alias for netflowv9_defragment)

# Live / on-the-flow / other: use NetflowSession:

>>> sniff(session=NetflowSession, prn=[...])

Note

You will find more examples over https://scapy.readthedocs.io/en/latest/layers/netflow.html

scapy.layers.netflow.GetNetflowRecordV9(flowset, templateID=None)[source]

Get a NetflowRecordV9/10 for a specific NetflowFlowsetV9/10.

Have a look at the online doc for examples.

class scapy.layers.netflow.N9SecondsIntField(name, default, *args, **kargs)[source]

Bases: SecondsIntField, _AdjustableNetflowField

Defines dateTimeSeconds (without EPOCH: just seconds)

class scapy.layers.netflow.N9UTCTimeField(name, default, *args, **kargs)[source]

Bases: UTCTimeField, _AdjustableNetflowField

Defines dateTimeSeconds (EPOCH)

class scapy.layers.netflow.NetflowDataflowsetV9(_pkt, /, *, templateID=255, length=None, records=[])[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowDataflowsetV9'>, <class 'scapy.packet.Packet'>]
classmethod dispatch_hook(_pkt=None, *args, **kargs)[source]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowDataflowsetV9).templateID>, <ShortField (NetflowDataflowsetV9).length>, <PacketListField (NetflowDataflowsetV9).records>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowDataflowsetV9'>)]
post_build(pkt, pay)[source]
class scapy.layers.netflow.NetflowFlowsetV9(_pkt, /, *, flowSetID=0, length=None, templates=[])[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowFlowsetV9'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowFlowsetV9).flowSetID>, <FieldLenField (NetflowFlowsetV9).length>, <PacketListField (NetflowFlowsetV9).templates>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowDataflowsetV9'>)]
class scapy.layers.netflow.NetflowHeader(_pkt, /, *, version=1)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowHeader'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowHeader).version>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({'version': 1}, <class 'scapy.layers.netflow.NetflowHeaderV1'>), ({'version': 5}, <class 'scapy.layers.netflow.NetflowHeaderV5'>), ({'version': 9}, <class 'scapy.layers.netflow.NetflowHeaderV9'>), ({'version': 10}, <class 'scapy.layers.netflow.NetflowHeaderV10'>)]
class scapy.layers.netflow.NetflowHeaderV1(_pkt, /, *, count=None, sysUptime=0, unixSecs=0, unixNanoSeconds=0)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowHeaderV1'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowHeaderV1).count>, <IntField (NetflowHeaderV1).sysUptime>, <UTCTimeField (NetflowHeaderV1).unixSecs>, <UTCTimeField (NetflowHeaderV1).unixNanoSeconds>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowRecordV1'>)]
post_build(pkt, pay)[source]
class scapy.layers.netflow.NetflowHeaderV10(_pkt, /, *, length=None, ExportTime=0, flowSequence=0, ObservationDomainID=0)[source]

Bases: Packet

IPFix (Netflow V10) Header

aliastypes = [<class 'scapy.layers.netflow.NetflowHeaderV10'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowHeaderV10).length>, <UTCTimeField (NetflowHeaderV10).ExportTime>, <IntField (NetflowHeaderV10).flowSequence>, <IntField (NetflowHeaderV10).ObservationDomainID>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowDataflowsetV9'>)]
post_build(pkt, pay)[source]
class scapy.layers.netflow.NetflowHeaderV5(_pkt, /, *, count=None, sysUptime=0, unixSecs=0, unixNanoSeconds=0, flowSequence=0, engineType=0, engineID=0, samplingInterval=0)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowHeaderV5'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowHeaderV5).count>, <IntField (NetflowHeaderV5).sysUptime>, <UTCTimeField (NetflowHeaderV5).unixSecs>, <UTCTimeField (NetflowHeaderV5).unixNanoSeconds>, <IntField (NetflowHeaderV5).flowSequence>, <ByteField (NetflowHeaderV5).engineType>, <ByteField (NetflowHeaderV5).engineID>, <ShortField (NetflowHeaderV5).samplingInterval>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowRecordV5'>)]
post_build(pkt, pay)[source]
class scapy.layers.netflow.NetflowHeaderV9(_pkt, /, *, count=None, sysUptime=0, unixSecs=None, packageSequence=0, SourceID=0)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowHeaderV9'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowHeaderV9).count>, <IntField (NetflowHeaderV9).sysUptime>, <UTCTimeField (NetflowHeaderV9).unixSecs>, <IntField (NetflowHeaderV9).packageSequence>, <IntField (NetflowHeaderV9).SourceID>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowDataflowsetV9'>)]
post_build(pkt, pay)[source]
class scapy.layers.netflow.NetflowOptionsFlowset10(_pkt, /, *, flowSetID=3, length=None, templateID=255, field_count=None, scope_field_count=None, scopes=[], options=[], pad=None)[source]

Bases: NetflowOptionsFlowsetV9

Netflow V10 (IPFix) Options Template FlowSet

aliastypes = [<class 'scapy.layers.netflow.NetflowOptionsFlowset10'>, <class 'scapy.layers.netflow.NetflowOptionsFlowsetV9'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowOptionsFlowset10).flowSetID>, <ShortField (NetflowOptionsFlowset10).length>, <ShortField (NetflowOptionsFlowset10).templateID>, <FieldLenField (NetflowOptionsFlowset10).field_count>, <FieldLenField (NetflowOptionsFlowset10).scope_field_count>, <PacketListField (NetflowOptionsFlowset10).scopes>, <PacketListField (NetflowOptionsFlowset10).options>, <StrLenField (NetflowOptionsFlowset10).pad>]
post_build(pkt, pay)[source]
class scapy.layers.netflow.NetflowOptionsFlowsetOptionV9(_pkt, /, *, enterpriseBit=0, optionFieldType=None, optionFieldlength=0, enterpriseNumber=0)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowOptionsFlowsetOptionV9'>, <class 'scapy.packet.Packet'>]
default_payload_class(p)[source]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<BitField (NetflowOptionsFlowsetOptionV9).enterpriseBit>, <BitEnumField (NetflowOptionsFlowsetOptionV9).optionFieldType>, <ShortField (NetflowOptionsFlowsetOptionV9).optionFieldlength>, <scapy.fields.ConditionalField object>]
class scapy.layers.netflow.NetflowOptionsFlowsetScopeV9(_pkt, /, *, scopeFieldType=None, scopeFieldlength=0)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowOptionsFlowsetScopeV9'>, <class 'scapy.packet.Packet'>]
default_payload_class(p)[source]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortEnumField (NetflowOptionsFlowsetScopeV9).scopeFieldType>, <ShortField (NetflowOptionsFlowsetScopeV9).scopeFieldlength>]
class scapy.layers.netflow.NetflowOptionsFlowsetV9(_pkt, /, *, flowSetID=1, length=None, templateID=255, option_scope_length=None, option_field_length=None, scopes=[], options=[], pad=None)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowOptionsFlowsetV9'>, <class 'scapy.packet.Packet'>]
default_payload_class(p)[source]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowOptionsFlowsetV9).flowSetID>, <ShortField (NetflowOptionsFlowsetV9).length>, <ShortField (NetflowOptionsFlowsetV9).templateID>, <FieldLenField (NetflowOptionsFlowsetV9).option_scope_length>, <FieldLenField (NetflowOptionsFlowsetV9).option_field_length>, <PacketListField (NetflowOptionsFlowsetV9).scopes>, <PacketListField (NetflowOptionsFlowsetV9).options>, <StrLenField (NetflowOptionsFlowsetV9).pad>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowDataflowsetV9'>)]
post_build(pkt, pay)[source]
class scapy.layers.netflow.NetflowOptionsRecordOptionV9(_pkt, /, *, fieldValue=b'')[source]

Bases: NetflowRecordV9

aliastypes = [<class 'scapy.layers.netflow.NetflowOptionsRecordOptionV9'>, <class 'scapy.layers.netflow.NetflowRecordV9'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<StrField (NetflowRecordV9,NetflowOptionsRecordScopeV9,NetflowOptionsRecordOptionV9).fieldValue>]
class scapy.layers.netflow.NetflowOptionsRecordScopeV9(_pkt, /, *, fieldValue=b'')[source]

Bases: NetflowRecordV9

aliastypes = [<class 'scapy.layers.netflow.NetflowOptionsRecordScopeV9'>, <class 'scapy.layers.netflow.NetflowRecordV9'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<StrField (NetflowRecordV9,NetflowOptionsRecordScopeV9,NetflowOptionsRecordOptionV9).fieldValue>]
class scapy.layers.netflow.NetflowRecordV1(_pkt, /, *, ipsrc='0.0.0.0', ipdst='0.0.0.0', nexthop='0.0.0.0', inputIfIndex=0, outpuIfIndex=0, dpkts=0, dbytes=0, starttime=0, endtime=0, srcport=0, dstport=0, padding=0, proto=0, tos=0, padding1=0, padding2=0)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowRecordV1'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<IPField (NetflowRecordV1).ipsrc>, <IPField (NetflowRecordV1).ipdst>, <IPField (NetflowRecordV1).nexthop>, <ShortField (NetflowRecordV1).inputIfIndex>, <ShortField (NetflowRecordV1).outpuIfIndex>, <IntField (NetflowRecordV1).dpkts>, <IntField (NetflowRecordV1).dbytes>, <IntField (NetflowRecordV1).starttime>, <IntField (NetflowRecordV1).endtime>, <ShortField (NetflowRecordV1).srcport>, <ShortField (NetflowRecordV1).dstport>, <ShortField (NetflowRecordV1).padding>, <ByteField (NetflowRecordV1).proto>, <ByteField (NetflowRecordV1).tos>, <IntField (NetflowRecordV1).padding1>, <IntField (NetflowRecordV1).padding2>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowRecordV1'>)]
class scapy.layers.netflow.NetflowRecordV5(_pkt, /, *, src='127.0.0.1', dst='127.0.0.1', nexthop='0.0.0.0', input=0, output=0, dpkts=1, dOctets=60, first=0, last=0, srcport=0, dstport=0, pad1=0, tcpFlags=<Flag 2 (S)>, prot=6, tos=0, src_as=0, dst_as=0, src_mask=0, dst_mask=0, pad2=0)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowRecordV5'>, <class 'scapy.packet.Packet'>]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<IPField (NetflowRecordV5).src>, <IPField (NetflowRecordV5).dst>, <IPField (NetflowRecordV5).nexthop>, <ShortField (NetflowRecordV5).input>, <ShortField (NetflowRecordV5).output>, <IntField (NetflowRecordV5).dpkts>, <IntField (NetflowRecordV5).dOctets>, <IntField (NetflowRecordV5).first>, <IntField (NetflowRecordV5).last>, <ShortField (NetflowRecordV5).srcport>, <ShortField (NetflowRecordV5).dstport>, <ByteField (NetflowRecordV5).pad1>, <FlagsField (NetflowRecordV5).tcpFlags>, <ByteEnumField (NetflowRecordV5).prot>, <ByteField (NetflowRecordV5).tos>, <ShortField (NetflowRecordV5).src_as>, <ShortField (NetflowRecordV5).dst_as>, <ByteField (NetflowRecordV5).src_mask>, <ByteField (NetflowRecordV5).dst_mask>, <ShortField (NetflowRecordV5).pad2>]
payload_guess: List[Tuple[Dict[str, Any], Type[Packet]]] = [({}, <class 'scapy.layers.netflow.NetflowRecordV5'>)]
class scapy.layers.netflow.NetflowRecordV9(_pkt, /, *, fieldValue=b'')[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowRecordV9'>, <class 'scapy.packet.Packet'>]
default_payload_class(p)[source]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<StrField (NetflowRecordV9,NetflowOptionsRecordScopeV9,NetflowOptionsRecordOptionV9).fieldValue>]
class scapy.layers.netflow.NetflowSession(*args, **kwargs)[source]

Bases: IPSession

Session used to defragment NetflowV9/10 packets on the flow. See help(scapy.layers.netflow) for more infos.

process(pkt: Packet) Packet | None[source]
class scapy.layers.netflow.NetflowTemplateFieldV9(_pkt, /, *, enterpriseBit=0, fieldType=None, fieldLength=None, enterpriseNumber=0)[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowTemplateFieldV9'>, <class 'scapy.packet.Packet'>]
default_payload_class(p)[source]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<BitField (NetflowTemplateFieldV9).enterpriseBit>, <BitEnumField (NetflowTemplateFieldV9).fieldType>, <ShortField (NetflowTemplateFieldV9).fieldLength>, <scapy.fields.ConditionalField object>]
class scapy.layers.netflow.NetflowTemplateV9(_pkt, /, *, templateID=255, fieldCount=None, template_fields=[])[source]

Bases: Packet

aliastypes = [<class 'scapy.layers.netflow.NetflowTemplateV9'>, <class 'scapy.packet.Packet'>]
default_payload_class(p)[source]
fields_desc: ClassVar[List[Field[Any, Any] | _FieldContainer]] = [<ShortField (NetflowTemplateV9).templateID>, <FieldLenField (NetflowTemplateV9).fieldCount>, <PacketListField (NetflowTemplateV9).template_fields>]
class scapy.layers.netflow.ShortOrInt(name: str, default: int | None)[source]

Bases: IntField

getfield(pkt, x)[source]
scapy.layers.netflow.ipfix_defragment(*args, **kwargs)[source]

Alias for netflowv9_defragment

scapy.layers.netflow.netflowv9_defragment(plist, verb=1)[source]

Process all NetflowV9/10 Packets to match IDs of the DataFlowsets with the Headers

params:
  • plist: the list of mixed NetflowV9/10 packets.

  • verb: verbose print (0/1)