Source code for narla.runner.runner

from __future__ import annotations

import os
import sys
import time
import subprocess
from typing import Dict, List

import tqdm

import narla


class Runner:
    """
    A Runner will execute a list of Settings across the available GPUs

    :param all_settings: List of Settings. One run of ``main.py`` is executed per Settings object
    :param available_gpus: List of available GPU device IDs
    :param jobs_per_gpu: Number of Jobs to run on each GPU
    """

    def __init__(
        self,
        all_settings: List[narla.settings.Settings],
        available_gpus: List[int] = (0,),
        jobs_per_gpu: int = 1,
    ):
        self._all_settings = all_settings
        self._available_gpus = available_gpus
        self._jobs_per_gpu = jobs_per_gpu

        # Internal dictionary mapping GPU ID to the list of Jobs running on that device
        self._jobs_on_gpus: Dict[int, List[narla.runner.Job]] = {gpu: [] for gpu in available_gpus}

        self._progress_bar = tqdm.tqdm(total=len(all_settings), position=0, leave=True)
    def execute(self):
        """
        Execute a Job for each group of Settings passed to the Runner. Jobs are distributed
        evenly across the GPUs as they become available
        """
        while self._all_settings:
            self._fill_free_gpus()
            self._wait_for_free_gpu()

        self._wait_for_running_jobs()
        self._progress_bar.close()
    @staticmethod
    def _execute_job(settings: narla.settings.Settings, gpu: int) -> narla.runner.Job:
        settings.trial_settings.gpu = gpu

        trial_path = narla.io.format_trial_path(settings)
        narla.io.make_directories(trial_path)

        # Redirect the subprocess's stdout and stderr to a log file inside the trial directory
        log_file = os.path.join(trial_path, "log.txt")
        process = subprocess.Popen(
            args=f"{sys.executable} main.py " + settings.to_command_string(),
            shell=True,
            stdout=open(log_file, "a"),
            stderr=subprocess.STDOUT,
        )
        job = narla.runner.Job(settings=settings, process=process)

        # Brief pause so the subprocess can start up before the next Job is launched
        time.sleep(1)

        return job

    def _fill_free_gpus(self):
        for gpu, running_jobs in self._jobs_on_gpus.items():
            while len(running_jobs) < self._jobs_per_gpu:
                # Stop once every queued Settings object has been launched
                if not self._all_settings:
                    return

                settings = self._all_settings.pop(0)
                job = self._execute_job(settings=settings, gpu=gpu)
                running_jobs.append(job)
    def is_done(self) -> bool:
        """
        Returns ``True`` if all Jobs have completed
        """
        for running_jobs in self._jobs_on_gpus.values():
            if running_jobs:
                return False

        return True
    def _remove_completed_jobs(self) -> bool:
        for running_jobs in self._jobs_on_gpus.values():
            # Iterate indices in reverse so completed Jobs can be popped during iteration
            for job_index in reversed(range(len(running_jobs))):
                job = running_jobs[job_index]

                if job.is_done():
                    running_jobs.pop(job_index)
                    self._progress_bar.update(1)
                    return True

        return False

    def _wait_for_free_gpu(self):
        # Poll the running Jobs until one finishes, freeing a slot on its GPU
        while True:
            job_removed = self._remove_completed_jobs()
            if job_removed:
                return

    def _wait_for_running_jobs(self):
        # Block until every remaining Job has finished
        while not self.is_done():
            self._remove_completed_jobs()
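
# Usage sketch (illustrative, not part of the module above): assuming a list of
# narla.settings.Settings objects has been built elsewhere, a Runner could be driven
# roughly as follows. The GPU IDs and jobs_per_gpu value are example choices only.
#
#     runner = Runner(all_settings=all_settings, available_gpus=[0, 1], jobs_per_gpu=2)
#     runner.execute()  # blocks until every queued Settings object has been run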