Skip to main content

Download Metaflow Task Code Package

Question

How to download and unpack the code package that Metaflow versions in the cloud?

Solution

On every run where you use a remote datastore, Metaflow snapshots your code package and pushes it to the datastore. This snapshot is useful for versioning, and it is also what Metaflow uses to bootstrap the task runtime environment.

This page shows how to use the Metaflow Client API to download the code package from any task onto your workstation.

1. Find the Relevant Task Pathspec

from metaflow import Task

# You can find this info in the CLI logs.
# If you run the Metaflow GUI, it is also visible there.
# Alternatively, if you use Outerbounds, find it in the dashboard.
flow_name = 'TrainiumBERTFinetune'
run_id = '216825'
step_name = 'tune_bert'
task_id = 'control-216825-cache_dataset-1332173'
# The pathspec is built as flow_name/run_id/step_name/task_id.
task_pathspec = f'{flow_name}/{run_id}/{step_name}/{task_id}'

# Use the Metaflow Client API to load the task identified by the pathspec.
task = Task(task_pathspec)

Once you have identified and loaded the Task object using Metaflow's Client API, you can download and unpack the code that was run on the remote machine.

2. Download and Unpack the Code Package

# Directory the code package is unpacked into; this example names it after the task id.
EXTRACTION_PATH = task_id # set EXTRACTION_PATH to your desired location
# Unpack the task's code package tarball into EXTRACTION_PATH.
task.code.tarball.extractall(path=EXTRACTION_PATH)
# What is the structure of the remote code package?
# Note: this will be different for each task.
!ls {EXTRACTION_PATH}
    INFO                  flow.py              neuron_monitor.py
    config.py             metaflow             ops.py
    custom_decorators.py  metaflow_extensions  train.py

3. Bonus: Compare Code Package Versions

Suppose you want to compare two versions of the codebase that were used to run a task. This section provides a function to help you get started.

def compare_code_packages(
    pathspec1,
    pathspec2,
    tmp_dir='/tmp/code-package-diffs',
    html_dir='code-package-file-diffs'
):
    """Download the code packages of two Metaflow tasks and diff them.

    Both code packages are extracted into temporary directories under
    ``tmp_dir``. Files whose relative path starts with ``metaflow`` are
    ignored. The remaining files present in both packages are compared by
    content, a summary is printed, and one side-by-side HTML diff is
    written to ``html_dir`` for every file that differs.

    Args:
        pathspec1: Task pathspec of the first ("old") code package.
        pathspec2: Task pathspec of the second ("new") code package.
        tmp_dir: Scratch directory that holds the extracted packages. The
            extracted copies are removed before returning, even on error.
        html_dir: Directory the HTML diff files are written to.

    Returns:
        List of paths of the HTML diff files, one per mismatching file,
        in sorted order of the mismatching file's relative path.
    """
    import os
    import pprint
    import sys
    from tempfile import TemporaryDirectory
    import difflib
    # dircmp is only needed if you build a dircmp object for print_diff_files.
    from filecmp import dircmp, cmpfiles
    from metaflow import Task

    def print_diff_files(dcmp):
        """Recursively print the differing files of a ``filecmp.dircmp``.

        Optional helper, not called by default.
        See https://docs.python.org/3/library/filecmp.html
        """
        for name in dcmp.diff_files:
            print("diff_file %s found in %s and %s" % (
                name, dcmp.left, dcmp.right))
        for sub_dcmp in dcmp.subdirs.values():
            print_diff_files(sub_dcmp)

    def print_common_file_cmp(dir1, dir2, common, shallow=False):
        """Compare the ``common`` files, print a report, return mismatches."""
        match, mismatch, errors = cmpfiles(
            dir1, dir2, common, shallow=shallow)
        print('Matching files: '); pprint.pprint(match)
        print('\nMismatching files: '); pprint.pprint(mismatch)
        print('\nErrors: '); pprint.pprint(errors)
        return mismatch

    def read_lines(path):
        """Read a file as text lines, closing the handle afterwards."""
        # errors='replace' keeps one undecodable byte (e.g. a binary file
        # in the package) from aborting the whole comparison.
        with open(path, 'r', encoding='utf-8', errors='replace') as f:
            return f.readlines()

    def create_file_diff(old_file, new_file, output_file):
        """Write an HTML diff to ``output_file``, or a unified diff to stdout.

        ``old_file`` and ``new_file`` are path strings; a falsy
        ``output_file`` selects the stdout branch.
        """
        old_lines = read_lines(old_file)
        new_lines = read_lines(new_file)
        if output_file:
            delta = difflib.HtmlDiff().make_file(
                old_lines, new_lines, old_file, new_file
            )
            # make_file() declares charset=utf-8 in the page, so write UTF-8.
            with open(output_file, "w", encoding='utf-8') as f:
                print('Writing: ', output_file)
                f.write(delta)
        else:
            # The paths are plain strings, so they are used directly as labels.
            delta = difflib.unified_diff(
                old_lines, new_lines, old_file, new_file)
            sys.stdout.writelines(delta)

    def list_files_recursive(directory):
        """Return the paths of all files under ``directory``, relative to it."""
        file_paths = []
        for root, _, files in os.walk(directory):
            for file in files:
                file_paths.append(os.path.relpath(
                    os.path.join(root, file), start=directory))
        return file_paths

    def is_user_file(rel_path):
        """Tell whether a path lies outside the ``metaflow*`` entries."""
        # metaflow code package is versioned already
        return not rel_path.startswith('metaflow')

    # The pathspecs contain '/', so the parent directories of the temp-dir
    # prefixes must exist before TemporaryDirectory can create them.
    os.makedirs(os.path.join(tmp_dir, pathspec1), exist_ok=True)
    os.makedirs(os.path.join(tmp_dir, pathspec2), exist_ok=True)

    out_files = []
    # The context managers remove the extracted packages even if the
    # download, comparison, or diff generation raises.
    with TemporaryDirectory(dir=tmp_dir, prefix=pathspec1) as dir1, \
            TemporaryDirectory(dir=tmp_dir, prefix=pathspec2) as dir2:
        # download code packages
        Task(pathspec1).code.tarball.extractall(path=dir1)
        Task(pathspec2).code.tarball.extractall(path=dir2)

        files1 = set(list_files_recursive(dir1))
        files2 = set(list_files_recursive(dir2))

        # Sorted so the report and the returned list have a stable order.
        # files present in both packages
        file_intersection = sorted(filter(is_user_file, files1 & files2))
        # files present in only one of the packages
        unique_files_task1 = sorted(filter(is_user_file, files1 - files2))
        unique_files_task2 = sorted(filter(is_user_file, files2 - files1))

        mismatches = print_common_file_cmp(
            dir1=dir1,
            dir2=dir2,
            common=file_intersection
        )
        print('\nUnique files:')
        print(f'\n{pathspec1}')
        pprint.pprint(unique_files_task1)
        print(f'\n{pathspec2}')
        pprint.pprint(unique_files_task2)

        # create html of diff foreach mismatch
        print('\n')
        for mismatch_file in mismatches:
            file1 = os.path.join(dir1, mismatch_file)
            file2 = os.path.join(dir2, mismatch_file)
            output_html_file = os.path.join(
                html_dir, mismatch_file.replace('.', '_') + '.html')
            output_dir = os.path.dirname(output_html_file)
            if output_dir:  # empty when the diff lands in the current directory
                os.makedirs(output_dir, exist_ok=True)
            create_file_diff(file1, file2, output_html_file)
            out_files.append(output_html_file)

    return out_files
