Using Crochet involves three parts: reactor setup, defining functions that call into Twisted’s reactor, and using those functions.
Crochet does a number of things for you as part of setup. Most significantly,
it runs Twisted’s reactor in a thread it manages. Setup is easy: just call the
setup() function:
    from crochet import setup
    setup()
Since Crochet is intended to be used as a library, multiple calls work just
fine; if more than one library calls crochet.setup(), only the first call
will do anything.
@wait_for: Blocking calls into Twisted
Now that you’ve got the reactor running, the next stage is defining some
functions that will run inside the Twisted reactor thread. Twisted’s APIs are
not thread-safe, and so they cannot be called directly from another
thread. Moreover, results may not be available immediately. The easiest way to
deal with these issues is to decorate a function that calls Twisted APIs with
crochet.wait_for:
- When the decorated function is called, the code will not run in the calling thread, but rather in the reactor thread.
- The function blocks until a result is available from the code running in the Twisted thread. The returned result is the result of running the code; if the code throws an exception, an exception is thrown.
- If the underlying code returns a Deferred, it is handled transparently; its results are extracted and passed to the caller.
To prevent blocking forever, wait_for takes a timeout argument, a float indicating the number of seconds to wait until a result is available. If the given number of seconds passes and the underlying operation is still unfinished, a crochet.TimeoutError exception is raised, and the wrapped Deferred is canceled. If the underlying API supports cancellation this might free up any unused resources, close outgoing connections, etc., but cancellation is not guaranteed and should not be relied on.
To see what this means, let’s return to the first example in the documentation:
    #!/usr/bin/python
    """
    Do a DNS lookup using Twisted's APIs.
    """
    from __future__ import print_function

    # The Twisted code we'll be using:
    from twisted.names import client

    from crochet import setup, wait_for
    setup()


    # Crochet layer, wrapping Twisted's DNS library in a blocking call.
    @wait_for(timeout=5.0)
    def gethostbyname(name):
        """Lookup the IP of a given hostname.

        Unlike socket.gethostbyname() which can take an arbitrary amount of time
        to finish, this function will raise crochet.TimeoutError if more than 5
        seconds elapse without an answer being received.
        """
        d = client.lookupAddress(name)
        # The Deferred fires with (answers, authority, additional) record
        # lists; use the first answer record.
        d.addCallback(lambda result: result[0][0].payload.dottedQuad())
        return d


    if __name__ == '__main__':
        # Application code using the public API - notice it works in a normal
        # blocking manner, with no event loop visible:
        import sys
        name = sys.argv[1]
        ip = gethostbyname(name)
        print(name, "->", ip)
Twisted’s lookupAddress() call returns a Deferred, but the code calling the
decorated gethostbyname() doesn’t know that. As far as the caller is
concerned, it is just calling a blocking function that returns a result or
raises an exception. Run on the command line with a valid domain we get:
    $ python blockingdns.py twistedmatrix.com
    twistedmatrix.com -> 188.8.131.52
If we try to call the function with an invalid domain, we get back an exception:
    $ python blockingdns.py doesnotexist
    Traceback (most recent call last):
      File "examples/blockingdns.py", line 33, in <module>
        ip = gethostbyname(name)
      File "/home/itamar/crochet/crochet/_eventloop.py", line 434, in wrapper
        return eventual_result.wait(timeout)
      File "/home/itamar/crochet/crochet/_eventloop.py", line 216, in wait
        result.raiseException()
      File "<string>", line 2, in raiseException
    twisted.names.error.DNSNameError: <Message id=36791 rCode=3 maxSize=0 flags=answer,recDes,recAv queries=[Query('doesnotexist', 1, 1)] authority=[<RR name= type=SOA class=IN ttl=1694s auth=False>]>
You can, similarly, wrap an async function:
    #!/usr/bin/python
    """
    Async/await DNS lookup using Twisted's APIs.
    """
    from __future__ import print_function

    # The Twisted code we'll be using:
    from twisted.names import client

    from crochet import setup, wait_for
    setup()


    # Crochet layer, wrapping Twisted's DNS library in a blocking call.
    # Uses async/await.
    @wait_for(timeout=5.0)
    async def gethostbyname(name):
        """Lookup the IP of a given hostname.

        Unlike socket.gethostbyname() which can take an arbitrary amount of time
        to finish, this function will raise crochet.TimeoutError if more than 5
        seconds elapse without an answer being received.
        """
        result = await client.lookupAddress(name)
        # The result is (answers, authority, additional) record lists; use the
        # first answer record.
        return result[0][0].payload.dottedQuad()


    if __name__ == '__main__':
        # Application code using the public API - notice it works in a normal
        # blocking manner, with no event loop visible:
        import sys
        name = sys.argv[1]
        ip = gethostbyname(name)
        print(name, "->", ip)
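The calling convention described in this section (run the code on another thread, block the caller, give up after a timeout) can be tried out without Twisted installed. The sketch below is only an analogy built on the standard library, not how Crochet is implemented: a single worker thread stands in for the reactor thread, and blocking_call, which_thread and too_slow are hypothetical names invented for this illustration.

```python
import concurrent.futures
import functools
import threading
import time

# Stand-in for the reactor thread: one dedicated worker thread.
_worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def blocking_call(timeout):
    """Hypothetical decorator that mimics the calling convention of wait_for."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The body runs on the worker thread, not the calling thread.
            future = _worker.submit(func, *args, **kwargs)
            try:
                # Block until a result or exception is available.
                return future.result(timeout=timeout)
            except concurrent.futures.TimeoutError:
                # Best-effort cancellation: a task that is already running
                # is not interrupted.
                future.cancel()
                raise
        return wrapper
    return decorator


@blocking_call(timeout=1.0)
def which_thread():
    return threading.current_thread().name


@blocking_call(timeout=0.05)
def too_slow():
    time.sleep(0.3)
    return "done"


print("caller thread:", threading.current_thread().name)
print("function ran in:", which_thread())
try:
    too_slow()
except concurrent.futures.TimeoutError:
    print("timed out")
```

As with wait_for, the caller sees an ordinary blocking function: it gets a return value, an exception raised by the underlying code, or a timeout error.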
@run_in_reactor: Asynchronous results
wait_for is implemented using
run_in_reactor, a more sophisticated and
lower-level API. Rather than waiting until a result is available, it returns a
special object supporting multiple attempts to retrieve results, as well as
manual cancellation. This can be useful for running tasks “in the background”,
i.e. asynchronously, as opposed to blocking and waiting for them to finish.
Decorating a function that calls Twisted APIs with run_in_reactor has two
consequences:
- When the function is called, the code will not run in the calling thread, but rather in the reactor thread.
- The return result from a decorated function is an EventualResult instance, wrapping the result of the underlying code, with built-in support for functions that return Deferred instances as well as async functions.
EventualResult has the following basic methods:
- wait(timeout): Return the result when it becomes available; if the result is an exception it will be raised. The timeout argument is a float indicating a number of seconds; wait() raises crochet.TimeoutError if the timeout is hit.
- cancel(): Cancel the operation tied to the underlying Deferred. Many, but not all, Deferred results returned from Twisted allow the underlying operation to be canceled. Even if implemented, cancellation may not be possible for a variety of reasons, e.g. it may be too late. Its main purpose is to free up resources that are no longer used, and it should not be relied on otherwise.
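The retry-friendly nature of wait(timeout) and the best-effort nature of cancel() can be illustrated with a standard-library analogy. This is not Crochet code: a concurrent.futures.Future stands in for an EventualResult, its result(timeout) plays the role of wait(timeout), and slow_square is a hypothetical task made up for the sketch.

```python
import concurrent.futures
import time


def slow_square(x):
    # Hypothetical background task that takes a while to finish.
    time.sleep(0.2)
    return x * x


executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
handle = executor.submit(slow_square, 7)  # returns immediately with a handle

attempts = 0
while True:
    attempts += 1
    try:
        # Short timeout: a failed attempt does not discard the result,
        # so the caller can simply try again later.
        value = handle.result(timeout=0.05)
        break
    except concurrent.futures.TimeoutError:
        continue  # not ready yet; other work could be done here

# Cancelling after completion is too late and reports failure.
cancelled = handle.cancel()
executor.shutdown()

print(value, attempts, cancelled)
```

The same shape applies to an EventualResult: a timed-out wait() leaves the operation running, and a later call can still pick up the result.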
There are also some more specialized methods:
- original_failure() returns the underlying Twisted Failure object if your result was a raised exception, allowing you to print the original traceback that caused the exception. This is necessary because the default exception you will see raised from EventualResult.wait() won’t include the stack from the underlying Twisted code where the exception originated.
- stash(): Sometimes you want to store the EventualResult in memory for later retrieval. This is specifically useful when you want to store a reference to the EventualResult in a web session like Flask’s (see the example below). stash() stores the EventualResult in memory, and returns an integer uid that can be used to retrieve the result using crochet.retrieve_result(uid). Note that retrieval works only once per uid. You will need to stash the EventualResult again (with a new resulting uid) if you want to retrieve it again later.
In the following example, you can see all of these APIs in use. For each user session, a download is started in the background. Subsequent page refreshes will eventually show the downloaded page.
    #!/usr/bin/python
    """
    A flask web application that downloads a page in the background.
    """

    import logging
    from flask import Flask, session, escape
    from crochet import setup, run_in_reactor, retrieve_result, TimeoutError

    # Can be called multiple times with no ill-effect:
    setup()

    app = Flask(__name__)


    @run_in_reactor
    def download_page(url):
        """
        Download a page.
        """
        from twisted.web.client import getPage
        return getPage(url)


    @app.route('/')
    def index():
        if 'download' not in session:
            # Calling an @run_in_reactor function returns an EventualResult:
            result = download_page('http://www.google.com')
            session['download'] = result.stash()
            return "Starting download, refresh to track progress."

        # Retrieval is a one-time operation, so the uid in the session cannot be
        # reused:
        result = retrieve_result(session.pop('download'))
        try:
            download = result.wait(timeout=0.1)
            return "Downloaded: " + escape(download)
        except TimeoutError:
            session['download'] = result.stash()
            return "Download in progress..."
        except:
            # The original traceback of the exception:
            return "Download failed:\n" + result.original_failure().getTraceback()


    if __name__ == '__main__':
        import os, sys
        logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
        app.secret_key = os.urandom(24)
        app.run()
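The one-time retrieval rule is the reason the Flask example stashes the result again whenever the download is still in progress. A minimal sketch of such a registry, assuming nothing about Crochet's internals, might look like the following; ResultStore and its methods are hypothetical names used only for illustration.

```python
import itertools
import threading


class ResultStore:
    """Hypothetical in-memory registry with one-time retrieval."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counter = itertools.count(1)
        self._results = {}

    def stash(self, result):
        """Store a result and return a fresh integer uid for it."""
        with self._lock:
            uid = next(self._counter)
            self._results[uid] = result
        return uid

    def retrieve(self, uid):
        """Return the stored result and forget it; a second call raises KeyError."""
        with self._lock:
            return self._results.pop(uid)


store = ResultStore()
first_uid = store.stash("pending download")
result = store.retrieve(first_uid)   # works once
second_uid = store.stash(result)     # stash again to keep it retrievable
print(first_uid, second_uid)
```

Removing the entry on retrieval keeps the registry from growing with results nobody will ask for again, at the cost of requiring callers to stash again when they are not done.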
Using Crochet from Twisted applications
If your application is already planning on running the Twisted reactor itself
(e.g. you’re using Twisted as a WSGI container), Crochet’s default behavior of
running the reactor in a thread is a problem. To solve this, Crochet provides
the no_setup() function, which causes future calls to setup() to do
nothing. Thus, an application that will run the Twisted reactor but also wants
to use a Crochet-using library must call no_setup() first:
    from crochet import no_setup
    no_setup()
    # Only now do we import libraries that might run crochet.setup():
    import blockinglib

    # ... setup application ...

    from twisted.internet import reactor
    reactor.run()
Both @wait_for and @run_in_reactor expose the underlying Twisted
function via a __wrapped__ attribute. This allows unit testing of the
Twisted code without having to go through the Crochet layer.
    #!/usr/bin/python
    """
    Demonstration of accessing wrapped functions for testing.
    """

    from __future__ import print_function

    from crochet import setup, run_in_reactor
    setup()


    @run_in_reactor
    def add(x, y):
        return x + y


    if __name__ == '__main__':
        print("add(1, 2) returns EventualResult:")
        print(" ", add(1, 2))
        print("add.__wrapped__(1, 2) is the result of the underlying function:")
        print(" ", add.__wrapped__(1, 2))
When run, this gives the following output:
    add(1, 2) returns EventualResult:
      <crochet._eventloop.EventualResult object at 0x2e8b390>
    add.__wrapped__(1, 2) is the result of the underlying function:
      3