scapy.automaton

Automata with states, transitions and actions.

class scapy.automaton.ATMT[source]

Bases: object

ACTION = 'Action'
CONDITION = 'Condition'
IOEVENT = 'I/O event'
exception NewStateRequested(state_func, automaton, *args, **kargs)[source]

Bases: Exception

action_parameters(*args, **kargs)[source]
run()[source]
RECV = 'Receive condition'
STATE = 'State'
TIMEOUT = 'Timeout condition'
static action(cond, prio=0)[source]
static condition(state, prio=0)[source]
static ioevent(state, name, prio=0, as_supersocket=None)[source]
static receive_condition(state, prio=0)[source]
static state(initial=0, final=0, stop=0, error=0)[source]
static timeout(state, timeout)[source]
class scapy.automaton.Automaton(*args, **kargs)[source]

Bases: object

exception AutomatonError(msg, state=None, result=None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

exception AutomatonException(msg, state=None, result=None)[source]

Bases: Exception

exception AutomatonStopped(msg, state=None, result=None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

exception Breakpoint(msg, state=None, result=None)[source]

Bases: scapy.automaton.Automaton.AutomatonStopped

exception CommandMessage(msg, state=None, result=None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

exception ErrorState(msg, state=None, result=None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

exception InterceptionPoint(msg, state=None, result=None, packet=None)[source]

Bases: scapy.automaton.Automaton.AutomatonStopped

exception Singlestep(msg, state=None, result=None)[source]

Bases: scapy.automaton.Automaton.AutomatonStopped

exception Stuck(msg, state=None, result=None)[source]

Bases: scapy.automaton.Automaton.AutomatonException

accept_packet(pkt=None, wait=False)[source]
actions = {}
add_breakpoints(*bps)[source]
add_interception_points(*ipts)[source]
conditions = {}
debug(lvl, msg)[source]
forcestop()[source]
initial_states = []
ioevents = {}
ionames = []
iosupersockets = []
master_filter(pkt)[source]
my_send(pkt)[source]
next()[source]
parse_args(debug=0, store=1, **kargs)[source]
recv_conditions = {}
reject_packet(wait=False)[source]
remove_breakpoints(*bps)[source]
remove_interception_points(*ipts)[source]
restart(*args, **kargs)[source]
run(resume=None, wait=True)[source]
runbg(resume=None, wait=False)[source]
send(pkt)[source]
start(*args, **kargs)[source]
state = None
states = {}
stop()[source]
stop_states = []
timeout = {}
class scapy.automaton.Automaton_metaclass(name, bases, dct)[source]

Bases: type

build_graph()[source]
graph(**kargs)[source]
class scapy.automaton.Message(**args)[source]

Bases: object

class scapy.automaton.ObjectPipe(name=None)[source]

Bases: object

close()[source]
empty()[source]
fileno()[source]
flush()[source]
read(n=0)[source]
recv(n=0)[source]
static select(sockets, remain=0.05)[source]
send(obj)[source]
write(obj)[source]
scapy.automaton.select_objects(inputs, remain)[source]

Select objects. Same as: select.select(inputs, [], [], remain)

It also works on Windows, but only on objects whose fileno() returns a Windows event. For simplicity, just use ObjectPipe() as a queue that you can select on, whatever the platform is.

If you want an object to always be included in the output of select_objects (i.e. it’s not selectable), just make its fileno() return a strictly negative value.

Example

>>> a, b = ObjectPipe("a"), ObjectPipe("b")
>>> b.send("test")
>>> select_objects([a, b], 1)
[b]
Parameters
  • inputs – objects to process

  • remain – timeout. If 0, return [].