Using Crochet¶
Using Crochet involves three parts: reactor setup, defining functions that call into Twisted’s reactor, and using those functions.
Setup¶
Crochet does a number of things for you as part of setup. Most significantly,
it runs Twisted’s reactor in a thread it manages. Doing setup is easy, just
call the setup()
function:
from crochet import setup
setup()
Since Crochet is intended to be used as a library, multiple calls work just
fine; if more than one library does crochet.setup()
only the first one
will do anything.
@wait_for: Blocking calls into Twisted¶
Now that you’ve got the reactor running, the next stage is defining some
functions that will run inside the Twisted reactor thread. Twisted’s APIs are
not thread-safe, and so they cannot be called directly from another
thread. Moreover, results may not be available immediately. The easiest way to
deal with these issues is to decorate a function that calls Twisted APIs with
crochet.wait_for
.
- When the decorated function is called, the code will not run in the calling thread, but rather in the reactor thread.
- The function blocks until a result is available from the code running in the Twisted thread. The returned result is the result of running the code; if the code throws an exception, an exception is thrown.
- If the underlying code returns a
Deferred
, it is handled transparently; its results are extracted and passed to the caller. crochet.wait_for
takes atimeout
argument, afloat
indicating the number of seconds to wait until a result is available. If the given number of seconds pass and the underlying operation is still unfinished acrochet.TimeoutError
exception is raised, and the wrappedDeferred
is canceled. If the underlying API supports cancellation this might free up any unused resources, close outgoing connections etc., but cancellation is not guaranteed and should not be relied on.
To see what this means, let’s return to the first example in the documentation:
#!/usr/bin/python
"""
Do a DNS lookup using Twisted's APIs.
"""
from __future__ import print_function
# The Twisted code we'll be using:
from twisted.names import client
from crochet import setup, wait_for
setup()
# Crochet layer, wrapping Twisted's DNS library in a blocking call.
@wait_for(timeout=5.0)
def gethostbyname(name):
"""Lookup the IP of a given hostname.
Unlike socket.gethostbyname() which can take an arbitrary amount of time
to finish, this function will raise crochet.TimeoutError if more than 5
seconds elapse without an answer being received.
"""
d = client.lookupAddress(name)
d.addCallback(lambda result: result[0][0].payload.dottedQuad())
return d
if __name__ == '__main__':
# Application code using the public API - notice it works in a normal
# blocking manner, with no event loop visible:
import sys
name = sys.argv[1]
ip = gethostbyname(name)
print(name, "->", ip)
Twisted’s lookupAddress()
call returns a Deferred
, but the code calling the
decorated gethostbyname()
doesn’t know that. As far as the caller
concerned is just calling a blocking function that returns a result or raises
an exception. Run on the command line with a valid domain we get:
$ python blockingdns.py twistedmatrix.com
twistedmatrix.com -> 66.35.39.66
If we try to call the function with an invalid domain, we get back an exception:
$ python blockingdns.py doesnotexist
Trace back (most recent call last):
File "examples/blockingdns.py", line 33, in <module>
ip = gethostbyname(name)
File "/home/itamar/crochet/crochet/_eventloop.py", line 434, in wrapper
return eventual_result.wait(timeout)
File "/home/itamar/crochet/crochet/_eventloop.py", line 216, in wait
result.raiseException()
File "<string>", line 2, in raiseException
twisted.names.error.DNSNameError: <Message id=36791 rCode=3 maxSize=0 flags=answer,recDes,recAv queries=[Query('doesnotexist', 1, 1)] authority=[<RR name= type=SOA class=IN ttl=1694s auth=False>]>
You can, similarly, wrap an async
function:
#!/usr/bin/python
"""
Async/await DNS lookup using Twisted's APIs.
"""
from __future__ import print_function
# The Twisted code we'll be using:
from twisted.names import client
from crochet import setup, wait_for
setup()
# Crochet layer, wrapping Twisted's DNS library in a blocking call.
# Uses async/await.
@wait_for(timeout=5.0)
async def gethostbyname(name):
"""Lookup the IP of a given hostname.
Unlike socket.gethostbyname() which can take an arbitrary amount of time
to finish, this function will raise crochet.TimeoutError if more than 5
seconds elapse without an answer being received.
"""
result = await client.lookupAddress(name)
return result[0][0].payload.dottedQuad()
if __name__ == '__main__':
# Application code using the public API - notice it works in a normal
# blocking manner, with no event loop visible:
import sys
name = sys.argv[1]
ip = gethostbyname(name)
print(name, "->", ip)
@run_in_reactor: Asynchronous results¶
wait_for
is implemented using run_in_reactor
, a more sophisticated and
lower-level API. Rather than waiting until a result is available, it returns a
special object supporting multiple attempts to retrieve results, as well as
manual cancellation. This can be useful for running tasks “in the background”,
i.e. asynchronously, as opposed to blocking and waiting for them to finish.
Decorating a function that calls Twisted APIs with run_in_reactor
has two
consequences:
- When the function is called, the code will not run in the calling thread, but rather in the reactor thread.
- The return result from a decorated function is an
EventualResult
instance, wrapping the result of the underlying code, with built-in support for functions that returnDeferred
instances as well asasync
functions.
EventualResult
has the following basic methods:
wait(timeout)
: Return the result when it becomes available; if the result is an exception it will be raised. The timeout argument is afloat
indicating a number of seconds;wait()
will throwcrochet.TimeoutError
if the timeout is hit.cancel()
: Cancel the operation tied to the underlyingDeferred
. Many, but not all,Deferred
results returned from Twisted allow the underlying operation to be canceled. Even if implemented, cancellation may not be possible for a variety of reasons, e.g. it may be too late. Its main purpose to free up no longer used resources, and it should not be relied on otherwise.
There are also some more specialized methods:
original_failure()
returns the underlying Twisted Failure object if your result was a raised exception, allowing you to print the original traceback that caused the exception. This is necessary because the default exception you will see raised fromEventualResult.wait()
won’t include the stack from the underlying Twisted code where the exception originated.stash()
: Sometimes you want to store theEventualResult
in memory for later retrieval. This is specifically useful when you want to store a reference to theEventualResult
in a web session like Flask’s (see the example below).stash()
stores theEventualResult
in memory, and returns an integer uid that can be used to retrieve the result usingcrochet.retrieve_result(uid)
. Note that retrieval works only once per uid. You will need the stash theEventualResult
again (with a new resulting uid) if you want to retrieve it again later.
In the following example, you can see all of these APIs in use. For each user session, a download is started in the background. Subsequent page refreshes will eventually show the downloaded page.
#!/usr/bin/python
"""
A flask web application that downloads a page in the background.
"""
import logging
from flask import Flask, session, escape
from crochet import setup, run_in_reactor, retrieve_result, TimeoutError
# Can be called multiple times with no ill-effect:
setup()
app = Flask(__name__)
@run_in_reactor
def download_page(url):
"""
Download a page.
"""
from twisted.web.client import getPage
return getPage(url)
@app.route('/')
def index():
if 'download' not in session:
# Calling an @run_in_reactor function returns an EventualResult:
result = download_page('http://www.google.com')
session['download'] = result.stash()
return "Starting download, refresh to track progress."
# Retrieval is a one-time operation, so the uid in the session cannot be
# reused:
result = retrieve_result(session.pop('download'))
try:
download = result.wait(timeout=0.1)
return "Downloaded: " + escape(download)
except TimeoutError:
session['download'] = result.stash()
return "Download in progress..."
except:
# The original traceback of the exception:
return "Download failed:\n" + result.original_failure().getTraceback()
if __name__ == '__main__':
import os, sys
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
app.secret_key = os.urandom(24)
app.run()
Using Crochet from Twisted applications¶
If your application is already planning on running the Twisted reactor itself
(e.g. you’re using Twisted as a WSGI container), Crochet’s default behavior of
running the reactor in a thread is a problem. To solve this, Crochet provides
the no_setup()
function, which causes future calls to setup()
to do
nothing. Thus, an application that will run the Twisted reactor but also wants
to use a Crochet-using library must run it first:
from crochet import no_setup
no_setup()
# Only now do we import libraries that might run crochet.setup():
import blockinglib
# ... setup application ...
from twisted.internet import reactor
reactor.run()
Unit testing¶
Both @wait_for
and @run_in_reactor
expose the underlying Twisted
function via a __wrapped__
attribute. This allows unit testing of the
Twisted code without having to go through the Crochet layer.
#!/usr/bin/python
"""
Demonstration of accessing wrapped functions for testing.
"""
from __future__ import print_function
from crochet import setup, run_in_reactor
setup()
@run_in_reactor
def add(x, y):
return x + y
if __name__ == '__main__':
print("add(1, 2) returns EventualResult:")
print(" ", add(1, 2))
print("add.__wrapped__(1, 2) is the result of the underlying function:")
print(" ", add.__wrapped__(1, 2))
When run, this gives the following output:
add(1, 2) returns EventualResult:
<crochet._eventloop.EventualResult object at 0x2e8b390>
add.__wrapped__(1, 2) returns result of underlying function:
3