* Improved Bare-Metal testing
* Adapted bucketing pairing for L2 to decrease runtime
* Included DMESG in L1 logs
* Output L1 & L2 summary.json
ltran5991 committed Aug 21, 2024
1 parent 9624b47 commit fbba3e5
Showing 15 changed files with 399 additions and 169 deletions.
8 changes: 8 additions & 0 deletions utils/README.md
@@ -133,4 +133,12 @@ To run a full IGHS test, run the below command:
# Creates IGHS Report and screens clusters for any infected nodes.
# Will check Level 1 and 2 by default
python screen.py --initialize --screen
```
IGHS can alternatively be run through the script below:
``` bash
# Creates IGHS Report and screens clusters for any infected nodes.
# Will check Level 1 and 2 by default
./run_ighs.sh
```
114 changes: 84 additions & 30 deletions utils/intel_gaudi_health_screen/IGNodes.py
@@ -28,18 +28,68 @@ def __init__(self, health_report=HealthReport()):
Args:
health_report (HealthReport, optional): IGHS Health Report. Defaults to creating a new HealthReport().
"""
self.all_nodes = list()
self.launcher_nodes = list()
self.worker_nodes = list()
self.healthy_nodes = list()
self.infected_nodes = list()
self.all_nodes = list()
self.launcher_nodes = list()
self.worker_nodes = list()
self.healthy_nodes = set()
self.watch_nodes = set()
self.infected_nodes = set()
self.missing_nodes = set()

self.groups_tracker = list()
self.groups_tracker = list()
self.current_node_groups = list()

self.health_report = health_report
self.log_dir = health_report.f_dir
self.health_report = health_report
self.log_dir = health_report.f_dir

def update_node_status(self, healthy_nodes, infected_nodes, missing_nodes, undetected_nodes=[]):
"""Update the node lists status based on current node groups. If a node
paring fails with known healthy node, then the other node is considered
infected. Otherwise it will be moved to the healthy node list
Args:
healthy_nodes ([str]): List of healthy nodes that passed IGHS testing
infected_nodes ([str]): List of nodes that failed to pass IGHS testing
missing_nodes ([str]): List of nodes that IGHS did not run testing on
undetected_nodes ([str]): List of nodes that IGHS did not run testing on because testing was not scheduled on them
"""
watch_nodes = self.watch_nodes.copy()

# Remove Nodes that haven't been tested yet from the healthy list
for n in undetected_nodes:
if n in watch_nodes and n in healthy_nodes:
healthy_nodes.remove(n)

self.healthy_nodes.update(healthy_nodes)

for group in self.current_node_groups:
n1, n2 = group
self.determine_node_health(infected_nodes, missing_nodes, n1, n2)
self.determine_node_health(infected_nodes, missing_nodes, n2, n1)

self.watch_nodes = self.watch_nodes.difference(self.healthy_nodes)

def determine_node_health(self, infected_nodes, missing_nodes, n1, n2):
"""Determine whether a node is healthy .
Args:
infected_nodes ([str]): List of nodes that failed to pass IGHS testing
missing_nodes ([str]): List of nodes that IGHS did not run testing on
n1 (str): Node name to investigate for passing the IGHS test
n2 (str): Node name that should be considered healthy. This assists in verifying the status of n1
"""
if n2 in self.healthy_nodes:
remove_from_watch = False

if n1 in infected_nodes:
self.infected_nodes.add(n1)
remove_from_watch = True
if n1 in missing_nodes:
self.missing_nodes.add(n1)
remove_from_watch = True

if remove_from_watch and n1 in self.watch_nodes:
self.watch_nodes.remove(n1)
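
As a standalone illustration of the pairing rule above (a minimal sketch with made-up node names, not a call into this module): a node that fails while grouped with an already-verified partner is marked infected and dropped from the watch list.

``` python
# Simplified sketch of the determine_node_health() rule; names are hypothetical.
healthy_nodes  = {"node-a"}              # verified in an earlier round
watch_nodes    = {"node-b", "node-c"}    # still unverified
infected_nodes = set()

current_node_groups = [("node-a", "node-b")]   # this round's pairs
failed_this_round   = {"node-b"}               # nodes reported as failing

for n1, n2 in current_node_groups:
    for node, partner in ((n1, n2), (n2, n1)):
        if partner in healthy_nodes and node in failed_this_round:
            infected_nodes.add(node)
            watch_nodes.discard(node)

print(infected_nodes)  # {'node-b'}
print(watch_nodes)     # {'node-c'}
```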

class IGNode():

@@ -83,6 +133,14 @@ def scan_cards(self):

self.cards = dict(sorted(self.cards.items()))

def record_dmesg(self):
cmd = f"dmesg -T"
output = run_cmd(cmd)

self.logger.info("***** START of DMESG *****")
self.logger.info(output)
self.logger.info("***** END of DMESG *****")

def health_check(self, target_cards=[], write_report=False):
checked_cards = list()
processes = list()
@@ -107,9 +165,10 @@ def health_check(self, target_cards=[], write_report=False):
checked_cards.append(card)
self.logger.info(card)

self.record_dmesg()
self.write_json(checked_cards)
if write_report:
self.health_report.write_rows(node_id=self.name, cards=checked_cards)
self.health_report.write_rows(data=checked_cards)

def write_json(self, cards):
node_status = dict()
@@ -147,10 +206,11 @@ def __init__(self, index=-1, module_id=-1, pci_address="", memory_used=-1, frame
self.multi_node_fail = False
self.is_infected = False

self.external_ports = [1, 8, 9]
self.incorrect_ports_direction = list()
self.internal_ports = list()
self.external_ports = list()

def check_health(self,num_checks_link_state=10, checked_cards=[]):
self.check_port_type()
self.check_link_state(attempts=num_checks_link_state, sleep_sec=0.2)
self.check_device_acquire_fail()
self.check_temperature_state()
@@ -159,7 +219,10 @@ def check_health(self,num_checks_link_state=10, checked_cards=[]):

def check_link_state(self, attempts=10, sleep_sec=0.5):
self.logger.debug(f"Checking {self.pci_address} Link State. Will check {attempts} times")
cmd = f"hl-smi -n link -i {self.pci_address}"
all_ports = self.internal_ports + self.external_ports
all_ports_txt = ",".join(all_ports)

cmd = f"hl-smi -n link -i {self.pci_address} -P {all_ports_txt}"
down_links = set()

for a in range(attempts):
@@ -179,30 +242,21 @@ def check_link_state(self, attempts=10, sleep_sec=0.5):
return self.down_links


def check_port_direction(self):
self.logger.debug(f"Checking {self.pci_address} Port Directions")
def check_port_type(self):
self.logger.debug(f"Checking {self.pci_address} Port Types (Internal|External)")

incorrect_ports_direction = list()
cmd = f"hl-smi -n ports -i {self.pci_address}"
output = run_cmd(cmd)
output_list = output.strip().split("\n")

ports_direction = output.strip().split("\n")
if ports_direction[-1] == "":
ports_direction.pop()
for output in output_list:
port_txt, port_type = output.split(":")
port = port_txt.split(" ")[1]

for i, direction in enumerate(ports_direction):
if i in self.external_ports:
if "internal" in direction:
incorrect_ports_direction.append(i)
self.is_infected = True
if "external" in port_type:
self.external_ports.append(port)
else:
if "external" in direction:
incorrect_ports_direction.append(i)
self.is_infected = True

self.incorrect_ports_direction = incorrect_ports_direction

return incorrect_ports_direction
self.internal_ports.append(port)
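
For reference, the parsing in check_port_type above implies per-port output lines of roughly the form `port 1: external`; that format is an assumption inferred from the split logic, not captured hl-smi output. A minimal standalone sketch of the same parse:

``` python
# Hypothetical hl-smi-style lines; the real tool's output format may differ.
sample_output = """port 0: internal
port 1: external
port 8: external"""

internal_ports, external_ports = [], []
for line in sample_output.strip().split("\n"):
    port_txt, port_type = line.split(":")
    port = port_txt.split(" ")[1]          # second token, e.g. "1"
    if "external" in port_type:
        external_ports.append(port)
    else:
        internal_ports.append(port)

print(internal_ports, external_ports)      # ['0'] ['1', '8']
```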

def check_device_acquire_fail(self):
self.logger.debug(f"Checking {self.pci_address} for Device Acquire Issues")
10 changes: 5 additions & 5 deletions utils/intel_gaudi_health_screen/README.md
@@ -1,4 +1,4 @@
# Intel Gaudi Health Screen 2.1.1
# Intel Gaudi Health Screen 2.2.0

A large-scale Intel Gaudi cluster contains many moving parts. To ensure distributed training proceeds smoothly, it is recommended to check the
cluster network health. Troubleshooting issues on a large cluster can be tedious. To simplify the debugging process the
@@ -233,7 +233,7 @@ system-info:
tcp-interface: "10.3.124.0/24"

# Image to run Intel Gaudi Health Screen
image: "vault.habana.ai/gaudi-docker/1.16.0/ubuntu22.04/habanalabs/pytorch-installer-2.2.2:latest"
image: "vault.habana.ai/gaudi-docker/1.17.0/ubuntu22.04/habanalabs/pytorch-installer-2.3.1:latest"

# Node Label used to identify a Intel Gaudi Node
gaudi-node-label: "brightcomputing.com/node-category=gaudi"
@@ -260,16 +260,16 @@ Before running the screening test, you need to generate the ssh key used for pas
``` bash
# Keys to setup initial bare-metal passwordless ssh connection between systems
ssh-keygen -t rsa -f ssh/ighs_rsa
ssh-keygen -t rsa -f ssh/ighs_rsa;
chmod 600 ssh/ighs_rsa;
chmod 644 ssh/ighs_rsa.pub;

# Keys to setup containers passwordless ssh connection
ssh-keygen -t rsa -f template/bare-metal/ssh/id_rsa
ssh-keygen -t rsa -f template/bare-metal/ssh/id_rsa;
chmod 600 template/bare-metal/ssh/id_rsa;
chmod 644 template/bare-metal/ssh/id_rsa.pub;

cat template/bare-metal/ssh/id_rsa.pub > template/bare-metal/sshauthorized_keys
cat template/bare-metal/ssh/id_rsa.pub > template/bare-metal/ssh/authorized_keys;
```

## Recovery Steps
2 changes: 1 addition & 1 deletion utils/intel_gaudi_health_screen/config.yaml
@@ -12,7 +12,7 @@ system-info:
tcp-interface: "10.3.124.0/24"

# Image to run Intel Gaudi Health Screen
image: "vault.habana.ai/gaudi-docker/1.16.2/ubuntu22.04/habanalabs/pytorch-installer-2.2.2:latest"
image: "vault.habana.ai/gaudi-docker/1.17.0/ubuntu22.04/habanalabs/pytorch-installer-2.3.1:latest"

# Node Label used to identify a Intel Gaudi Node
gaudi-node-label: "ighs_label=gaudi"
118 changes: 68 additions & 50 deletions utils/intel_gaudi_health_screen/hccl_demo_helper.py
@@ -15,99 +15,117 @@
import logging
_logger = logging.getLogger("health_screener")

def find_groups(nodes_to_test, groups_tracker):
def find_groups(healthy_nodes, watch_nodes, groups_tracker):
""" Find a list of node groups to run hccl_demo all reduce test
Args:
nodes_to_test ([str]): Nodes list used to create a group of nodes for hccl_demo
healthy_nodes ([str]): Nodes that previously passed a paired hccl_demo test
watch_nodes ([str]): Nodes that haven't had a passing round of hccl_demo
groups_tracker ([str]): History of used groups. A group has to be unique
Returns:
([str],[str]): Unique list of groups of nodes, History of used groups
"""
random.shuffle(nodes_to_test)
random.shuffle(healthy_nodes)
random.shuffle(watch_nodes)

found_unique = True
num_nodes = len(nodes_to_test)
num_nodes = len(healthy_nodes) + len(watch_nodes)
node_groups = list()
max_num_groups = num_nodes // 2
max_combinations = (math.factorial(num_nodes)) / (math.factorial(num_nodes-2) * 2)
_logger.debug(f"nodes_to_test {len(nodes_to_test)}: {nodes_to_test}")
max_attempts = 10

def add_unique_group_id(interval=2):
nonlocal node_groups, nodes_to_test
i = 1
max_attempts = 10
found_unique = False
if num_nodes == 1:
_logger.warn(f"Need more than 1 Node to test pair all_reduce")
return False

while len(node_groups) < max_num_groups and found_unique:
i = 0
h_i, w_i = 0,0

if len(groups_tracker) >= max_combinations:
_logger.info(f"Reached maximum combinations {max_combinations} for {num_nodes} Nodes")
return found_unique
break

node_group, group_id = find_group_id(nodes_to_test, i, interval=interval)
node_group, group_id, (h_i, w_i) = find_group_id(healthy_nodes, watch_nodes, h_i, w_i)

if node_group[0] == node_group[1]:
_logger.info(f"Found duplicate nodes in node_group {node_group}. Exiting group id search")
found_unique = False
break

while group_id in groups_tracker:
if i > max_attempts:
_logger.warn(f"Max attempt {max_attempts} reached for finding unique pair combination.")
return found_unique
found_unique = False
break

node_group, group_id = find_group_id(nodes_to_test, i, interval=interval)
if group_id == "":
return found_unique
node_group, group_id, (h_i, w_i) = find_group_id(healthy_nodes, watch_nodes, h_i, w_i)
if group_id == "" and node_group[0] == node_group[1]:
found_unique = False
break

i += 1

found_unique = True
groups_tracker.append(group_id)
node_groups.append(node_group)

for n in node_group:
nodes_to_test.remove(n)

return found_unique
if found_unique:
groups_tracker.append(group_id)
node_groups.append(node_group)

for n in node_group:
if n in healthy_nodes:
healthy_nodes.remove(n)
if n in watch_nodes:
watch_nodes.remove(n)

if num_nodes == 1:
_logger.warn(f"Need more than 1 Node to test all_reduce")
return False

if num_nodes % 2 != 0:
# Ensures that every node has a group to test.
found_unique = add_unique_group_id(interval=3)

while len(node_groups) < max_num_groups and found_unique:
found_unique = add_unique_group_id()

if not found_unique:
_logger.debug(f"Finished searching for Unique pair combinations")
if len(watch_nodes) == 0:
break

return node_groups, groups_tracker
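
A hedged usage sketch of the grouping flow above (the import path and node names are assumptions for illustration; actual pairings vary because both lists are shuffled):

``` python
from hccl_demo_helper import find_groups   # assumed import path

healthy = ["node-a", "node-b"]   # passed an earlier round
watch   = ["node-c", "node-d"]   # still need a passing pair
history = []                     # group ids already used

groups, history = find_groups(healthy, watch, history)
# Each group pairs an unverified node with a healthy partner when one is
# available, e.g. [['node-a', 'node-c'], ['node-b', 'node-d']].
```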

def find_group_id(nodes_to_test, start, interval=2):
def find_group_id(healthy_nodes, watch_nodes, h_i=0, w_i=0):
""" Finds a group of nodes and combines to form a group id
Args:
nodes_to_test ([str]): Viable node list
start (int): Index of next potential node id
interval (int, optional): The size of the group id. Most common is pairs of nodes. Defaults to 2.
healthy_nodes ([str]): Nodes that previously passed a paired hccl_demo test
watch_nodes ([str]): Nodes that haven't had a passing round of hccl_demo
h_i (int): Index of next potential node id for healthy_nodes
w_i (int): Index of next potential node id for watch_nodes
Returns:
([str], str, (int, int)): Potential nodes, their group id, and the updated (h_i, w_i) indices
"""
group_id = ""

if len(nodes_to_test) == 0:
return [], group_id

node_group = [nodes_to_test[0]]
node_group.extend(nodes_to_test[start:start+interval-1])
group_id = ""
node_group = []
max_attempt = 10

# Goal of testing is to test watch_nodes, pairing each with a healthy_node when available
if len(watch_nodes) == 0 or (len(watch_nodes) == 1 and len(healthy_nodes)==0):
return node_group, group_id, (h_i, w_i)

for i in range(max_attempt):
if len(watch_nodes) and w_i < len(watch_nodes):
node_group.append(watch_nodes[w_i])
w_i += 1
if len(healthy_nodes) and h_i < len(healthy_nodes):
node_group.append(healthy_nodes[h_i])
h_i += 1

if h_i > len(healthy_nodes):
random.shuffle(healthy_nodes)
h_i = 0
if w_i > len(watch_nodes):
random.shuffle(watch_nodes)
w_i = 0

if len(node_group) >= 2:
break

if len(node_group) > 1:
node_group.sort()
group_id = "".join(node_group)
group_id = "-".join(node_group)

return node_group, group_id
return node_group, group_id, (h_i, w_i)
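
As a quick note on the group-id convention above: a pair is sorted and joined with "-", so the same two nodes map to the same id regardless of the order they were picked in, which is what lets groups_tracker reject repeat pairings. A tiny sketch with hypothetical names:

``` python
node_group = ["node-d", "node-a"]
node_group.sort()
group_id = "-".join(node_group)
print(group_id)   # node-a-node-d
```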

def gather_hccl_logs(job_path, round, log_dir, health_report):
""" Retrieve hccl_demo log files based on the job yamls executed