Better initialization
Start by assigning a ready task to the first slot of each GPU
Waino committed Dec 18, 2023
1 parent 147dd16 commit 2d8435c
Showing 2 changed files with 63 additions and 10 deletions.
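The core idea, per the commit message: instead of scattering all language pairs over all GPU slots at random, first pin one task that is ready at step 0 to slot 0 of every GPU, then fill the remaining slots randomly. A minimal standalone sketch of that order of operations (GpuSlot, the task list and the ready set below are simplified stand-ins, not the repository's actual classes):

import random
from collections import namedtuple

GpuSlot = namedtuple('GpuSlot', ['node', 'gpu', 'slot'])

def seed_first_slots(gpu_slots, lang_pairs, ready_to_start):
    # Pin one shuffled ready task to slot 0 of every GPU ...
    ready = list(ready_to_start)
    random.shuffle(ready)
    first_slots = [s for s in gpu_slots if s.slot == 0]
    assignment = dict(zip(first_slots, ready))  # at most one ready task per GPU
    # ... then scatter the remaining tasks over the remaining slots, padding with None
    remaining_slots = [s for s in gpu_slots if s not in assignment]
    remaining = [lp for lp in lang_pairs if lp not in set(assignment.values())]
    remaining += [None] * (len(remaining_slots) - len(remaining))
    random.shuffle(remaining)
    assignment.update(zip(remaining_slots, remaining))
    return assignment

# Example: 1 node, 2 GPUs, 2 slots per GPU, only two pairs ready at step 0
slots = [GpuSlot(0, gpu, slot) for gpu in range(2) for slot in range(2)]
pairs = [('en', 'de'), ('en', 'fr'), ('de', 'fr'), ('fr', 'en')]
ready = {('en', 'de'), ('fr', 'en')}
print(seed_first_slots(slots, pairs, ready))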
19 changes: 19 additions & 0 deletions tools/config_config.py
@@ -506,6 +506,25 @@ def allocate_devices(opts):
    logger.info(f'total slots: {n_nodes * n_gpus_per_node * n_slots_per_gpu}')
    logger.info(f'lang_pairs: {len(lang_pairs)}')

    # If there are fewer ready tasks than GPUs: adjust the curriculum
    if len(lps_ready_to_start) < (n_nodes * n_gpus_per_node):
        iats = [
            corpus.get('introduce_at_training_step', 0)
            for _, corpus in opts.in_config[0]['tasks'].items()
        ]
        iats = sorted(iats)
        iats_at_last_gpu = iats[n_nodes * n_gpus_per_node]
        lps_ready_to_start = []
        for cname, corpus in opts.in_config[0]['tasks'].items():
            src_lang, tgt_lang = corpus['src_tgt'].split('-')
            if 'introduce_at_training_step' not in corpus:
                lps_ready_to_start.append((src_lang, tgt_lang))
                continue
            adjusted = max(0, corpus.get('introduce_at_training_step', 0) - iats_at_last_gpu)
            corpus['introduce_at_training_step'] = adjusted
            if adjusted == 0:
                lps_ready_to_start.append((src_lang, tgt_lang))

    assignment = optimize_gpu_assignment(
        n_nodes=n_nodes,
        n_gpus_per_node=n_gpus_per_node,
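In effect, when the curriculum would leave some GPUs idle at step 0, every introduce_at_training_step is shifted down by the value found at index n_nodes * n_gpus_per_node of the sorted list, so that enough tasks become ready immediately. A rough illustration with invented values:

    # n_nodes * n_gpus_per_node == 4, six tasks in the config
    # introduce_at_training_step, sorted: [0, 0, 5000, 10000, 20000, 40000]
    # iats_at_last_gpu = iats[4] = 20000
    # adjusted steps: max(0, iat - 20000) -> [0, 0, 0, 0, 0, 20000]
    # so at least one task per GPU is ready at step 0, and tasks beyond the
    # cutoff keep their relative spacing in the curriculum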
54 changes: 44 additions & 10 deletions tools/gpu_assignment.py
@@ -1,5 +1,7 @@
import random
import itertools
import json
import numpy as np
import random
import time
from collections import defaultdict, Counter, namedtuple
from functools import lru_cache
@@ -153,13 +155,24 @@ def make_slots(n_nodes, n_gpus_per_node, n_slots_per_gpu):

    def initial_assignment(self, lang_pairs: List[Tuple[str, str]]):
        lang_pairs = list(lang_pairs)
        random.shuffle(lang_pairs)
        if len(lang_pairs) > len(self.gpu_slots):
            raise Exception(f'More lang pairs {len(lang_pairs)} than gpu slots {len(self.gpu_slots)}')
        if len(self.gpu_slots) > len(lang_pairs):
            # Add empty slots
            lang_pairs.extend([None] * (len(self.gpu_slots) - len(lang_pairs)))
        assignment_dict = {gpu_slot: lp for gpu_slot, lp in zip(self.gpu_slots, lang_pairs)}
        # Start by assigning a ready task to the first slot of each GPU
        shuffled_ready = list(self.ready_to_start)
        random.shuffle(shuffled_ready)
        shuffled_ready = shuffled_ready[:(self.n_nodes * self.n_gpus_per_node)]
        first_slots = [gpu_slot for gpu_slot in self.gpu_slots if gpu_slot.slot == 0]
        assignment_dict = {gpu_slot: lp for gpu_slot, lp in zip(first_slots, shuffled_ready)}
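        # zip() pairs slot 0 of each GPU with one shuffled ready task, so every GPU
        # starts with work it can train immediately; if there are fewer ready tasks
        # than GPUs, the leftover first slots are simply filled by the random pass below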
        # Assign the rest randomly
        seen = set(assignment_dict.values())
        lang_pairs = [lp for lp in lang_pairs if lp not in seen]
        gpu_slots = [gpu_slot for gpu_slot in self.gpu_slots if gpu_slot not in assignment_dict.keys()]
        random.shuffle(lang_pairs)
        for gpu_slot, lp in zip(gpu_slots, lang_pairs):
            assignment_dict[gpu_slot] = lp
        return Assignment.new(assignment_dict, self)

    def cost(self, assignment: Assignment) -> float:
@@ -274,9 +287,10 @@ def _split_lps_cost(self, assignment: Assignment) -> float:
            result += VERY_BAD * count
        return result

    def best_swap_for(self, slot_a: GpuSlot, assignment, current_cost):
    def best_swap_for(self, slot_a: GpuSlot, assignment, current_cost, slot_subset=None):
        slot_subset = self.gpu_slots if slot_subset is None else slot_subset
        costs = [(current_cost, slot_a)]
        for i, slot_b in enumerate(tqdm(self.gpu_slots, desc='best_swap_for', leave=False)):
        for i, slot_b in enumerate(tqdm(slot_subset, desc='best_swap_for', leave=False)):
            if slot_a.node == slot_b.node and slot_a.gpu == slot_b.gpu:
                # No point swapping pairs already on the same device
                continue
@@ -287,9 +301,10 @@ def best_swap_for(self, slot_a: GpuSlot, assignment, current_cost):
        best_assignment = assignment.swap(slot_a, slot_b, self)
        return best_cost, best_assignment

    def swap_all_slots_once(self, assignment, current_cost):
        for i, slot_a in enumerate(tqdm(self.gpu_slots, desc='swap_all_slots_once', leave=False)):
            current_cost, assignment = self.best_swap_for(slot_a, assignment, current_cost)
    def swap_all_slots_once(self, assignment, current_cost, slot_subset=None):
        slot_subset = self.gpu_slots if slot_subset is None else slot_subset
        for i, slot_a in enumerate(tqdm(slot_subset, desc='swap_all_slots_once', leave=False)):
            current_cost, assignment = self.best_swap_for(slot_a, assignment, current_cost, slot_subset)
        return current_cost, assignment

def optimize(self, assignment, current_cost, iterations=10, patience=1):
@@ -298,7 +313,13 @@ def optimize(self, assignment, current_cost, iterations=10, patience=1):
        print(f'initial cost: {current_cost}', flush=True)
        for i in tqdm(range(iterations), desc='iterations'):
            prev_cost = current_cost
            current_cost, assignment = self.swap_all_slots_once(assignment, current_cost)
            slot_subsets = self.slot_subsets(self.gpu_slots, n=100)
            for slot_subset in tqdm(slot_subsets, desc='subset'):
                current_cost, assignment = self.swap_all_slots_once(
                    assignment,
                    current_cost,
                    slot_subset
                )
            print(f'\niteration {i} cost: {current_cost}', flush=True)
            if prev_cost == current_cost:
                stalled += 1
@@ -309,6 +330,19 @@
                break
        return current_cost, assignment, i

    def slot_subsets(self, slots, n=100):
        if len(slots) <= n:
            return [slots]
        slots = list(slots)
        random.shuffle(slots)
        n_chunks = int(np.ceil(len(slots) / n))
        chunk_len = int(np.ceil(len(slots) / n_chunks))
        chunks = []
        islots = iter(slots)
        for _ in range(n_chunks):
            chunks.append(list(itertools.islice(islots, chunk_len)))
        return chunks
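        # e.g. 250 slots with n=100: n_chunks = ceil(250/100) = 3, chunk_len = ceil(250/3) = 84,
        # giving disjoint random subsets of 84, 84 and 82 slots; optimize() then searches
        # for profitable swaps only within one subset at a time instead of across all slots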


def print_assignment(assignment, group_mapping, ready_to_start=None):
    ready_to_start = set() if ready_to_start is None else ready_to_start
@@ -356,7 +390,7 @@ def optimize_gpu_assignment(
    if log_name:
        with open('gpu_assignment_cost_log.jsonl', 'a') as fout:
            record = {
                'method': 'empty_penalty',
                'method': 'better_init',
                'name': log_name,
                'n_nodes': n_nodes,
                'n_gpus_per_node': n_gpus_per_node,
