scapy.contrib.isotp module

ISOTPSocket.

class scapy.contrib.isotp.ISOTP(*args, **kwargs)

Bases: scapy.packet.Packet

aliastypes = [<class 'scapy.contrib.isotp.ISOTP'>, <class 'scapy.packet.Packet'>]
answers(other)

DEV: true if self is an answer from other

default_fields
static defragment(can_frames, use_extended_addressing=None)
direction
dst
exdst
explicit
exsrc
fields
fields_desc = [<Field (ISOTP).data>]
fieldtype
fragment()
name
original
overload_fields
overloaded_fields
packetfields
payload
post_transforms
raw_packet_cache
raw_packet_cache_fields
sent_time
sniffed_on
src
time
underlayer
validate_fields()
wirelen
class scapy.contrib.isotp.ISOTPHeader

Bases: scapy.layers.can.CAN

aliastypes = [<class 'scapy.contrib.isotp.ISOTPHeader'>, <class 'scapy.layers.can.CAN'>, <class 'scapy.packet.Packet'>]
extract_padding(p)

DEV: to be overloaded to extract current layer’s padding.

Parameters:s (str) – the current layer
Returns:a couple of strings (actual layer, padding)
fields_desc = [<Field (ISOTPHeader,ISOTPHeaderEA).flags>, <Field (ISOTPHeader,ISOTPHeaderEA).identifier>, <Field (ISOTPHeader,ISOTPHeaderEA).length>, <Field (ISOTPHeader,ISOTPHeaderEA).reserved>]
guess_payload_class(payload)

ISOTP encodes the frame type in the first nibble of a frame.

post_build(pkt, pay)

This will set the ByteField ‘length’ to the correct value.

class scapy.contrib.isotp.ISOTPHeaderEA

Bases: scapy.contrib.isotp.ISOTPHeader

aliastypes = [<class 'scapy.contrib.isotp.ISOTPHeaderEA'>, <class 'scapy.contrib.isotp.ISOTPHeader'>, <class 'scapy.layers.can.CAN'>, <class 'scapy.packet.Packet'>]
fields_desc = [<Field (ISOTPHeader,ISOTPHeaderEA).flags>, <Field (ISOTPHeader,ISOTPHeaderEA).identifier>, <Field (ISOTPHeader,ISOTPHeaderEA).length>, <Field (ISOTPHeader,ISOTPHeaderEA).reserved>, <Field (ISOTPHeaderEA).extended_address>]
post_build(p, pay)

This will set the ByteField ‘length’ to the correct value. ‘chb(len(pay) + 1)’ is required, because the field ‘extended_address’ is counted as payload on the CAN layer

class scapy.contrib.isotp.ISOTP_SF

Bases: scapy.packet.Packet

aliastypes = [<class 'scapy.contrib.isotp.ISOTP_SF'>, <class 'scapy.packet.Packet'>]
fields_desc = [<Field (ISOTP_SF).type>, <Field (ISOTP_SF).message_size>, <Field (ISOTP_SF).data>]
class scapy.contrib.isotp.ISOTP_FF

Bases: scapy.packet.Packet

aliastypes = [<class 'scapy.contrib.isotp.ISOTP_FF'>, <class 'scapy.packet.Packet'>]
fields_desc = [<Field (ISOTP_FF).type>, <Field (ISOTP_FF).message_size>, <scapy.fields.ConditionalField object>, <Field (ISOTP_FF).data>]
class scapy.contrib.isotp.ISOTP_CF

Bases: scapy.packet.Packet

aliastypes = [<class 'scapy.contrib.isotp.ISOTP_CF'>, <class 'scapy.packet.Packet'>]
fields_desc = [<Field (ISOTP_CF).type>, <Field (ISOTP_CF).index>, <Field (ISOTP_CF).data>]
class scapy.contrib.isotp.ISOTP_FC

Bases: scapy.packet.Packet

aliastypes = [<class 'scapy.contrib.isotp.ISOTP_FC'>, <class 'scapy.packet.Packet'>]
fields_desc = [<Field (ISOTP_FC).type>, <Field (ISOTP_FC).fc_flag>, <Field (ISOTP_FC).block_size>, <Field (ISOTP_FC).separation_time>]
class scapy.contrib.isotp.ISOTPSoftSocket(can_socket=None, sid=0, did=0, extended_addr=None, extended_rx_addr=None, rx_block_size=0, rx_separation_time_min=0, padding=False, listen_only=False, basecls=<class 'scapy.contrib.isotp.ISOTP'>)

Bases: scapy.supersocket.SuperSocket

This class is a wrapper around the ISOTPSocketImplementation, for the reasons described below.

The ISOTPSoftSocket aims to be fully compatible with the Linux ISOTP sockets provided by the can-isotp kernel module, while being usable on any operating system. Therefore, this socket needs to be able to respond to an incoming FF frame with a FC frame even before the recv() method is called. A thread is needed for receiving CAN frames in the background, and since the lower layer CAN implementation is not guaranteed to have a functioning POSIX select(), each ISOTP socket needs its own CAN receiver thread. Additionally, 2 timers are necessary to keep track of the timeouts and frame separation times, and each timer is implemented in its own thread. In total, each ISOTPSoftSocket spawns 3 background threads when constructed, which must be terminated afterwards by calling the close() method. SuperSocket automatically calls the close() method when the GC destroys an ISOTPSoftSocket. However, note that if any thread holds a reference to an ISOTPSoftSocket object, it will not be collected by the GC.

