Multithreading in napari
As described in An introduction to the event loop in napari, napari
, like most GUI
applications, runs in an event loop that is continually receiving and
responding to events like button presses and mouse events. This works fine
until one of the events takes a very long time to process. A long-running
function (such as training a machine learning model or running a complicated
analysis routine) may “block” the event loop in the main thread, leading to a
completely unresponsive viewer. The example used there was:
import napari
import numpy as np
viewer = napari.Viewer()
# everything is fine so far... but if we trigger a long computation
image = np.random.rand(1024, 512, 512).mean(0)
viewer.add_image(image)
# the entire interface freezes!
In order to avoid freezing the viewer during a long-running blocking function, you must run your function in another thread or process.
Processes, threads, and asyncio
¶
There are multiple ways to achieve “concurrency” (multiple things happening at the same time) in python, each with their own advantages and disadvantages. It’s a rich, complicated topic, and a full treatment is well beyond the scope of this document, but strategies generally fall into one of three camps:
Multithreading
Multprocessing
Single-thread concurrency with asyncio
For a good high level overview on concurrency in python, see
this post.
See the
trio docs
for a good introduction to Python’s new async/await
syntax.
And of course, see the python docs on
threading,
multiprocessing,
concurrent.futures,
and asyncio.
If you already have experience with any of these methods, you should be able to
immediately leverage them in napari. napari
also provides a few
convenience functions that allow you to easily run your long-running
methods in another thread.
Threading in napari with @thread_worker
¶
The simplest way to run a function in another thread in napari is to decorate
your function with the
@thread_worker
decorator.
Continuing with the example above:
import napari
import numpy as np
from napari.qt.threading import thread_worker
@thread_worker
def average_large_image():
return np.random.rand(1024, 512, 512).mean(0)
viewer = napari.Viewer()
worker = average_large_image() # create "worker" object
worker.returned.connect(viewer.add_image) # connect callback functions
worker.start() # start the thread!
napari.run()
The @thread_worker
decorator
converts your function into one that returns a
WorkerBase
instance. The worker
manages the work being done by your function in another thread. It also
exposes a few “signals” that let you respond to events happening in the other
thread. Here, we connect the worker.returned
signal to the
viewer.add_image
function, which has the effect of adding the result to the viewer when it is
ready. Lastly, we start the worker with
start()
because workers do not
start themselves by default.
The @thread_worker
decorator also
accepts keyword arguments like connect
, and start_thread
, which may
enable more concise syntax. The example below is equivalent to lines 7-15 in
the above example:
viewer = napari.Viewer()
@thread_worker(connect={"returned": viewer.add_image})
def average_large_image():
return np.random.rand(1024, 512, 512).mean(0)
average_large_image()
napari.run()
Note
When the connect
argument to
@thread_worker
is not None
, the thread will start
by default when the decorated function is called. Otherwise the thread must
be manually started by calling
worker.start()
.
Responding to feedback from threads¶
As shown above, the worker
object returned by a function decorated with
@thread_worker
has a number of
signals that are emitted in response to certain events. The base signals
provided by the worker
are:
started
- emitted when the work is startedfinished
- emitted when the work is finishedreturned
[value] - emitted with return value when the function returnserrored
[exception] - emitted with anException
object if an exception is raised in the thread.
Example: Custom exception handler¶
Because debugging issues in multithreaded applications can be tricky, the
default behavior of a @thread_worker
- decorated function is to re-raise
any exceptions in the main thread. But just as we connected the
worker.returned
event above to the viewer.add_image
method, you can
also connect your own custom handler to the worker.errored
event:
def my_handler(exc):
if isinstance(exc, ValueError):
print(f"We had a minor problem {exc}")
else:
raise exc
@thread_worker(connect={"errored": my_handler})
def error_prone_function():
...
Generators for the win!¶
quick reminder
A generator function is a special kind of function that returns a lazy iterator. To make a generator, you “yield” results rather than (or in addition to) “returning” them:
def my_generator():
for i in range(10):
yield i
Use a generator! By writing our decorated function as a generator that
yields
results instead of a function that returns
a single result at
the end, we gain a number of valuable features, and a few extra signals and
methods on the worker
.
yielded
[value]- emitted with a value when a value is yieldedpaused
- emitted when a running job has successfully pausedresumed
- emitted when a paused job has successfully resumedaborted
- emitted when a running job is successfully aborted
Additionally, generator workers
will also have a few additional methods:
send
- send a value into the thread (see below)pause
- send a request to pause a running workerresume
- send a request to resume a paused workertoggle_pause
- send a request to toggle the running state of the workerquit
- send a request to abort the worker
Retrieving intermediate results¶
The most obvious benefit of using a generator is that you can monitor intermediate results back in the main thread. Continuing with our example of taking the mean projection of a large stack, if we yield the cumulative average as it is generated (rather than taking the average of the fully generated stack) we can watch the mean projection as it builds:
import napari
import numpy as np
from napari.qt.threading import thread_worker
viewer = napari.Viewer()
def update_layer(new_image):
try:
# if the layer exists, update the data
viewer.layers['result'].data = new_image
except KeyError:
# otherwise add it to the viewer
viewer.add_image(
new_image, contrast_limits=(0.45, 0.55), name='result'
)
@thread_worker(connect={'yielded': update_layer})
def large_random_images():
cumsum = np.zeros((512, 512))
for i in range(1024):
cumsum += np.random.rand(512, 512)
if i % 16 == 0:
yield cumsum / (i + 1)
large_random_images() # call the function!
napari.run()
Note how we periodically (every 16 iterations) yield
the image result in
the large_random_images
function. We also connected the
yielded
event in the
@thread_worker
decorator to the previously-defined update_layer
function. The result is
that the image in the viewer is updated every time a new image is yielded.
Any time you can break up a long-running function into a stream of shorter-running yield statements like this, you not only benefit from the increased responsiveness in the viewer, you can often save on precious memory resources.
Flow control and escape hatches¶
A perhaps even more useful aspect of yielding periodically in our long running
function is that we provide a “hook” for the main thread to control the flow of
our long running function. When you use the
@thread_worker
decorator on a
generator function, the ability to stop, start, and quit a thread comes for
free. In the example below we decorate what would normally be an infinitely
yielding generator, but add a button that aborts the worker when clicked:
import time
import napari
from napari.qt.threading import thread_worker
from qtpy.QtWidgets import QPushButton
viewer = napari.Viewer()
def update_layer(new_image):
try:
viewer.layers['result'].data = new_image
except KeyError:
viewer.add_image(
new_image, name='result', contrast_limits=(-0.8, 0.8)
)
@thread_worker
def yield_random_images_forever():
i = 0
while True: # infinite loop!
yield np.random.rand(512, 512) * np.cos(i * 0.2)
i += 1
time.sleep(0.05)
worker = yield_random_images_forever()
worker.yielded.connect(update_layer)
# add a button to the viewer that, when clicked, stops the worker
button = QPushButton("STOP!")
button.clicked.connect(worker.quit)
worker.finished.connect(button.clicked.disconnect)
viewer.window.add_dock_widget(button)
worker.start()
napari.run()
Graceful exit¶
A side-effect of this added flow control is that napari
can gracefully
shutdown any still-running workers when you try to quit the program. Try the
example above, but quit the program without pressing the “STOP” button. No
problem! napari
asks the thread to stop itself the next time it yields,
and then closes without leaving any orphaned threads.
Now go back to the first example with the pure (non-generator) function, and try quitting before the function has returned (i.e. before the image appears). You’ll notice that it takes a while to quit: it has to wait for the background thread to finish because there is no good way to communicate the request that it quit! If you had a very long function, you’d be left with no choice but to force quit your program.
So whenever possible, sprinkle your long-running functions with yield
.
Full two-way communication¶
So far we’ve mostly been receiving results from the threaded function, but we
can send values into a generator-based thread as well using
worker.send()
This works
exactly like a standard python
generator.send
pattern. This next example ties together a number of concepts and demonstrates
two-thread communication with conditional flow control. It’s a simple
cumulative multiplier that runs in another thread, and exits if the product
hits “0”:
import napari
import time
from napari.qt.threading import thread_worker
from qtpy.QtWidgets import QLineEdit, QLabel, QWidget, QVBoxLayout
from qtpy.QtGui import QDoubleValidator
@thread_worker
def multiplier():
total = 1
while True:
time.sleep(0.1)
new = yield total
total *= new if new is not None else 1
if total == 0:
return "Game Over!"
viewer = napari.Viewer()
# make a widget to control the worker
# (not the main point of this example...)
widget = QWidget()
layout = QVBoxLayout()
widget.setLayout(layout)
result_label = QLabel()
line_edit = QLineEdit()
line_edit.setValidator(QDoubleValidator())
layout.addWidget(line_edit)
layout.addWidget(result_label)
viewer.window.add_dock_widget(widget)
# create the worker
worker = multiplier()
# define some callbacks
def on_yielded(value):
worker.pause()
result_label.setText(str(value))
line_edit.setText('1')
def on_return(value):
line_edit.setText('')
line_edit.setEnabled(False)
result_label.setText(value)
def send_next_value():
worker.send(float(line_edit.text()))
worker.resume()
worker.yielded.connect(on_yielded)
worker.returned.connect(on_return)
line_edit.returnPressed.connect(send_next_value)
worker.start()
napari.run()
Let’s break it down:
As usual, we decorate our generator function with
@thread_worker
and instantiate it to create aworker
.The most interesting line in this example is where we both
yield
the currenttotal
to the main thread (yield total
), and receive a new value from the main thread (withnew = yield
).In the main thread, we have connected that
worker.yielded
event to a callback that pauses the worker and updates theresult_label
widget.The thread will then wait indefinitely for the
resume()
command, which we have connected to theline_edit.returnPressed
signal.However, before that
resume()
command gets sent, we useworker.send()
to send the current value of theline_edit
widget into the thread for multiplication by the existing total.Lastly, if the thread total ever goes to “0”, we stop the thread by returning the string
"Game Over"
. In the main thread, theworker.returned
event is connected to a callback that disables theline_edit
widget and shows the string returned from the thread.
This example is a bit contrived, since there’s little need to put such a basic
computation in another thread. But it demonstrates some of the power and
features provided when decorating a generator function with the
@thread_worker
decorator.
Syntactic sugar¶
The @thread_worker
decorator is
just syntactic sugar for calling create_worker()
on
your function. In turn, create_worker()
is just a
convenient “factory function” that creates the right subtype of Worker
depending on your function type. The following three examples are equivalent:
Using the @thread_worker
decorator:
from napari.qt.threading import thread_worker
@thread_worker
def my_function(arg1, arg2=None):
...
worker = my_function('hello', arg2=42)
Using the create_worker
function:
from napari.qt.threading import create_worker
def my_function(arg1, arg2=None):
...
worker = create_worker(my_function, 'hello', arg2=42)
Using a Worker
class:
from napari.qt.threading import FunctionWorker
def my_function(arg1, arg2=None):
...
worker = FunctionWorker(my_function, 'hello', arg2=42)
(the main difference between using create_worker
and directly instantiating
the FunctionWorker
class is that create_worker
will automatically
dispatch the appropriate type of Worker
class depending on whether the
function is a generator or not).
Using a custom worker class¶
If you need even more control over the worker – such as the ability to define
custom methods or signals that the worker can emit, then you can subclass the
napari WorkerBase
class. When doing so, please
keep in mind the following guidelines:
The subclass must either implement the
work()
method (preferred), or in extreme cases, may directly reimplement therun()
method. (When a worker “start” is started withstart()
, the call order is alwaysworker.start()
→worker.run()
→worker.work()
.When implementing the
work()
method, it is important that you periodically checkself.abort_requested
in your thread loop, and exit the thread accordingly, otherwisenapari
will not be able to gracefully exit a long-running thread.def work(self): i = 0 while True: if self.abort_requested: self.aborted.emit() break time.sleep(0.5)
It is also important to be mindful of the fact that the
worker.start()
method adds the worker to a global Pool, such that it can request shutdown when exiting napari. So if you re-implementstart
, please be sure to callsuper().start()
to keep track of theworker
.When reimplementing the
run()
method, it is your responsibility to emit thestarted
,returned
,finished
, anderrored
signals at the appropriate moments.
For examples of subclassing WorkerBase
, have a
look at the two main concrete subclasses in napari:
FunctionWorker
and
GeneratorWorker
. You may also wish to simply
subclass one of those two classes.
Adding custom signals¶
In order to emit signals, an object must inherit from QObject
. However,
due to challenges with multiple inheritance in Qt, the signals for
WorkerBase
objects actually live in the
WorkerBase._signals
attribute (though they are accessible directly in the
worker namespace). To add custom signals to a
WorkerBase
subclass you must first create a new
QObject
with signals as class attributes:
from qtpy.QtCore import QObject, Signal
class MyWorkerSignals(QObject):
signal_name = Signal()
# or subclass one of the existing signals objects to "add"
# additional signals:
from napari.qt.threading import WorkerBaseSignals
# WorkerBaseSignals already has started, finished, errored...
class MyWorkerSignals(WorkerBaseSignals):
signal_name = Signal()
and then either directly override the self._signals
attribute on the
WorkerBase
class with an instance of your
signals class:
class MyWorker(WorkerBase):
def __init__(self):
super().__init__()
self._signals = MyWorkerSignals()
… or pass the signals class as the SignalsClass
argument when
initializing the superclass in your __init__
method:
class MyWorker(WorkerBase):
def __init__(self):
super().__init__(SignalsClass=MyWorkerSignals)