scapy.automaton module

Automata with states, transitions and actions.

class scapy.automaton.ATMT

Bases: object

ACTION = 'Action'
CONDITION = 'Condition'
IOEVENT = 'I/O event'
exception NewStateRequested(state_func, automaton, *args, **kargs)

Bases: Exception

action_parameters(*args, **kargs)
run()
RECV = 'Receive condition'
STATE = 'State'
TIMEOUT = 'Timeout condition'
static action(cond, prio=0)
static condition(state, prio=0)
static ioevent(state, name, prio=0, as_supersocket=None)
static receive_condition(state, prio=0)
static state(initial=0, final=0, error=0)
static timeout(state, timeout)
class scapy.automaton.Automaton(*args, **kargs)

Bases: object

exception AutomatonError(msg, state=None, result=None)

Bases: scapy.automaton.AutomatonException

exception AutomatonException(msg, state=None, result=None)

Bases: Exception

exception AutomatonStopped(msg, state=None, result=None)

Bases: scapy.automaton.AutomatonException

exception Breakpoint(msg, state=None, result=None)

Bases: scapy.automaton.AutomatonStopped

exception CommandMessage(msg, state=None, result=None)

Bases: scapy.automaton.AutomatonException

exception ErrorState(msg, state=None, result=None)

Bases: scapy.automaton.AutomatonException

exception InterceptionPoint(msg, state=None, result=None, packet=None)

Bases: scapy.automaton.AutomatonStopped

exception Singlestep(msg, state=None, result=None)

Bases: scapy.automaton.AutomatonStopped

exception Stuck(msg, state=None, result=None)

Bases: scapy.automaton.AutomatonException

accept_packet(pkt=None, wait=False)
actions = {}
add_breakpoints(*bps)
add_interception_points(*ipts)
conditions = {}
debug(lvl, msg)
initial_states = []
ioevents = {}
ionames = []
iosupersockets = []
master_filter(pkt)
my_send(pkt)
next()
parse_args(debug=0, store=1, **kargs)
recv_conditions = {}
reject_packet(wait=False)
remove_breakpoints(*bps)
remove_interception_points(*ipts)
restart(*args, **kargs)
run(resume=None, wait=True)
runbg(resume=None, wait=False)
send(pkt)
start(*args, **kargs)
state = None
states = {}
stop()
timeout = {}
class scapy.automaton.Automaton_metaclass

Bases: type

build_graph()
graph(**kargs)
class scapy.automaton.Message(**args)

Bases: object

class scapy.automaton.ObjectPipe

Bases: scapy.automaton.SelectableObject

check_recv()

DEV: will be called only once (at beginning) to check if the object is ready.

close()
fileno()
read(n=0)
read_allowed_exceptions = ()
recv(n=0)
static select(sockets, remain=0.05)
send(obj)
write(obj)
class scapy.automaton.SelectableObject

Bases: object

DEV: to implement one of those, you need to add 2 things to your object:

  • add a “check_recv” function
  • call “self.call_release” once you are ready to be read

You can set the __selectable_force_select__ to True in the class, if you want to force the handler to use fileno(). This may only be usable on sockets created using the builtin socket API.

call_release(arborted=False)

DEV: Must be called when the object becomes ready to read. Releases the lock of _wait_non_ressources.

check_recv()

DEV: will be called only once (at beginning) to check if the object is ready.

register_hook(hook)

DEV: When call_release() will be called, the hook will also

wait_return(callback)

Entry point of SelectableObject: register the callback
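
The contract described above (a check_recv function plus a call_release notification that fires registered hooks) can be illustrated with a toy model. This is only a sketch under stated assumptions: ToySelectable and toy_select are hypothetical names, they use nothing from Scapy, and they are not how SelectableObject is implemented. The early return for a zero timeout mirrors the "If 0, return []" note in this reference.

```python
import threading
from collections import deque


class ToySelectable:
    """Hypothetical stand-in for the contract; not Scapy's SelectableObject."""

    def __init__(self):
        self._items = deque()
        self._hooks = []

    def check_recv(self):
        # Checked once up front: is something already waiting to be read?
        return len(self._items) > 0

    def register_hook(self, hook):
        # Hooks are invoked every time call_release() runs.
        self._hooks.append(hook)

    def call_release(self):
        # Tell every registered waiter that this object became readable.
        for hook in self._hooks:
            hook(self)

    def send(self, obj):
        self._items.append(obj)
        self.call_release()

    def recv(self):
        return self._items.popleft()


def toy_select(objects, remain):
    """Return the objects that become readable within `remain` seconds."""
    if not remain:
        return []
    ready = [o for o in objects if o.check_recv()]
    if ready:
        return ready
    event = threading.Event()

    def on_release(obj):
        ready.append(obj)
        event.set()

    for o in objects:
        o.register_hook(on_release)
    event.wait(remain)
    return ready


# Usage: another thread makes the object readable while we wait.
pipe = ToySelectable()
threading.Timer(0.05, pipe.send, args=("hello",)).start()
ready = toy_select([pipe], 2.0)
message = ready[0].recv() if ready else None
```

The toy never unregisters its hooks and does not guard against an object becoming ready between the initial check and hook registration; a real implementation would need to handle both.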

class scapy.automaton.SelectableSelector(inputs, remain)

Bases: object

Select SelectableObject objects.

Parameters:
  • inputs – objects to process
  • remain – timeout. If 0, return [].
  • customTypes – types of the objects that have the check_recv function.

process()

Entry point of SelectableSelector

scapy.automaton.select_objects(inputs, remain)

Select SelectableObject objects. Same as select.select([inputs], [], [], remain), but it also works on Windows. It only works on SelectableObject instances.

Parameters:
  • inputs – objects to process
  • remain – timeout. If 0, return [].
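
For comparison, the standard-library call that this function is described as equivalent to can be sketched as follows. The example uses only Python's select and socket modules on a local socket pair; ready_to_read is a hypothetical helper name, and nothing here calls Scapy. One difference to keep in mind: this reference states that a remain of 0 returns [], whereas plain select.select with a timeout of 0 performs a non-blocking poll.

```python
import select
import socket


def ready_to_read(inputs, remain):
    """Return the members of `inputs` readable within `remain` seconds."""
    readable, _, _ = select.select(inputs, [], [], remain)
    return readable


a, b = socket.socketpair()
try:
    idle = ready_to_read([a], 0.05)   # nothing has been written yet
    b.send(b"ping")
    ready = ready_to_read([a], 0.5)   # data is now waiting on `a`
finally:
    a.close()
    b.close()
```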