Sessions: decode flow of packets when sniffing

class scapy.sessions.DefaultSession(supersession: Self | None = None)

Bases: object

Default session: no stream decoding

process(pkt: Packet) → Packet | None

Called to pre-process the packet

recv(sock: SuperSocket) → Iterator[Packet]

Will be called by sniff() to ask for a packet

class scapy.sessions.IPSession(*args: Any, **kwargs: Any)

Bases: DefaultSession

Defragment IP packets on the fly.

Usage:

>>> sniff(session=IPSession)

process(packet: Packet) → Packet | None

class scapy.sessions.StringBuffer

Bases: object

StringBuffer is an object used to re-order data received during a TCP transmission.

Each TCP fragment contains a sequence number, which marks (relative to the first sequence number) the index of the data contained in the fragment.

If a TCP fragment is missed, this class will fill the missing space with zeros.
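The idea can be sketched in plain Python as follows. This is an illustration of the technique (offset-indexed writes with zero-filled gaps), not Scapy's implementation: the class name ReorderBuffer and its offset parameter are hypothetical, and offsets here are zero-based positions rather than real TCP sequence numbers.

```python
class ReorderBuffer:
    """Hypothetical stand-in illustrating the idea behind StringBuffer."""

    def __init__(self):
        self.content = bytearray()
        self.filled = []  # one flag per byte: True once real data was written

    def append(self, data, offset=None):
        # Without an explicit offset, data goes at the end of the buffer.
        if offset is None:
            offset = len(self.content)
        end = offset + len(data)
        if end > len(self.content):
            # Grow the buffer, padding the not-yet-received span with zeros.
            grow = end - len(self.content)
            self.content.extend(b"\x00" * grow)
            self.filled.extend([False] * grow)
        self.content[offset:end] = data
        self.filled[offset:end] = [True] * len(data)

    def full(self):
        # True when no gap remains.
        return all(self.filled)

    def __bytes__(self):
        return bytes(self.content)


buf = ReorderBuffer()
buf.append(b"world", offset=6)      # arrives first, leaves a 6-byte gap
with_gap = bytes(buf)
full_before = buf.full()
buf.append(b"hello ", offset=0)     # the missing piece arrives later
```

After the second append the gap is closed and the buffer holds the data in stream order.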

append(data: bytes, seq: int | None = None) → None

clear() → None

full() → bool

shiftleft(i: int) → None

class scapy.sessions.TCPSession(app: bool = False, *args: Any, **kwargs: Any)

Bases: IPSession

A Session that reconstructs TCP streams.

NOTE: this has the same effect as wrapping a real socket.socket into StreamSocket, but for all concurrent TCP streams (can be used on pcaps or sniffed sessions).

NOTE: only protocols that implement a tcp_reassemble function will be processed by this session. Other protocols will not be reconstructed.

DEV: implement a class method tcp_reassemble in your Packet class:

@classmethod
def tcp_reassemble(cls, data, metadata, session):
    # data = the reassembled data from the same request/flow
    # metadata = empty dictionary that can be used to store data
    #            during TCP reassembly
    # session = a dictionary specific to the bidirectional TCP session,
    #           that can be used to store anything
    # If the packet is complete, return it.
    # Whenever you return a packet, the buffer will be discarded.
    return pkt
    # Otherwise, maybe store stuff in metadata, and return None,
    # as you need additional data.
    return None

For more details and a real example, see:


Parameters:

app – Whether the socket operates at the application layer, i.e. has no TCP layer. This behaves identically to StreamSocket, so only use it if your underlying source of data isn’t a socket.socket.

process(pkt: Packet, cls: Type[Packet] | None = None) → Packet | None

Process each packet: matches the TCP seq/ack numbers to follow the TCP streams, and orders the fragments.

recv(sock: SuperSocket) → Iterator[Packet]

Will be called by sniff() to ask for a packet

scapy.sessions.streamcls(cls: Type[Packet]) → Callable[[bytes, Dict[str, Any], Dict[str, Any]], Packet | None]

Wraps a class for use when dissecting streams.