class scapy.contrib.automotive.ecu.Ecu(logging: bool = True, verbose: bool = True, store_supported_responses: bool = True, lookahead: int = 10)[source]

Bases: object

An Ecu object can be used to
  • track the states of an Ecu.

  • to log all modification to an Ecu.

  • to extract supported responses of a real Ecu.


>>> print("This ecu logs, tracks and creates supported responses")
>>> my_virtual_ecu = Ecu()
>>> my_virtual_ecu.update(PacketList([...]))
>>> my_virtual_ecu.supported_responses
>>> print("Another ecu just tracks")
>>> my_tracking_ecu = Ecu(logging=False, store_supported_responses=False)
>>> my_tracking_ecu.update(PacketList([...]))
>>> print("Another ecu just logs all modifications to it")
>>> my_logging_ecu = Ecu(verbose=False, store_supported_responses=False)
>>> my_logging_ecu.update(PacketList([...]))
>>> my_logging_ecu.log
>>> print("Another ecu just creates supported responses")
>>> my_response_ecu = Ecu(verbose=False, logging=False)
>>> my_response_ecu.update(PacketList([...]))
>>> my_response_ecu.supported_responses

Parameters to initialize an Ecu object

  • logging – Turn logging on or off. Default is on.

  • verbose – Turn tracking on or off. Default is on.

  • store_supported_responses – Create a list of supported responses if True.

  • lookahead – Configuration for lookahead when computing supported responses

static extend_pkt_with_logging(cls: Type[Packet]) Callable[[Callable[[Packet], Tuple[str, Any]]], None][source]

Decorator to add a function as ‘get_log’ method to a given class. This allows dynamic modifications and additions to a protocol. :param cls: A packet class to be modified :return: Decorator function

reset() None[source]

Resets the internal state to a default EcuState.

static sort_key_func(resp: EcuResponse) Tuple[bool, int, int, int][source]

This sorts responses in the following order: 1. Positive responses first 2. Lower ServiceIDs first 3. Less supported states first 4. Longer (more specific) responses first :param resp: EcuResponse to be sorted :return: Tuple as sort key

property supported_responses

Returns a sorted list of supported responses. The sort is done in a way to provide the best possible results, if this list of supported responses is used to simulate an real world Ecu with the EcuAnsweringMachine object. :return: A sorted list of EcuResponse objects

property unanswered_packets

A list of all unanswered packets, which were processed by this Ecu object. :return: PacketList of unanswered packets

update(p: Union[Packet, PacketList]) None[source]

Processes a Packet or a list of Packets, according to the chosen configuration. :param p: Packet or list of Packets

class scapy.contrib.automotive.ecu.EcuAnsweringMachine(self, supported_responses=None, main_socket=None, broadcast_socket=None, basecls=<class 'scapy.packet.Raw'>, timeout=None, initial_ecu_state=None)[source]

Bases: AnsweringMachine[PacketList]

AnsweringMachine which emulates the basic behaviour of a real world ECU. Provide a list of EcuResponse objects to configure the behaviour of a AnsweringMachine.

>>> resp = EcuResponse(session=range(0,255), security_level=0, responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))
>>> sock = ISOTPSocket(can_iface, tx_id=0x700, rx_id=0x600, basecls=UDS)
>>> answering_machine = EcuAnsweringMachine(supported_responses=[resp], main_socket=sock, basecls=UDS)
>>> sim = threading.Thread(target=answering_machine, kwargs={'count': 4, 'timeout':5})
>>> sim.start()
function_name = 'EcuAnsweringMachine'
is_request(req: Packet) bool[source]
make_reply(req: Packet) PacketList[source]

Checks if a given request can be answered by the internal list of EcuResponses. First, it’s evaluated if the internal EcuState of this AnsweringMachine is supported by an EcuResponse, next it’s evaluated if a request answers the key_response of this EcuResponse object. The first fitting EcuResponse is used. If this EcuResponse modified the EcuState, the internal EcuState of this AnsweringMachine is updated, and the list of response Packets of the selected EcuResponse is returned. If no EcuResponse if found, a PacketList with a generic NegativeResponse is returned. :param req: A request packet :return: A list of response packets

