scapy.contrib.isotp.isotp_soft_socket
- class scapy.contrib.isotp.isotp_soft_socket.ISOTPSocketImplementation(can_socket: 'CANSocket', tx_id: int, rx_id: int, padding: bool = False, ext_address: int | None = None, rx_ext_address: int | None = None, bs: int = 0, stmin: int = 0, listen_only: bool = False, fd: bool = False)[source]
Bases:
object
Implementation of an ISOTP “state machine”.
Most of the ISOTP logic was taken from https://github.com/hartkopp/can-isotp/blob/master/net/can/isotp.c
This class is separated from ISOTPSoftSocket to make sure the background thread can’t hold a reference to ISOTPSoftSocket, allowing it to be collected by the GC.
- Parameters:
can_socket – a CANSocket instance, preferably filtering only can frames with identifier equal to rx_id
tx_id – the CAN identifier of the sent CAN frames
rx_id – the CAN identifier of the received CAN frames
padding – If True, pads sending packets with 0x00 which not count to the payload. Does not affect receiving packets.
ext_address – Extended Address byte to be added at the beginning of every CAN frame _sent_ by this object. Can be None in order to disable extended addressing on sent frames.
rx_ext_address – Extended Address byte expected to be found at the beginning of every CAN frame _received_ by this object. Can be None in order to disable extended addressing on received frames.
bs – Block Size byte to be included in every Control Flow Frame sent by this object. The default value of 0 means that all the data will be received in a single block.
stmin – Time Minimum Separation byte to be included in every Control Flow Frame sent by this object. The default value of 0 indicates that the peer will not wait any time between sending frames.
listen_only – Disables send of flow control frames
- on_recv(cf: Packet) None [source]
Function that must be called every time a CAN frame is received, to advance the state machine.
- class scapy.contrib.isotp.isotp_soft_socket.ISOTPSoftSocket(can_socket: Optional["CANSocket"] = None, tx_id: int = 0, rx_id: int = 0, ext_address: Optional[int] = None, rx_ext_address: Optional[int] = None, bs: int = 0, stmin: int = 0, padding: bool = False, listen_only: bool = False, basecls: Type[Packet] = <class 'scapy.contrib.isotp.isotp_packet.ISOTP'>, fd: bool = False)[source]
Bases:
SuperSocket
This class is a wrapper around the ISOTPSocketImplementation, for the reasons described below.
The ISOTPSoftSocket aims to be fully compatible with the Linux ISOTP sockets provided by the can-isotp kernel module, while being usable on any operating system. Therefore, this socket needs to be able to respond to an incoming FF frame with a FC frame even before the recv() method is called. A thread is needed for receiving CAN frames in the background, and since the lower layer CAN implementation is not guaranteed to have a functioning POSIX select(), each ISOTP socket needs its own CAN receiver thread. SuperSocket automatically calls the close() method when the GC destroys an ISOTPSoftSocket. However, note that if any thread holds a reference to an ISOTPSoftSocket object, it will not be collected by the GC.
The implementation of the ISOTP protocol, along with the necessary thread, are stored in the ISOTPSocketImplementation class, and therefore:
There no reference from ISOTPSocketImplementation to ISOTPSoftSocket
ISOTPSoftSocket can be normally garbage collected
Upon destruction, ISOTPSoftSocket.close() will be called
ISOTPSoftSocket.close() will call ISOTPSocketImplementation.close()
RX background thread can be stopped by the garbage collector
Initialize an ISOTPSoftSocket using the provided underlying can socket.
- Example (with NativeCANSocket underneath):
>>> conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': False} >>> load_contrib('isotp') >>> with ISOTPSocket("can0", tx_id=0x641, rx_id=0x241) as sock: >>> sock.send(...)
- Example (with PythonCANSocket underneath):
>>> conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': False} >>> conf.contribs['CANSocket'] = {'use-python-can': True} >>> load_contrib('isotp') >>> with ISOTPSocket(CANSocket(bustype='socketcan', channel="can0"), tx_id=0x641, rx_id=0x241) as sock: >>> sock.send(...)
- Parameters:
can_socket – a CANSocket instance, preferably filtering only can frames with identifier equal to rx_id
tx_id – the CAN identifier of the sent CAN frames
rx_id – the CAN identifier of the received CAN frames
ext_address – the extended address of the sent ISOTP frames
rx_ext_address – the extended address of the received ISOTP frames
bs – block size sent in Flow Control ISOTP frames
stmin – minimum desired separation time sent in Flow Control ISOTP frames
padding – If True, pads sending packets with 0x00 which not count to the payload. Does not affect receiving packets.
listen_only – Does not send Flow Control frames if a First Frame is received
basecls – base class of the packets emitted by this socket
fd – enables the CanFD support for this socket
- recv_raw(x: int = 65535) Tuple[Type[Packet] | None, bytes | None, float | None] [source]
Receive a complete ISOTP message, blocking until a message is received or the specified timeout is reached. If self.timeout is 0, then this function doesn’t block and returns the first frame in the receive buffer or None if there isn’t any.
- static select(sockets: List[SuperSocket], remain: float | None = None) List[SuperSocket] [source]
This function is called during sendrecv() routine to wait for sockets to be ready to receive
- class scapy.contrib.isotp.isotp_soft_socket.TimeoutScheduler[source]
Bases:
object
A timeout scheduler which uses a single thread for all timeouts, unlike python’s own Timer objects which use a thread each.
- GRACE = 0.1
- class Handle(when: float, cb: Callable[[], None] | bool | None)[source]
Bases:
object
Handle for a timeout, consisting of a callback and a time when it should be executed.
- classmethod cancel(handle: Handle) None [source]
Provided its handle, cancels the execution of a timeout.
- logger = <Logger scapy.contrib.automotive.timeout_scheduler (INFO)>