# Pathspecs of the two tasks whose code packages are compared.
pathspec1 = 'DBTFlow/216737/jaffle_models/1331782'
pathspec2 = 'DBTFlow/216798/jaffle_models/1332054'
# The return value is the list of HTML diff file paths.
generated_html_diffs = compare_code_packages(pathspec1, pathspec2)
    Matching files: 
['jaffle_shop/models/staging/schema.yml',
'conda.manifest',
'test.py',
'jaffle_shop/models/overview.md',
'jaffle_shop/models/orders.sql',
'jaffle_shop/dbt_project.yml',
'jaffle_shop/models/docs.md',
'jaffle_shop/models/schema.yml',
'profiles.yml',
'jaffle_shop/seeds/raw_orders.csv',
'config.py',
'jaffle_shop/models/staging/stg_payments.sql',
'jaffle_shop/seeds/raw_payments.csv',
'remotedbtflow.py',
'jaffle_shop/seeds/raw_customers.csv',
'jaffle_shop/models/staging/stg_customers.sql',
'sub.py',
'jaffle_shop/models/staging/stg_orders.sql',
'jaffle_shop/models/customers.sql']

Mismatching files:
['INFO', 'jaffletest.py', 'test_yield.py']

Errors:
[]

Unique files:

DBTFlow/216737/jaffle_models/1331782
[]

DBTFlow/216798/jaffle_models/1332054
[]


Writing: code-package-file-diffs/INFO.html
Writing: code-package-file-diffs/jaffletest_py.html
Writing: code-package-file-diffs/test_yield_py.html
from IPython.display import display, HTML

# Render one of the generated HTML diffs inline in the notebook.
# Change the index to inspect a different file's diff.
path = generated_html_diffs[2]
# The diff page produced by difflib.HtmlDiff declares a UTF-8 charset.
with open(path, 'r', encoding='utf-8') as html:
    contents = html.read()
display(HTML(contents))

/tmp/code-package-diffs/DBTFlow/216737/jaffle_models/133178241e4_rfi/test_yield.py
/tmp/code-package-diffs/DBTFlow/216798/jaffle_models/1332054laidzm33/test_yield.py
f1import subprocessf1import subprocess
22
33
4def main():4def main():
5    print("before yield")5    print("before yield")
6    for s in no():6    for s in no():
7        print(s)7        print(s)
n8    subprocess.call(["echo", "TEST"])n8    for line in subp_ret():
9        print(line)
9    for s in yes():10    for s in yes():
10        print(s)11        print(s)
11    12    
12    print("after yield")13    print("after yield")
1314
14def yes():15def yes():
15    yield "ahh"16    yield "ahh"
16    yield "no!"17    yield "no!"
17    yield "does this work?"18    yield "does this work?"
1819
19def no():20def no():
20    yield "no"21    yield "no"
21    yield "say it aint so"22    yield "say it aint so"
2223
tt24def subp_ret():
25    return subp()
26 
27 
28def subp():
29    try:
30        cmd = []
31        process = subprocess.Popen(
32            ["./echo.sh", "5"],
33            stdout=subprocess.PIPE,
34        )
35        while True:
36            process.poll()
37            if process.returncode is None:
38                # process is still running
39                line = process.stdout.readline()
40                if not line:
41                    # end of stdout, but process has not ended yet.
42                    continue
43                yield line.decode()
44            elif process.returncode == 0:
45                break
46            elif process.returncode != 0:
47                raise Exception("Derped")
48    finally:
49        pass
50 
23if __name__=="__main__":51if __name__=="__main__":
24    main()52    main()
Legends
Colors
 Added 
Changed
Deleted
Links
(f)irst change
(n)ext change
(t)op

Further Reading