Messages arrive and depart via the network at unpredictable times - asyncio lets you deal with many such interactions concurrently.
>>> home = {}
>>> home['ntoll'] = 'Towcester'
>>> home['voidspace'] = 'Bugbrooke'
>>> home['pinner'] = 'Coventry'
>>> home
{'ntoll': 'Towcester', 'voidspace': 'Bugbrooke', 'pinner': 'Coventry'}
>>> home['ntoll']
'Towcester'
A very simple key / value data store.
(Based on real events - participants have been replaced by unreasonably happy actors)
PEP 3156 states that callbacks are...
“[...] strictly serialized: one callback must finish before the next one will be called. This is an important guarantee: when two or more callbacks use or modify shared state, each callback is guaranteed that while it is running, the shared state isn't changed by another callback.”
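A minimal sketch of that guarantee (names invented for illustration): two callbacks that mutate shared state, scheduled with call_soon. The event loop runs them strictly one after the other, so no locking is needed.

```python
import asyncio

results = []

def callback_a():
    # Runs to completion before callback_b starts.
    results.append('a')

def callback_b():
    results.append('b')

loop = asyncio.new_event_loop()
loop.call_soon(callback_a)
loop.call_soon(callback_b)
loop.call_soon(loop.stop)
loop.run_forever()
loop.close()
# results is now ['a', 'b'] - the order in which the callbacks were scheduled.
```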
Task A overwrites the record containing task B's changes.
It can't get on with other stuff while waiting for A.
:-(
We make plans: when the washing machine finishes, take the clothes and hang them out to dry.
As humans we work on concurrent tasks (like preparing breakfast) in a similar non-blocking manner.
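The washing-machine idea translates directly to asyncio: while one coroutine waits, the loop gets on with another. A hedged sketch (the coroutine names and sleeps are invented stand-ins for real waiting):

```python
import asyncio

async def washing_machine():
    await asyncio.sleep(0.1)   # waiting is non-blocking
    return 'clothes washed'

async def make_breakfast():
    await asyncio.sleep(0.05)  # breakfast happens during the wash
    return 'breakfast ready'

async def morning():
    # Both coroutines make progress during each other's waits.
    return await asyncio.gather(washing_machine(), make_breakfast())

results = asyncio.run(morning())
print(results)
```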
asyncio
avoids potentially confusing and
complicated “threaded” concurrency while
retaining the benefits of strictly sequential code.
You need to understand coroutines, futures and tasks.
(Are FUN!)
@asyncio.coroutine
def handle_request(self, message, payload):
    """ Handle an incoming HTTP request. """
    response_code = 405  # Method Not Allowed
    response_data = None
    if message.method == 'POST':
        try:
            raw_data = yield from payload.read()
            response_data = yield from self.process_data(raw_data)
            response_code = 200  # OK
        except Exception as ex:
            # Log all errors
            log.error(ex)
            response_code = 500  # Internal Server Error
    # etc...
    return response_code, response_data
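Since Python 3.5 the same handler is spelled with async/await instead of @asyncio.coroutine and yield from. A self-contained sketch (process_data and the payload object are invented stand-ins so the example runs):

```python
import asyncio

async def process_data(raw_data):
    # Stand-in for the real processing step.
    return raw_data.upper()

async def handle_request(method, payload):
    response_code = 405  # Method Not Allowed
    response_data = None
    if method == 'POST':
        try:
            raw_data = await payload.read()
            response_data = await process_data(raw_data)
            response_code = 200  # OK
        except Exception:
            response_code = 500  # Internal Server Error
    return response_code, response_data

class FakePayload:
    async def read(self):
        return b'hello'

status, body = asyncio.run(handle_request('POST', FakePayload()))
print(status, body)  # 200 b'HELLO'
```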
How do I handle the result of a coroutine?
(Are also FUN!)
def handle_resolved_future(future):
    """
    This function is a callback. Its only argument is the
    resolved future whose result it logs.
    """
    log.info(future.result())

# Instantiate the future we're going to use to represent the
# as-yet unknown result.
my_future = asyncio.Future()

# Add the callback to the list of things to do when the
# result is known (the future is resolved).
my_future.add_done_callback(handle_resolved_future)

(Time passes)

# In some coroutine that has a reference to the future:
my_future.set_result('A result set some time later!')
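Putting the pieces together, here is a runnable sketch of that pattern (the resolver coroutine and the `seen` list are invented for illustration): a coroutine resolves the future some time later, and the callback fires with the result.

```python
import asyncio

seen = []

def handle_resolved_future(future):
    # Callback: fires once the future is resolved.
    seen.append(future.result())

async def main():
    my_future = asyncio.get_running_loop().create_future()
    my_future.add_done_callback(handle_resolved_future)

    async def resolver():
        await asyncio.sleep(0.01)  # (time passes)
        my_future.set_result('A result set some time later!')

    asyncio.get_running_loop().create_task(resolver())
    await my_future          # suspends until set_result is called
    await asyncio.sleep(0)   # give the callback a chance to run

asyncio.run(main())
```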
def handle_resolved_task(task):
    """
    This function is a callback. Its only argument is the
    resolved task whose result it logs.
    """
    log.info(task.result())

task = asyncio.Task(slow_coroutine_operation())
task.add_done_callback(handle_resolved_task)

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(task)
finally:
    loop.close()
No need to resolve the task in a coroutine!
my_future.add_done_callback(handle_resolved_future)
add_generic_callbacks_to(my_future_or_task)
Each node has a unique id that is within a set of all possible values for a certain hash function (for example, sha512).
The id's value indicates the node's position in the clock-face.
In this way we can tell where a node is located in the abstract network, who it is close to and how far away nodes are from each other (there is some notion of distance).
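The talk only says "there is some notion of distance"; Kademlia-style DHTs typically use the XOR of two ids as the metric, so that is assumed in this sketch:

```python
from hashlib import sha512

def node_id(name):
    # A node's id: its position on the "clock face" of all
    # possible sha512 values.
    return int(sha512(name.encode()).hexdigest(), 16)

def distance(id_a, id_b):
    # Kademlia-style XOR metric (an assumption - the talk doesn't
    # name the metric it uses).
    return id_a ^ id_b

a, b = node_id('alice'), node_id('bob')
# The metric is symmetric, and a node is at distance 0 from itself.
assert distance(a, b) == distance(b, a)
assert distance(a, a) == 0
```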
>>> from hashlib import sha512
>>> item = {
... 'my_key': 'Some value I want to store'
... }
>>> sha512(b'my_key').hexdigest()
'176b1c65a58c69bb83cf0f9e06695c4094bc35e69f2576464a027fa52fa53a7ab35c2b4a39203aff98606aed641f45abbc0d39d2be0723f44cc04e9b3e7e0f87'
Data is a key / value pair.
The key is turned into a hash. The value is stored at nodes whose IDs are close to the hash of the key.
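In code, "close to the hash of the key" means sorting peers by their distance from that hash. A sketch assuming the XOR metric (the helper names and k=2 are invented for illustration):

```python
from hashlib import sha512

def to_id(s):
    return int(sha512(s.encode()).hexdigest(), 16)

def closest_nodes(key, node_ids, k=2):
    # Sort peers by XOR distance from the hash of the key and
    # keep the k closest - these are the nodes that store the value.
    target = to_id(key)
    return sorted(node_ids, key=lambda n: n ^ target)[:k]

peers = [to_id(name) for name in ('a', 'b', 'c', 'd')]
store_at = closest_nodes('my_key', peers)
print(len(store_at))  # 2
```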
It's similar to understanding where to look things up in a multi-volume encyclopedia.
Articles are words (keys) and associated definitions (values) that are stored in volumes that cover some alphabetical range.
How do nodes know where to look..?
Each node maintains a local "routing table" that tracks the state of its peers.
(ID, IP address and port etc...)
All interactions result in the exchange of status information between nodes - that's how the routing table is populated and kept up to date.
The routing table splits up the "clock face" of nodes into buckets.
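One common convention (an assumption - the talk doesn't show its bucket scheme) is to bucket a peer by the highest bit in which its id differs from our own, i.e. how far round the "clock face" it sits from us:

```python
def bucket_index(own_id, peer_id):
    # Index of the highest differing bit between the two ids.
    # Nearby peers land in low-numbered buckets, distant peers
    # in high-numbered ones.
    return (own_id ^ peer_id).bit_length() - 1

assert bucket_index(0b1000, 0b1001) == 0  # differs only in the low bit
assert bucket_index(0b1000, 0b0000) == 3  # differs in the high bit
```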
For the purposes of housekeeping:
Each node behaves according to some very simple rules.
All interactions are asynchronous.
Lookups are also parallel (concurrent).
Lookup is a fundamental action for a DHT.
It's how to work out which peers are to be contacted to get or store a value.
A lookup is concurrent because several peers can be interrogated at once for this information.
Here's how it works.
Say I want to put a value with a key whose hash puts it in a position close to 6 o'clock.
The lookup ends when I can't find any nodes closer to the target key.
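The stopping rule above can be sketched synchronously (the real lookup interrogates several peers concurrently; `ask_peer` is an invented stand-in for the RPC that asks a node for peers near the target):

```python
def iterative_lookup(target, seed_nodes, ask_peer):
    """
    Keep asking the closest known node for even closer nodes;
    stop when no node closer to `target` turns up.
    """
    known = set(seed_nodes)
    closest = min(known, key=lambda n: n ^ target)
    while True:
        known |= set(ask_peer(closest, target))
        candidate = min(known, key=lambda n: n ^ target)
        if candidate == closest:  # nothing closer - the lookup is done
            return closest
        closest = candidate

# Toy network: every node knows every other node.
nodes = [3, 9, 20, 35, 50]
ask = lambda node, target: nodes
found = iterative_lookup(18, [50], ask)
print(found)  # 20 - the node with the smallest XOR distance to 18
```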
All interactions are asynchronous.
Lookups are also parallel (concurrent).
How is this handled within the realm of asyncio..?
class Lookup(asyncio.Future):
    """
    Encapsulates a lookup in the DHT given a particular target
    key and message type. Will resolve when a result is found
    or errback otherwise.
    """

    def __init__(self, key, message_type, node, event_loop):
        """
        key - sha512 of the target key.
        message_type - class used to create inter-node messages.
        node - the local node in the DHT.
        event_loop - the event loop.
        """
        ...etc...
A lookup is something whose result we can't yet know (until we've finished looking it up).
my_lookup = Lookup(key, FindValue, my_node, my_event_loop)
def got_result(lookup):
    """ Naive callback """
    result = lookup.result()
    if lookup.message_type is FindValue:
        # result is a value stored at the location of "key".
        ...etc...
    else:
        # result is a list of the closest nodes to "key".
        # PUT the value at these nodes.
        ...etc...

my_lookup.add_done_callback(got_result)
The state of the lookup (the progress of finding nodes close to the target) is held within the Lookup instance. It resolves with the result.
The result is either a value or not_found exception in the case of a GET(), or a list of closest known nodes in the case of a PUT().
How does asyncio handle different networking protocols?
How do nodes on the DHT handle the "down the wire" aspect of I/O..?
(Are also a lot of FUN!)
Transports are provided by asyncio to handle TCP, UDP etc. They handle the low-level I/O layer and buffering; the event loop sets them up.
Protocols handle network protocols at the application layer (e.g. HTTP or netstring).
Transports are concerned with how stuff moves over the network.
Protocols work out what to do with the stuff sent over the network. They work out how to turn the raw bytes into some meaningful message (such as a netstring).
You only need to work with Protocols.
class NetstringProtocol(asyncio.Protocol):
    """http://cr.yp.to/proto/netstrings.txt"""

    def data_received(self, data):
        """
        Called whenever the local node receives data from the
        remote peer.
        """
        self.__data = data
        try:
            while self.__data:
                if self._reader_state == DATA:
                    self.handle_data()
                elif self._reader_state == COMMA:
                    self.handle_comma()
                elif self._reader_state == LENGTH:
                    self.handle_length()
                else:
                    msg = 'Invalid Netstring mode'
                    raise RuntimeError(msg)
        except NetstringParseError:
            self.transport.close()
All protocol classes must override the data_received method to handle the incoming bytes.
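A minimal, self-contained example of the split (the protocol class is invented for illustration): the event loop creates the transport and hands it to the protocol, whose data_received sees only bytes.

```python
import asyncio

class UpperEcho(asyncio.Protocol):
    def connection_made(self, transport):
        # The transport moves bytes; we just keep a reference.
        self.transport = transport

    def data_received(self, data):
        # The protocol decides what the bytes mean - here, echo
        # them back upper-cased and hang up.
        self.transport.write(data.upper())
        self.transport.close()

async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(UpperEcho, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(b'hello')
    await writer.drain()
    reply = await reader.read()  # read until the server closes
    writer.close()
    server.close()
    await server.wait_closed()
    return reply

reply = asyncio.run(main())
print(reply)  # b'HELLO'
```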
My DHT is network agnostic so it can communicate via HTTP or Netstring.
The transport/protocol split is taken directly from Twisted - asyncio's design here is based on how Twisted works.
I love Twisted, and the DHT was originally written with it; the two feel very close, but asyncio feels more lightweight and Pythonic.
Testing is "normal" although how you organise your code is a key factor.
Asyncio makes it easy to think about concurrent problems. I believe the abstractions make it easy to write simple, short and comprehensible solutions.
DON'T USE ASYNCIO if you need to do something with lots of CPU overhead - it'll block the event loop!
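If you must mix CPU-heavy work into an asyncio program, one escape hatch is to hand it to an executor so the loop stays responsive. A minimal sketch using the default (thread pool) executor; `crunch` is an invented stand-in for real CPU-bound work:

```python
import asyncio

def crunch(n):
    # Stand-in for something CPU-bound that would otherwise
    # block the event loop.
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    # The loop keeps serving other callbacks while crunch runs
    # in another thread.
    return await loop.run_in_executor(None, crunch, 10_000)

result = asyncio.run(main())
print(result)
```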
That's it! My DHT project is called the drogulus.