The implementation of the ISOTP protocol, along with the necessary threads, are stored in the ISOTPSocketImplementation class, and therefore:

  • There no reference from ISOTPSocketImplementation to ISOTPSoftSocket
  • ISOTPSoftSocket can be normally garbage collected
  • Upon destruction, ISOTPSoftSocket.close() will be called
  • ISOTPSoftSocket.close() will call ISOTPSocketImplementation.close()
  • All background threads can be stopped by the garbage collector
begin_send(p)

Begin the transmission of message p. This method returns after sending the first frame. If multiple frames are necessary to send the message, this socket will unable to send other messages until either the transmission of this frame succeeds or it fails.

close()
nonblocking_socket = True
recv(x=65535)
recv_raw(x=65535)

Receive a complete ISOTP message, blocking until a message is received or the specified timeout is reached. If self.timeout is 0, then this function doesn’t block and returns the first frame in the receive buffer or None if there isn’t any.

static select(sockets, remain=None)

This function is called during sendrecv() routine to wait for sockets to be ready to receive

class scapy.contrib.isotp.ISOTPSession(*args, **kwargs)

Bases: scapy.sessions.DefaultSession

Defragment ISOTP packets ‘on-the-flow’.

Usage: >>> sniff(session=ISOTPSession)

on_packet_received(pkt)

DEV: entry point. Will be called by sniff() for each received packet (that passes the filters).

scapy.contrib.isotp.ISOTPSocket

alias of scapy.contrib.isotp.ISOTPSoftSocket

class scapy.contrib.isotp.ISOTPSocketImplementation(can_socket, src_id, dst_id, padding=False, extended_addr=None, extended_rx_addr=None, rx_block_size=0, rx_separation_time_min=0, listen_only=False)

Bases: scapy.automaton.SelectableObject

Implementation of an ISOTP “state machine”.

Most of the ISOTP logic was taken from https://github.com/hartkopp/can-isotp/blob/master/net/can/isotp.c

This class is separated from ISOTPSoftSocket to make sure the background threads can’t hold a reference to ISOTPSoftSocket, allowing it to be collected by the GC.

begin_send(x)

Begins sending an ISOTP message. This method does not block.

can_send(load)
check_recv()

Implementation for SelectableObject

close()
on_can_recv(p)
on_recv(cf)

Function that must be called every time a CAN frame is received, to advance the state machine.

recv(timeout=None)

Receive an ISOTP frame, blocking if none is available in the buffer for at most ‘timeout’ seconds.

send(p)

Send an ISOTP frame and block until the message is sent or an error happens.

class scapy.contrib.isotp.ISOTPMessageBuilder(use_ext_addr=None, did=None, basecls=None)

Bases: object

Utility class to build ISOTP messages out of CAN frames, used by both ISOTP.defragment() and ISOTPSession.

This class attempts to interpret some CAN frames as ISOTP frames, both with and without extended addressing at the same time. For example, if an extended address of 07 is being used, all frames will also be interpreted as ISOTP single-frame messages.

CAN frames are fed to an ISOTPMessageBuilder object with the feed() method and the resulting ISOTP frames can be extracted using the pop() method.

class Bucket(total_len, first_piece)

Bases: object

push(piece)
count

Returns the number of ready ISOTP messages built from the provided can frames

feed(can)

Attempt to feed an incoming CAN frame into the state machine

pop(identifier=None, ext_addr=None)

Returns a built ISOTP message

Parameters:
  • identifier – if not None, only return isotp messages with this destination
  • ext_addr – if identifier is not None, only return isotp messages with this extended address for destination
Returns:

an ISOTP packet, or None if no message is ready

scapy.contrib.isotp.ISOTPScan(sock, scan_range=range(0, 2048), extended_addressing=False, noise_listen_time=2, sniff_time=0.1, output_format=None, can_interface='can0', verbose=False)

Scan for ISOTP Sockets on a bus and return findings

Parameters:
  • sock – CANSocket object to communicate with the bus under scan
  • scan_range – hexadecimal range of CAN-Identifiers to scan. Default is 0x0 - 0x7ff
  • extended_addressing – scan with ISOTP extended addressing
  • noise_listen_time – seconds to listen for default communication on the bus
  • sniff_time – time the scan waits for isotp flow control responses after sending a first frame
  • output_format – defines the format of the returned results (text, code or sockets). Provide a string e.g. “text”. Default is “socket”.
  • can_interface – interface used to create the returned code/sockets
  • verbose – displays information during scan

Scan for ISOTP Sockets in the defined range and returns found sockets in a specified format. The format can be:

  • text: human readable output
  • code: python code for copy&paste
  • sockets: if output format is not specified, ISOTPSockets will be created and returned in a list
class scapy.contrib.isotp.ISOTPNativeSocket(iface=None, sid=0, did=0, extended_addr=None, extended_rx_addr=None, listen_only=False, padding=False, transmit_time=100, basecls=<class 'scapy.contrib.isotp.ISOTP'>)

Bases: scapy.supersocket.SuperSocket

can_isotp_fc_options_fmt = '@3B'
can_isotp_ll_options_fmt = '@3B'
can_isotp_options_fmt = '@2I4B'
desc = 'read/write packets at a given CAN interface using CAN_ISOTP socket '
recv(x=65535)
recv_raw(x=65535)

Receives a packet, then returns a tuple containing (cls, pkt_data, time)

sockaddr_can_fmt = '@H3I'