scapy.automaton
Automata with states, transitions and actions.
- class scapy.automaton.ATMT[source]
Bases:
object
- ACTION = 'Action'
- CONDITION = 'Condition'
- EOF = 'EOF condition'
- IOEVENT = 'I/O event'
- exception NewStateRequested(state_func: Any, automaton: ATMT, *args: Any, **kargs: Any)[source]
Bases:
Exception
- action_parameters(*args: Any, **kargs: Any) NewStateRequested [source]
- RECV = 'Receive condition'
- STATE = 'State'
- TIMEOUT = 'Timeout condition'
- static action(cond: Any, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper] [source]
- static condition(state: Any, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper] [source]
- static ioevent(state: _StateWrapper, name: str, prio: int = 0, as_supersocket: str | None = None) Callable[[_StateWrapper, _StateWrapper], _StateWrapper] [source]
- static receive_condition(state: _StateWrapper, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper] [source]
- static state(initial: int = 0, final: int = 0, stop: int = 0, error: int = 0) Callable[[DecoratorCallable], DecoratorCallable] [source]
- class scapy.automaton.Automaton(self, debug=0, store=0, **kargs)[source]
Bases:
object
- exception AutomatonError(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- exception AutomatonException(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
Exception
- exception AutomatonStopped(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- exception Breakpoint(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonStopped
- exception CommandMessage(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- exception ErrorState(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- exception InterceptionPoint(msg: str, state: Message | None = None, result: str | None = None, packet: Packet | None = None)[source]
Bases:
AutomatonStopped
- exception Singlestep(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonStopped
- exception Stuck(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- actions: Dict[str, List[_StateWrapper]] = {}
- conditions: Dict[str, List[_StateWrapper]] = {}
- destroy() None [source]
Destroys a stopped Automaton: this cleanups all opened file descriptors. Required on PyPy for instance where the garbage collector behaves differently.
- eofs: Dict[str, _StateWrapper] = {}
- initial_states: List[_StateWrapper] = []
- ioevents: Dict[str, List[_StateWrapper]] = {}
- ionames: List[str] = []
- iosupersockets: List[SuperSocket] = []
- recv_conditions: Dict[str, List[_StateWrapper]] = {}
- socketcls[source]
alias of
StreamSocket
- classmethod spawn(port: int, iface: NetworkInterface | str | None = None, bg: bool = False, **kwargs: Any) socket | None [source]
Spawn a TCP server that listens for connections and start the automaton for each new client.
- Parameters:
port – the port to listen to
bg – background mode? (default: False)
Note that in background mode, you shall close the TCP server as such:
srv = MyAutomaton.spawn(8080, bg=True) srv.shutdown(socket.SHUT_RDWR) # important srv.close()
- state: NewStateRequested = None
- states: Dict[str, _StateWrapper] = {}
- stop_state: _StateWrapper | None = None
- timeout: Dict[str, _TimerList] = {}
- update_sock(sock: SuperSocket) None [source]
Update the socket used by the automata. Typically used in an eof event to reconnect.
- class scapy.automaton.Automaton_metaclass(name: str, bases: Tuple[Any], dct: Dict[str, Any])[source]
Bases:
type
- class scapy.automaton.Message(**args: Any)[source]
Bases:
object
- exc_info: Tuple[None, None, None] | Tuple[BaseException, Exception, TracebackType] = None
- result: str = None
- type: str = None
- class scapy.automaton.ObjectPipe(name: str | None = None)[source]
Bases:
Generic
[_T
]- static select(sockets: List[SuperSocket], remain: float | None = 0.05) List[SuperSocket] [source]
- class scapy.automaton.Timer(time: int | float, prio: int = 0, autoreload: bool = False)[source]
Bases:
object
- scapy.automaton.select_objects(inputs: Iterable[Any], remain: float | int | None) List[Any] [source]
Select objects. Same than:
select.select(inputs, [], [], remain)
But also works on Windows, only on objects whose fileno() returns a Windows event. For simplicity, just use ObjectPipe() as a queue that you can select on whatever the platform is.
If you want an object to be always included in the output of select_objects (i.e. it’s not selectable), just make fileno() return a strictly negative value.
Example
>>> a, b = ObjectPipe("a"), ObjectPipe("b") >>> b.send("test") >>> select_objects([a, b], 1) [b]
- Parameters:
inputs – objects to process
remain – timeout. If 0, poll. If None, block.