Use Multiple CPU Cores

Question

How can I use multiple CPU cores within Metaflow tasks?

Answer

You can use multiple cores in a Metaflow task the same way you would in any other Python code. For example, you can use Python's concurrent.futures module to launch multiple processes or threads.

1. Write Flow with Concurrency in a Task

This flow shows how to:

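  • Call the do_task function four times concurrently using Python's built-in concurrent.futures.ThreadPoolExecutor. Because do_task only sleeps, the threads overlap; CPU-bound Python code in standard CPython generally needs separate processes (for example, concurrent.futures.ProcessPoolExecutor) to keep several cores busy.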
  • Print the total time elapsed to demonstrate the tasks ran in parallel.
use_multiple_cpu_cores.py
from metaflow import FlowSpec, step, Parameter
from concurrent import futures
import time

def do_task(name):
    time.sleep(3) # a long-running task
    return name

class MulticoreFlow(FlowSpec):

    num_cores = Parameter('num-cores', default=4)

    @step
    def start(self):
        threads = [
            "thread_%s" % i
            for i in range(self.num_cores)
        ]
        t0 = time.time()
        with futures.ThreadPoolExecutor(
            max_workers = self.num_cores
        ) as exe:
            self.messages = [
                msg for msg in exe.map(do_task, threads)
            ]
        self.time_elapsed = time.time() - t0
        self.next(self.end)

    @step
    def end(self):
        print_msg = "All tasks completed in %.3fs"
        print(print_msg % self.time_elapsed)
        for msg in self.messages:
            print(msg)

if __name__ == '__main__':
    MulticoreFlow()
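
The flow above uses threads, which suit tasks that mostly wait, as time.sleep does. For CPU-bound Python code, a process pool is the more common way to occupy several cores. The following is a minimal sketch of that variant, independent of Metaflow; cpu_heavy and run_in_processes are hypothetical names used only for illustration, and the same pattern could plausibly be placed inside a step.

```python
from concurrent import futures
import os


def cpu_heavy(n):
    """Hypothetical CPU-bound stand-in: sum of squares below n."""
    return sum(i * i for i in range(n))


def run_in_processes(inputs, num_workers=None):
    """Map cpu_heavy over inputs using separate worker processes."""
    # Fall back to the core count the OS reports (or 1 if unknown).
    num_workers = num_workers or os.cpu_count() or 1
    with futures.ProcessPoolExecutor(max_workers=num_workers) as exe:
        # exe.map yields results in input order, like the built-in map.
        return list(exe.map(cpu_heavy, inputs))
```

A call such as run_in_processes([10, 100, 1000], num_workers=2) is best made from under an if __name__ == '__main__': guard, as the flow file already does, because some platforms start worker processes by re-importing the main module.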

2. Run Flow

python use_multiple_cpu_cores.py run
     Workflow starting (run-id 1663278003721691):
[1663278003721691/start/1 (pid 47937)] Task is starting.
[1663278003721691/start/1 (pid 47937)] Task finished successfully.
[1663278003721691/end/2 (pid 47941)] Task is starting.
[1663278003721691/end/2 (pid 47941)] All tasks completed in 3.011s
[1663278003721691/end/2 (pid 47941)] thread_0
[1663278003721691/end/2 (pid 47941)] thread_1
[1663278003721691/end/2 (pid 47941)] thread_2
[1663278003721691/end/2 (pid 47941)] thread_3
[1663278003721691/end/2 (pid 47941)] Task finished successfully.
Done!
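
The elapsed time of roughly 3 seconds is the evidence of concurrency: four 3-second calls run one after another would take about 12 seconds. The comparison can be sketched outside Metaflow as follows. This is an illustration only; the shortened DELAY and the timed helper are assumptions introduced here and are not part of the flow.

```python
from concurrent import futures
import time

DELAY = 0.2  # seconds; shortened stand-in for the 3 s sleep in do_task


def do_task(name):
    time.sleep(DELAY)  # simulated wait
    return name


def timed(fn):
    """Return (result, seconds elapsed) for a zero-argument callable."""
    t0 = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - t0


names = ["thread_%s" % i for i in range(4)]

# Serial: each call waits for the previous one, so about 4 * DELAY.
serial_result, serial_time = timed(lambda: [do_task(n) for n in names])


def run_threaded():
    with futures.ThreadPoolExecutor(max_workers=4) as exe:
        return list(exe.map(do_task, names))


# Threaded: the four sleeps overlap, so about 1 * DELAY.
threaded_result, threaded_time = timed(run_threaded)

print("serial:   %.3fs" % serial_time)
print("threaded: %.3fs" % threaded_time)
```

Both runs return the same list in the same order; only the wall-clock time differs.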

Further Reading