How can I use multiple CPU cores within Metaflow tasks?
Inside a Metaflow task, you can use multiple CPU cores the same way you would in any other Python code. For example, you can use Python's concurrent.futures module to run work in multiple threads or processes.
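One caveat applies when you choose between threads and processes. In standard CPython, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time. Threads therefore help mostly with I/O-bound or waiting work, while processes are the usual way to spread pure-Python, CPU-bound work across cores. The plain-Python sketch below involves no Metaflow and uses ProcessPoolExecutor. The names count_primes and count_primes_in_parallel are hypothetical stand-ins for your own code. The worker function is defined at module level so it can be pickled and sent to the worker processes.

```python
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit):
    """Hypothetical CPU-bound job: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def count_primes_in_parallel(limits, workers=None):
    # Each call runs in its own worker process, so pure-Python work is not
    # serialized by the GIL. max_workers=None lets the executor default to
    # the number of CPUs it detects.
    with ProcessPoolExecutor(max_workers=workers) as exe:
        return list(exe.map(count_primes, limits))


if __name__ == "__main__":
    # The guard matters: with the "spawn" start method, worker processes
    # re-import this module and must not start a new pool themselves.
    print(count_primes_in_parallel([50_000] * 4, workers=4))
```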
1. Write a Flow with Concurrency in a Task
This flow shows how to:
- Call the do_task function four times concurrently using Python's built-in concurrent.futures module.
- Print the total time elapsed to demonstrate that the calls ran in parallel.
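The timing idea behind this flow can be tried without Metaflow. In the plain-Python sketch below, run_concurrently is a hypothetical helper that mirrors the flow with a shorter delay. Each call sleeps for delay seconds, and time.sleep releases the GIL, so n calls on n threads should finish in roughly one delay rather than n × delay.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def do_task(name, delay):
    time.sleep(delay)  # stands in for a long-running task
    return name


def run_concurrently(n=4, delay=0.5):
    """Hypothetical helper: run n sleeping tasks on n threads and time them."""
    names = ["thread_%s" % i for i in range(n)]
    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as exe:
        # map() yields results in input order, not completion order
        results = list(exe.map(do_task, names, [delay] * n))
    return results, time.perf_counter() - t0


if __name__ == "__main__":
    results, elapsed = run_concurrently()
    # Expect roughly 0.5s here rather than 2s.
    print("All tasks completed in %.3fs" % elapsed)
    print(results)
```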
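from metaflow import FlowSpec, step, Parameter
from concurrent import futures
import time

def do_task(name):
    time.sleep(3) # a long-running task
    return name

class MultiCoreFlow(FlowSpec):  # illustrative class name

    num_cores = Parameter('num-cores', default=4)

    @step
    def start(self):
        threads = ["thread_%s" % i for i in range(self.num_cores)]
        t0 = time.time()
        # Run up to num_cores calls to do_task at the same time.
        with futures.ThreadPoolExecutor(
            max_workers=self.num_cores
        ) as exe:
            # map() returns results in the same order as the inputs
            self.messages = list(exe.map(do_task, threads))
        self.time_elapsed = time.time() - t0
        self.next(self.end)

    @step
    def end(self):
        print_msg = "All tasks completed in %.3fs"
        print(print_msg % self.time_elapsed)
        for msg in self.messages:
            print(msg)

if __name__ == '__main__':
    MultiCoreFlow()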
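The flow uses a fixed default of four workers. If you would rather size the pool from the machine a task actually lands on, a small helper like the hypothetical available_cores below is one option. os.sched_getaffinity exists only on some platforms (for example Linux), so the sketch falls back to os.cpu_count(). Both can report more CPUs than a container is allowed to use. When you request cores explicitly, for example with Metaflow's @resources(cpu=...) decorator, passing that same number through --num-cores may be the safer choice.

```python
import os


def available_cores():
    """Hypothetical helper: how many CPUs this process may run on (at least 1)."""
    # sched_getaffinity respects CPU pinning but exists only on some platforms.
    if hasattr(os, "sched_getaffinity"):
        return max(1, len(os.sched_getaffinity(0)))
    # os.cpu_count() can return None when the count is unknown.
    return os.cpu_count() or 1


if __name__ == "__main__":
    print("Using %d workers" % available_cores())
```

Inside the start step you could, for instance, cap the pool with min(self.num_cores, available_cores()) so a large --num-cores value does not oversubscribe a small machine.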
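2. Run the Flow
python use_multiple_cpu_cores.py run
To use a different number of workers, set the parameter on the command line, for example: python use_multiple_cpu_cores.py run --num-cores 8
The output should look similar to the following (run IDs and PIDs will differ):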
Workflow starting (run-id 1663278003721691):
[1663278003721691/start/1 (pid 47937)] Task is starting.
[1663278003721691/start/1 (pid 47937)] Task finished successfully.
[1663278003721691/end/2 (pid 47941)] Task is starting.
[1663278003721691/end/2 (pid 47941)] All tasks completed in 3.011s
[1663278003721691/end/2 (pid 47941)] thread_0
[1663278003721691/end/2 (pid 47941)] thread_1
[1663278003721691/end/2 (pid 47941)] thread_2
[1663278003721691/end/2 (pid 47941)] thread_3
[1663278003721691/end/2 (pid 47941)] Task finished successfully.
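To read the timing: if the four three-second calls had run one after another, the step would have taken about 12 seconds. The measured 3.011s is close to the duration of a single call, so the calls overlapped almost completely. This gives a speedup close to the ideal of 4 for four workers:

```latex
T_{\text{serial}} \approx 4 \times 3\,\text{s} = 12\,\text{s}, \qquad
S = \frac{T_{\text{serial}}}{T_{\text{parallel}}} \approx \frac{12\,\text{s}}{3.011\,\text{s}} \approx 3.99
```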