optam0: Dict[str, Any]
optam1: Dict[str, Any]
optam2: Dict[str, Any]
optsend: Dict[str, Any]
optsniff: Dict[str, Any]
parse_options(supported_responses: ~typing.Optional[~typing.List[~scapy.contrib.automotive.ecu.EcuResponse]] = None, main_socket: ~typing.Optional[~scapy.supersocket.SuperSocket] = None, broadcast_socket: ~typing.Optional[~scapy.supersocket.SuperSocket] = None, basecls: ~typing.Type[~scapy.packet.Packet] = <class 'scapy.packet.Raw'>, timeout: ~typing.Optional[~typing.Union[int, float]] = None, initial_ecu_state: ~typing.Optional[~scapy.contrib.automotive.ecu.EcuState] = None) None[source]
  • supported_responses – List of EcuResponse objects to define the behaviour. The default response is generalReject.

  • main_socket – Defines the object of the socket to send and receive packets.

  • broadcast_socket – Defines the object of the broadcast socket. Listen-only, responds with the main_socket. None to disable broadcast capabilities.

  • basecls – Provide a basecls of the used protocol

  • timeout – Specifies the timeout for sniffing in seconds.

reset_state() None[source]
send_reply(reply: PacketList, send_function: Optional[Any] = None) None[source]

Sends all Packets of a EcuResponse object. This allows to send multiple packets up on a request. If the list contains more than one packet, a random time between each packet is waited until the next packet will be sent. :param reply: List of packets to be sent.

sniff_options_list = ['store', 'opened_socket', 'count', 'filter', 'prn', 'stop_filter', 'timeout']
property state
class scapy.contrib.automotive.ecu.EcuResponse(state: ~typing.Optional[~typing.Union[~scapy.contrib.automotive.ecu.EcuState, ~typing.Iterable[~scapy.contrib.automotive.ecu.EcuState]]] = None, responses: ~typing.Union[~typing.Iterable[~scapy.packet.Packet], ~scapy.plist.PacketList, ~scapy.packet.Packet] = <Raw  load='\x7f\x10' |>, answers: ~typing.Optional[~typing.Callable[[~scapy.packet.Packet, ~scapy.packet.Packet], bool]] = None)[source]

Bases: object

Encapsulates responses and the according EcuStates. A list of this objects can be used to configure an EcuAnsweringMachine. This is useful, if you want to clone the behaviour of a real Ecu.


>>> EcuResponse(EcuState(session=2, security_level=2), responses=UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"deadbeef1"))
>>> EcuResponse([EcuState(session=range(2, 5), security_level=2), EcuState(session=3, security_level=5)], responses=UDS()/UDS_RDBIPR(dataIdentifier=9)/Raw(b"deadbeef4"))

Initialize an EcuResponse capsule

  • state – EcuState or list of EcuStates in which this response is allowed to be sent. If no state provided, the response packet will always be send.

  • responses – A Packet or a list of Packet objects. By default the last packet is asked if it answers an incoming packet. This allows to send for example requestCorrectlyReceived-ResponsePending packets.

  • answers – Optional argument to provide a custom answer here: lambda resp, req: return resp.answers(req) This allows the modification of a response depending on a request. Custom SecurityAccess mechanisms can be implemented in this way or generic NegativeResponse messages which answers to everything can be realized in this way.

answers(other: Packet) Union[int, bool][source]
command() str[source]
property key_response
property responses
property states
supports_state(state: EcuState) bool[source]
class scapy.contrib.automotive.ecu.EcuSession(*args: Any, **kwargs: Any)[source]

Bases: DefaultSession

Tracks modification to an Ecu object ‘on-the-flow’.

The parameters for the internal Ecu object are obtained from the kwargs dict.

logging: Turn logging on or off. Default is on. verbose: Turn tracking on or off. Default is on. store_supported_responses: Create a list of supported responses, if True.


>>> sniff(session=EcuSession)
on_packet_received(pkt: Optional[Packet]) None[source]
class scapy.contrib.automotive.ecu.EcuState(**kwargs: Any)[source]

Bases: object

Stores the state of an Ecu. The state is defined by a protocol, for example UDS or GMLAN. A EcuState supports comparison and serialization (command()).

command() str[source]
static extend_pkt_with_modifier(cls: Type[Packet]) Callable[[Callable[[Packet, Packet, EcuState], None]], None][source]

Decorator to add a function as ‘modify_ecu_state’ method to a given class. This allows dynamic modifications and additions to a protocol. :param cls: A packet class to be modified :return: Decorator function

static get_modified_ecu_state(response: Packet, request: Packet, state: EcuState, modify_in_place: bool = False) EcuState[source]

Helper function to get a modified EcuState from a Packet and a previous EcuState. An EcuState is always modified after a response Packet is received. In some protocols, the belonging request packet is necessary to determine the precise state of the Ecu

  • response – Response packet that supports modify_ecu_state

  • request – Belonging request of the response that modifies Ecu

  • state – The previous/current EcuState

  • modify_in_place – If True, the given EcuState will be modified


The modified EcuState or a modified copy

static is_modifier_pkt(pkt: Packet) bool[source]

Helper function to determine if a Packet contains a layer that modifies the EcuState. :param pkt: Packet to be analyzed :return: True if pkt contains layer that implements modify_ecu_state

reset() None[source]