scapy.automaton

Automata with states, transitions and actions.

class scapy.automaton.ATMT[source]

Bases: object

ACTION = 'Action'
CONDITION = 'Condition'
IOEVENT = 'I/O event'
exception NewStateRequested(state_func: Any, automaton: scapy.automaton.ATMT, *args: Any, **kargs: Any)[source]

Bases: Exception

action_parameters(*args: Any, **kargs: Any) scapy.automaton.ATMT.NewStateRequested[source]
run() Any[source]
RECV = 'Receive condition'
STATE = 'State'
TIMEOUT = 'Timeout condition'
static action(cond: Any, prio: int = 0) Callable[[scapy.automaton._StateWrapper, scapy.automaton._StateWrapper], scapy.automaton._StateWrapper][source]
static condition(state: Any, prio: int = 0) Callable[[scapy.automaton._StateWrapper, scapy.automaton._StateWrapper], scapy.automaton._StateWrapper][source]
static ioevent(state: scapy.automaton._StateWrapper, name: str, prio: int = 0, as_supersocket: Optional[str] = None) Callable[[scapy.automaton._StateWrapper, scapy.automaton._StateWrapper], scapy.automaton._StateWrapper][source]
static receive_condition(state: scapy.automaton._StateWrapper, prio: int = 0) Callable[[scapy.automaton._StateWrapper, scapy.automaton._StateWrapper], scapy.automaton._StateWrapper][source]
static state(initial: int = 0, final: int = 0, stop: int = 0, error: int = 0) Callable[[scapy.compat.DecoratorCallable], scapy.compat.DecoratorCallable][source]
static timeout(state: scapy.automaton._StateWrapper, timeout: int) Callable[[scapy.automaton._StateWrapper, scapy.automaton._StateWrapper, int], scapy.automaton._StateWrapper][source]
class scapy.automaton.Automaton(self, debug=0, store=1, **kargs)[source]

Bases: object

exception AutomatonError(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

exception AutomatonException(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None)[source]

Bases: Exception

exception AutomatonStopped(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

exception Breakpoint(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None)[source]

Bases: scapy.automaton.Automaton.AutomatonStopped

exception CommandMessage(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

exception ErrorState(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

exception InterceptionPoint(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None, packet: Optional[str] = None)[source]

Bases: scapy.automaton.Automaton.AutomatonStopped

exception Singlestep(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None)[source]

Bases: scapy.automaton.Automaton.AutomatonStopped

exception Stuck(msg: str, state: Optional[scapy.automaton.Message] = None, result: Optional[str] = None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

accept_packet(pkt: Optional[scapy.packet.Packet] = None, wait: Optional[bool] = False) Any[source]
actions: Dict[str, List[scapy.automaton._StateWrapper]] = {}
add_breakpoints(*bps: Any) None[source]
add_interception_points(*ipts: Any) None[source]
conditions: Dict[str, List[scapy.automaton._StateWrapper]] = {}
debug(lvl: int, msg: str) None[source]
forcestop(wait: bool = True) None[source]
initial_states: List[scapy.automaton._StateWrapper] = []
ioevents: Dict[str, List[scapy.automaton._StateWrapper]] = {}
ionames: List[str] = []
iosupersockets: List[scapy.supersocket.SuperSocket] = []
master_filter(pkt: scapy.packet.Packet) bool[source]
my_send(pkt: scapy.packet.Packet) None[source]
next() Any[source]
parse_args(debug: int = 0, store: int = 1, **kargs: Any) None[source]
recv_conditions: Dict[str, List[scapy.automaton._StateWrapper]] = {}
reject_packet(wait: Optional[bool] = False) Any[source]
remove_breakpoints(*bps: Any) None[source]
remove_interception_points(*ipts: Any) None[source]
restart(*args: Any, **kargs: Any) None[source]
run(resume: Optional[scapy.automaton.Message] = None, wait: Optional[bool] = True) Any[source]
runbg(resume: Optional[scapy.automaton.Message] = None, wait: Optional[bool] = False) None[source]
send(pkt: scapy.packet.Packet) None[source]
start(*args: Any, **kargs: Any) None[source]
state: scapy.automaton.ATMT.NewStateRequested = None
states: Dict[str, scapy.automaton._StateWrapper] = {}
stop(wait: bool = True) None[source]
stop_states: List[scapy.automaton._StateWrapper] = []
timeout: Dict[str, List[Tuple[int, scapy.automaton._StateWrapper]]] = {}
class scapy.automaton.Automaton_metaclass(name: str, bases: Tuple[Any], dct: Dict[str, Any])[source]

Bases: type

build_graph() str[source]
graph(**kargs: Any) Optional[str][source]
class scapy.automaton.Message(**args: Any)[source]

Bases: object

exc_info: Union[Tuple[None, None, None], Tuple[BaseException, Exception, types.TracebackType]] = None
pkt: scapy.packet.Packet = None
result: str = None
state: scapy.automaton.Message = None
type: str = None
class scapy.automaton.ObjectPipe(name: Optional[str] = None)[source]

Bases: Generic[scapy.automaton._T]

close() None[source]
empty() bool[source]
fileno() int[source]
flush() None[source]
read(n: Optional[int] = 0) Optional[scapy.automaton._T][source]
recv(n: Optional[int] = 0) Optional[scapy.automaton._T][source]
static select(sockets: List[scapy.supersocket.SuperSocket], remain: Optional[float] = 0.05) List[scapy.supersocket.SuperSocket][source]
send(obj: scapy.automaton._T) int[source]
write(obj: scapy.automaton._T) None[source]
scapy.automaton.select_objects(inputs: Iterable[Any], remain: Optional[Union[float, int]]) List[Any][source]

Select objects. Same than: select.select(inputs, [], [], remain)

But also works on Windows, only on objects whose fileno() returns a Windows event. For simplicity, just use ObjectPipe() as a queue that you can select on whatever the platform is.

If you want an object to be always included in the output of select_objects (i.e. it’s not selectable), just make fileno() return a strictly negative value.

Example

>>> a, b = ObjectPipe("a"), ObjectPipe("b")
>>> b.send("test")
>>> select_objects([a, b], 1)
[b]
Parameters
  • inputs – objects to process

  • remain – timeout. If 0, return [].