scapy.automaton
Automata with states, transitions and actions.
- class scapy.automaton.ATMT[source]
Bases:
object
- ACTION = 'Action'
- CONDITION = 'Condition'
- IOEVENT = 'I/O event'
- exception NewStateRequested(state_func: Any, automaton: ATMT, *args: Any, **kargs: Any)[source]
Bases:
Exception
- action_parameters(*args: Any, **kargs: Any) NewStateRequested [source]
- RECV = 'Receive condition'
- STATE = 'State'
- TIMEOUT = 'Timeout condition'
- static action(cond: Any, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper] [source]
- static condition(state: Any, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper] [source]
- static ioevent(state: _StateWrapper, name: str, prio: int = 0, as_supersocket: str | None = None) Callable[[_StateWrapper, _StateWrapper], _StateWrapper] [source]
- static receive_condition(state: _StateWrapper, prio: int = 0) Callable[[_StateWrapper, _StateWrapper], _StateWrapper] [source]
- static state(initial: int = 0, final: int = 0, stop: int = 0, error: int = 0) Callable[[DecoratorCallable], DecoratorCallable] [source]
- class scapy.automaton.Automaton(self, debug=0, store=1, **kargs)[source]
Bases:
object
- exception AutomatonError(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- exception AutomatonException(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
Exception
- exception AutomatonStopped(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- exception Breakpoint(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonStopped
- exception CommandMessage(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- exception ErrorState(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- exception InterceptionPoint(msg: str, state: Message | None = None, result: str | None = None, packet: Packet | None = None)[source]
Bases:
AutomatonStopped
- exception Singlestep(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonStopped
- exception Stuck(msg: str, state: Message | None = None, result: str | None = None)[source]
Bases:
AutomatonException
- actions: Dict[str, List[_StateWrapper]] = {}
- conditions: Dict[str, List[_StateWrapper]] = {}
- destroy() None [source]
Destroys a stopped Automaton: this cleanups all opened file descriptors. Required on PyPy for instance where the garbage collector behaves differently.
- initial_states: List[_StateWrapper] = []
- ioevents: Dict[str, List[_StateWrapper]] = {}
- ionames: List[str] = []
- iosupersockets: List[SuperSocket] = []
- recv_conditions: Dict[str, List[_StateWrapper]] = {}
- state: NewStateRequested = None
- states: Dict[str, _StateWrapper] = {}
- stop_states: List[_StateWrapper] = []
- timeout: Dict[str, _TimerList] = {}
- class scapy.automaton.Automaton_metaclass(name: str, bases: Tuple[Any], dct: Dict[str, Any])[source]
Bases:
type
- class scapy.automaton.Message(**args: Any)[source]
Bases:
object
- exc_info: Tuple[None, None, None] | Tuple[BaseException, Exception, TracebackType] = None
- result: str = None
- type: str = None
- class scapy.automaton.ObjectPipe(name: str | None = None)[source]
Bases:
Generic
[_T
]- static select(sockets: List[SuperSocket], remain: float | None = 0.05) List[SuperSocket] [source]
- class scapy.automaton.Timer(time: int | float, prio: int = 0, autoreload: bool = False)[source]
Bases:
object
- scapy.automaton.select_objects(inputs: Iterable[Any], remain: float | int | None) List[Any] [source]
Select objects. Same than:
select.select(inputs, [], [], remain)
But also works on Windows, only on objects whose fileno() returns a Windows event. For simplicity, just use ObjectPipe() as a queue that you can select on whatever the platform is.
If you want an object to be always included in the output of select_objects (i.e. it’s not selectable), just make fileno() return a strictly negative value.
Example
>>> a, b = ObjectPipe("a"), ObjectPipe("b") >>> b.send("test") >>> select_objects([a, b], 1) [b]
- Parameters:
inputs – objects to process
remain – timeout. If 0, poll.