diff --git a/utils/README.md b/utils/README.md
index af6a8c0..b05dbab 100644
--- a/utils/README.md
+++ b/utils/README.md
@@ -133,4 +133,12 @@ To run a full IGHS test, run the below command:
 # Creates IGHS Report and screens clusters for any infected nodes.
 # Will check Level 1 and 2 by default
 python screen.py --initialize --screen
+```
+
+IGHS can alternatively be run through the below script:
+
+``` bash
+# Creates IGHS Report and screens clusters for any infected nodes.
+# Will check Level 1 and 2 by default
+./run_ighs.sh
 ```
\ No newline at end of file
diff --git a/utils/intel_gaudi_health_screen/IGNodes.py b/utils/intel_gaudi_health_screen/IGNodes.py
index 43313b3..be2a1f2 100644
--- a/utils/intel_gaudi_health_screen/IGNodes.py
+++ b/utils/intel_gaudi_health_screen/IGNodes.py
@@ -28,18 +28,68 @@ def __init__(self, health_report=HealthReport()):
         Args:
             health_report (HealthReport, optional): IGHS Health Report. Defaults to creating a new HealthReport().
         """
-        self.all_nodes = list()
-        self.launcher_nodes = list()
-        self.worker_nodes = list()
-        self.healthy_nodes = list()
-        self.infected_nodes = list()
+        self.all_nodes = list()
+        self.launcher_nodes = list()
+        self.worker_nodes = list()
+        self.healthy_nodes = set()
+        self.watch_nodes = set()
+        self.infected_nodes = set()
+        self.missing_nodes = set()
 
-        self.groups_tracker = list()
+        self.groups_tracker = list()
+        self.current_node_groups = list()
 
-        self.health_report = health_report
-        self.log_dir = health_report.f_dir
+        self.health_report = health_report
+        self.log_dir = health_report.f_dir
 
+    def update_node_status(self, healthy_nodes, infected_nodes, missing_nodes, undetected_nodes=[]):
+        """Update the node lists' status based on the current node groups. If a node
+           pairing fails with a known healthy node, then the other node in the pair is
+           considered infected. Otherwise it is moved to the healthy node list.
+
+        Args:
+            healthy_nodes ([str]): List of healthy nodes that passed IGHS testing
+            infected_nodes ([str]): List of nodes that failed to pass IGHS testing
+            missing_nodes ([str]): List of nodes that IGHS did not run testing on
+            undetected_nodes ([str]): List of nodes that IGHS did not test because no job was scheduled on them
+        """
+        watch_nodes = self.watch_nodes.copy()
+
+        # Remove nodes that haven't been tested yet from the healthy list
+        for n in undetected_nodes:
+            if n in watch_nodes and n in healthy_nodes:
+                healthy_nodes.remove(n)
+
+        self.healthy_nodes.update(healthy_nodes)
+
+        for group in self.current_node_groups:
+            n1, n2 = group
+            self.determine_node_health(infected_nodes, missing_nodes, n1, n2)
+            self.determine_node_health(infected_nodes, missing_nodes, n2, n1)
+
+        self.watch_nodes = self.watch_nodes.difference(self.healthy_nodes)
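
The pair-inference rule above is easiest to see in isolation: a node is only classified when its partner in the pair is already known to be healthy; otherwise it stays on the watch list for another round. A minimal sketch of that rule (hypothetical node names, plain sets instead of the `IGNodes` state):

```python
# Sketch of the inference in update_node_status/determine_node_health.
healthy = {"node-a"}                 # verified in an earlier round
watch = {"node-b", "node-c"}         # still unverified
infected = set()

def classify(n, partner, failed_pairs):
    # Only a known-healthy partner lets us blame the other node of a failed pair.
    if partner in healthy and n in failed_pairs:
        infected.add(n)
        watch.discard(n)

classify("node-b", "node-a", failed_pairs={"node-b"})  # node-b -> infected
classify("node-c", "node-b", failed_pairs={"node-c"})  # partner unverified -> node-c stays on watch
print(sorted(infected), sorted(watch))                 # ['node-b'] ['node-c']
```
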
+    def determine_node_health(self, infected_nodes, missing_nodes, n1, n2):
+        """Determine whether a node is healthy.
+
+        Args:
+            infected_nodes ([str]): List of nodes that failed to pass IGHS testing
+            missing_nodes ([str]): List of nodes that IGHS did not run testing on
+            n1 (str): Node name to investigate, to see whether it passes the IGHS test
+            n2 (str): Node name that should be considered healthy. This assists in verifying the status of n1
+        """
+        if n2 in self.healthy_nodes:
+            remove_from_watch = False
+
+            if n1 in infected_nodes:
+                self.infected_nodes.add(n1)
+                remove_from_watch = True
+            if n1 in missing_nodes:
+                self.missing_nodes.add(n1)
+                remove_from_watch = True
+
+            if remove_from_watch and n1 in self.watch_nodes:
+                self.watch_nodes.remove(n1)
 
 
 class IGNode():
@@ -83,6 +133,14 @@ def scan_cards(self):
 
         self.cards = dict(sorted(self.cards.items()))
 
+    def record_dmesg(self):
+        cmd = "dmesg -T"
+        output = run_cmd(cmd)
+
+        self.logger.info("***** START of DMESG *****")
+        self.logger.info(output)
+        self.logger.info("***** END of DMESG *****")
+
     def health_check(self, target_cards=[], write_report=False):
         checked_cards = list()
         processes = list()
@@ -107,9 +165,10 @@ def health_check(self, target_cards=[], write_report=False):
             checked_cards.append(card)
             self.logger.info(card)
 
+        self.record_dmesg()
         self.write_json(checked_cards)
 
         if(write_report):
-            self.health_report.write_rows(node_id=self.name, cards=checked_cards)
+            self.health_report.write_rows(data=checked_cards)
 
     def write_json(self, cards):
         node_status = dict()
@@ -147,10 +206,11 @@ def __init__(self, index=-1, module_id=-1, pci_address="", memory_used=-1, frame
 
         self.multi_node_fail = False
         self.is_infected = False
 
-        self.external_ports = [1, 8, 9]
-        self.incorrect_ports_direction = list()
+        self.internal_ports = list()
+        self.external_ports = list()
 
     def check_health(self,num_checks_link_state=10, checked_cards=[]):
+        self.check_port_type()
         self.check_link_state(attempts=num_checks_link_state, sleep_sec=0.2)
         self.check_device_acquire_fail()
         self.check_temperature_state()
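
Since the `record_dmesg` helper added above shells out to `dmesg -T`, the container needs permission to read the kernel ring buffer; that is why the compose and k8s templates later in this change add the `SYSLOG` capability. A stand-in sketch of the call (the real helper is `run_cmd` from `utilities.py`; this simplified version is an assumption, not its actual implementation):

```python
import subprocess

# Simplified stand-in for utilities.run_cmd, as used by record_dmesg.
# Reading dmesg inside a container assumes the SYSLOG capability was granted.
def run_cmd(cmd: str) -> str:
    return subprocess.run(cmd.split(), capture_output=True, text=True).stdout

print(run_cmd("dmesg -T")[:200])  # first lines, with human-readable timestamps
```
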
Will check {attempts} times") - cmd = f"hl-smi -n link -i {self.pci_address}" + all_ports = self.internal_ports + self.external_ports + all_ports_txt = ",".join(all_ports) + + cmd = f"hl-smi -n link -i {self.pci_address} -P {all_ports_txt}" down_links = set() for a in range(attempts): @@ -179,30 +242,21 @@ def check_link_state(self, attempts=10, sleep_sec=0.5): return self.down_links - def check_port_direction(self): - self.logger.debug(f"Checking {self.pci_address} Port Directions") + def check_port_type(self): + self.logger.debug(f"Checking {self.pci_address} Port Types (Internal|External)") - incorrect_ports_direction = list() cmd = f"hl-smi -n ports -i {self.pci_address}" output = run_cmd(cmd) + output_list = output.strip().split("\n") - ports_direction = output.strip().split("\n") - if ports_direction[-1] == "": - ports_direction.pop() + for output in output_list: + port_txt, port_type = output.split(":") + port = port_txt.split(" ")[1] - for i, direction in enumerate(ports_direction): - if i in self.external_ports: - if "internal" in direction: - incorrect_ports_direction.append(i) - self.is_infected = True + if "external" in port_type: + self.external_ports.append(port) else: - if "external" in direction: - incorrect_ports_direction.append(i) - self.is_infected = True - - self.incorrect_ports_direction = incorrect_ports_direction - - return incorrect_ports_direction + self.internal_ports.append(port) def check_device_acquire_fail(self): self.logger.debug(f"Checking {self.pci_address} for Device Acquire Issues") diff --git a/utils/intel_gaudi_health_screen/README.md b/utils/intel_gaudi_health_screen/README.md index 711d415..af4ad4e 100644 --- a/utils/intel_gaudi_health_screen/README.md +++ b/utils/intel_gaudi_health_screen/README.md @@ -1,4 +1,4 @@ -# Intel Gaudi Health Screen 2.1.1 +# Intel Gaudi Health Screen 2.2.0 A large scale Intel Gaudi cluster contains a lot of moving parts. To ensure distributed training proceeds smoothly, it is recommended to check the cluster network health. Troubleshooting issues on a large cluster can be a tedious act. 
diff --git a/utils/intel_gaudi_health_screen/README.md b/utils/intel_gaudi_health_screen/README.md
index 711d415..af4ad4e 100644
--- a/utils/intel_gaudi_health_screen/README.md
+++ b/utils/intel_gaudi_health_screen/README.md
@@ -1,4 +1,4 @@
-# Intel Gaudi Health Screen 2.1.1
+# Intel Gaudi Health Screen 2.2.0
 
 A large scale Intel Gaudi cluster contains a lot of moving parts. To ensure distributed training proceeds smoothly, it is recommended to check the cluster network health. Troubleshooting issues on a large cluster can be a tedious act. To simplify the debugging process the
@@ -233,7 +233,7 @@ system-info:
   tcp-interface: "10.3.124.0/24"
 
 # Image to run Intel Gaudi Health Screen
-image: "vault.habana.ai/gaudi-docker/1.16.0/ubuntu22.04/habanalabs/pytorch-installer-2.2.2:latest"
+image: "vault.habana.ai/gaudi-docker/1.17.0/ubuntu22.04/habanalabs/pytorch-installer-2.3.1:latest"
 
 # Node Label used to identify a Intel Gaudi Node
 gaudi-node-label: "brightcomputing.com/node-category=gaudi"
@@ -260,16 +260,16 @@ Before running the screening test, you need to generate the ssh key used for pas
 
 ``` bash
 # Keys to setup initial bare-metal passwordless ssh connection between systems
-ssh-keygen -t rsa -f ssh/ighs_rsa
+ssh-keygen -t rsa -f ssh/ighs_rsa; chmod 600 ssh/ighs_rsa; chmod 644 ssh/ighs_rsa.pub;
 
 # Keys to setup containers passwordless ssh connection
-ssh-keygen -t rsa -f template/bare-metal/ssh/id_rsa
+ssh-keygen -t rsa -f template/bare-metal/ssh/id_rsa; chmod 600 template/bare-metal/ssh/id_rsa; chmod 644 template/bare-metal/ssh/id_rsa.pub;
 
-cat template/bare-metal/ssh/id_rsa.pub > template/bare-metal/sshauthorized_keys
+cat template/bare-metal/ssh/id_rsa.pub > template/bare-metal/ssh/authorized_keys;
 ```
 
 ## Recovery Steps
diff --git a/utils/intel_gaudi_health_screen/config.yaml b/utils/intel_gaudi_health_screen/config.yaml
index 794288c..cb88c01 100644
--- a/utils/intel_gaudi_health_screen/config.yaml
+++ b/utils/intel_gaudi_health_screen/config.yaml
@@ -12,7 +12,7 @@ system-info:
   tcp-interface: "10.3.124.0/24"
 
 # Image to run Intel Gaudi Health Screen
-image: "vault.habana.ai/gaudi-docker/1.16.2/ubuntu22.04/habanalabs/pytorch-installer-2.2.2:latest"
+image: "vault.habana.ai/gaudi-docker/1.17.0/ubuntu22.04/habanalabs/pytorch-installer-2.3.1:latest"
 
 # Node Label used to identify a Intel Gaudi Node
 gaudi-node-label: "ighs_label=gaudi"
diff --git a/utils/intel_gaudi_health_screen/hccl_demo_helper.py b/utils/intel_gaudi_health_screen/hccl_demo_helper.py
index 40b676d..d0d492c 100644
--- a/utils/intel_gaudi_health_screen/hccl_demo_helper.py
+++ b/utils/intel_gaudi_health_screen/hccl_demo_helper.py
@@ -15,99 +15,117 @@ import logging
 
 _logger = logging.getLogger("health_screener")
 
-def find_groups(nodes_to_test, groups_tracker):
+def find_groups(healthy_nodes, watch_nodes, groups_tracker):
     """ Find a list of node groups to run hccl_demo all reduce test
 
     Args:
-        nodes_to_test ([str]): Nodes list used to create a group of nodes for hccl_demo
+        healthy_nodes ([str]): Nodes that previously passed a pair testing of hccl_demo
+        watch_nodes ([str]): Nodes that haven't had a passing round of hccl_demo
         groups_tracker ([str]): History of used groups. A group has to be unique
 
     Returns:
         ([str],[str]): Unique list of groups of nodes, History of used groups
     """
-    random.shuffle(nodes_to_test)
+    random.shuffle(healthy_nodes)
+    random.shuffle(watch_nodes)
 
     found_unique = True
-    num_nodes = len(nodes_to_test)
+    num_nodes = len(healthy_nodes) + len(watch_nodes)
     node_groups = list()
     max_num_groups = num_nodes // 2
     max_combinations = (math.factorial(num_nodes)) / (math.factorial(num_nodes-2) * 2)
-    _logger.debug(f"nodes_to_test {len(nodes_to_test)}: {nodes_to_test}")
+    max_attempts = 10
 
-    def add_unique_group_id(interval=2):
-        nonlocal node_groups, nodes_to_test
-        i = 1
-        max_attempts = 10
-        found_unique = False
+    if num_nodes == 1:
+        _logger.warn("Need more than 1 Node to test pair all_reduce")
+        return node_groups, groups_tracker
+
+    while len(node_groups) < max_num_groups and found_unique:
+        i = 0
+        h_i, w_i = 0, 0
 
         if len(groups_tracker) >= max_combinations:
             _logger.info(f"Reached maximum combinations {max_combinations} for {num_nodes} Nodes")
-            return found_unique
+            break
 
-        node_group, group_id = find_group_id(nodes_to_test, i, interval=interval)
+        node_group, group_id, (h_i, w_i) = find_group_id(healthy_nodes, watch_nodes, h_i, w_i)
+
+        if len(node_group) < 2 or node_group[0] == node_group[1]:
+            _logger.info(f"Could not form a valid node_group {node_group}. Exiting group id search")
+            found_unique = False
+            break
 
         while group_id in groups_tracker:
             if i > max_attempts:
                 _logger.warn(f"Max attempt {max_attempts} reached for finding unique pair combination.")
-                return found_unique
+                found_unique = False
+                break
 
-            node_group, group_id = find_group_id(nodes_to_test, i, interval=interval)
-            if group_id == "":
-                return found_unique
+            node_group, group_id, (h_i, w_i) = find_group_id(healthy_nodes, watch_nodes, h_i, w_i)
+            if group_id == "" or node_group[0] == node_group[1]:
+                found_unique = False
+                break
 
             i += 1
 
-        found_unique = True
-        groups_tracker.append(group_id)
-        node_groups.append(node_group)
-
-        for n in node_group:
-            nodes_to_test.remove(n)
-
-        return found_unique
+        if found_unique:
+            groups_tracker.append(group_id)
+            node_groups.append(node_group)
+            for n in node_group:
+                if n in healthy_nodes:
+                    healthy_nodes.remove(n)
+                if n in watch_nodes:
+                    watch_nodes.remove(n)
 
-    if num_nodes == 1:
-        _logger.warn(f"Need more than 1 Node to test all_reduce")
-        return False
-
-    if num_nodes % 2 != 0:
-        # Ensures that every node has a group to test.
-        found_unique = add_unique_group_id(interval=3)
-
-    while len(node_groups) < max_num_groups and found_unique:
-        found_unique = add_unique_group_id()
-
-    if not found_unique:
-        _logger.debug(f"Finished searching for Unique pair combinations")
+        if len(watch_nodes) == 0:
             break
 
     return node_groups, groups_tracker
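
Since this grouping strategy is the heart of Level 2 screening, a quick illustration of how one round consumes `find_groups` (node names are hypothetical, and actual pairings vary because both lists are shuffled):

```python
# Sketch of one Level-2 round driving find_groups (hypothetical node names).
healthy = ["node-a", "node-b"]   # verified in an earlier round
watch = ["node-c", "node-d"]     # still waiting for a passing pair
tracker = []                     # group ids already attempted across rounds

node_groups, tracker = find_groups(healthy, watch, tracker)
# One possible result:
#   node_groups == [["node-a", "node-c"], ["node-b", "node-d"]]
#   tracker == ["node-a-node-c", "node-b-node-d"]
# The "-".join group ids keep any pair from being scheduled twice.
```
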
-def find_group_id(nodes_to_test, start, interval=2):
+def find_group_id(healthy_nodes, watch_nodes, h_i=0, w_i=0):
     """ Finds a group of nodes and combines to form a group id
 
     Args:
-        nodes_to_test ([str]): Viable node list
-        start (int): Index of next potential node id
-        interval (int, optional): The size of the group id. Most common is pairs of nodes. Defaults to 2.
+        healthy_nodes ([str]): Nodes that previously passed a pair testing of hccl_demo
+        watch_nodes ([str]): Nodes that haven't had a passing round of hccl_demo
+        h_i (int): Index of next potential node id for healthy_nodes
+        w_i (int): Index of next potential node id for watch_nodes
 
     Returns:
         ([str], str): Potential nodes and their group id
     """
-    group_id = ""
-
-    if len(nodes_to_test) == 0:
-        return [], group_id
-
-    node_group = [nodes_to_test[0]]
-    node_group.extend(nodes_to_test[start:start+interval-1])
+    group_id = ""
+    node_group = []
+    max_attempt = 10
+
+    # The goal is to test watch_nodes, pairing each with a healthy_node if available
+    if len(watch_nodes) == 0 or (len(watch_nodes) == 1 and len(healthy_nodes) == 0):
+        return node_group, group_id, (h_i, w_i)
+
+    for i in range(max_attempt):
+        if len(watch_nodes) and w_i < len(watch_nodes):
+            node_group.append(watch_nodes[w_i])
+            w_i += 1
+        if len(healthy_nodes) and h_i < len(healthy_nodes):
+            node_group.append(healthy_nodes[h_i])
+            h_i += 1
+
+        if h_i >= len(healthy_nodes):
+            random.shuffle(healthy_nodes)
+            h_i = 0
+        if w_i >= len(watch_nodes):
+            random.shuffle(watch_nodes)
+            w_i = 0
+
+        if len(node_group) >= 2:
+            break
 
     if len(node_group) > 1:
         node_group.sort()
-        group_id = "".join(node_group)
+        group_id = "-".join(node_group)
 
-    return node_group, group_id
+    return node_group, group_id, (h_i, w_i)
 
 def gather_hccl_logs(job_path, round, log_dir, health_report):
     """ Retrieve hccl_demo log files based on the job yamls executed
diff --git a/utils/intel_gaudi_health_screen/screen.py b/utils/intel_gaudi_health_screen/screen.py
index 85c8993..3f644e1 100644
--- a/utils/intel_gaudi_health_screen/screen.py
+++ b/utils/intel_gaudi_health_screen/screen.py
@@ -10,7 +10,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import os, datetime, yaml, sys, time +import os, datetime, yaml, sys, time, json import argparse import logging @@ -24,45 +24,40 @@ _logger = None -def monitor_ighs_status(system_mode, level, nodes, job_path="tmp/jobs", log_dir="", timeout_s=240, round=0, monitor=True): +def monitor_ighs_status(system_mode, level, nodes, timeout_s=240, round=0): sleep_time_s = 2 max_attempts = (timeout_s // sleep_time_s) + min(timeout_s % sleep_time_s, 1) current_run_status = dict() + lvl_check_msg = f"Checking IGHS Level {level}" - if len(nodes.healthy_nodes) > 0: - num_nodes = len(nodes.healthy_nodes) - else: - num_nodes = len(nodes.all_nodes) + num_nodes = len(nodes.all_nodes) + if level == 2: + num_nodes = len(nodes.current_node_groups) * 2 + lvl_check_msg += f" Round {round}" - _logger.info(f"Checking IGHS Level {level} Status") + _logger.info(f"{lvl_check_msg} Status") - if monitor: - for attempt in range(max_attempts): - num_found_nodes = system_mode.check_screen_complete(current_run_status=current_run_status, health_report=nodes.health_report, level=level, round=round) + for attempt in range(max_attempts): + num_found_nodes = system_mode.check_screen_complete(current_run_status=current_run_status, health_report=nodes.health_report, level=level, round=round) - if num_found_nodes == num_nodes: - _logger.info(f"Found {num_found_nodes}/{num_nodes} Nodes during Health Screen") - break - - _logger.info(f"Attempt {attempt}/{max_attempts}: Found {num_found_nodes}/{num_nodes} Nodes - Will Check again in {sleep_time_s} seconds") - time.sleep(sleep_time_s) - - num_found_nodes = system_mode.check_screen_complete(current_run_status=current_run_status, health_report=nodes.health_report, level=level, round=round, final_check=True) - else: - hosts = nodes.all_nodes - if len(nodes.launcher_nodes) > 0: - hosts = nodes.launcher_nodes + if num_found_nodes == num_nodes: + _logger.info(f"Found {num_found_nodes}/{num_nodes} Nodes during Health Screen") + break - nodes.health_report.gather_health_report(level, remote_path="/tmp/ighs", hosts=hosts) - nodes.health_report.consolidate_health_report(level=level, report_dir=f"{log_dir}") + _logger.info(f"Attempt {attempt}/{max_attempts}: Found {num_found_nodes}/{num_nodes} Nodes - Will Check again in {sleep_time_s} seconds") + time.sleep(sleep_time_s) + num_found_nodes = system_mode.check_screen_complete(current_run_status=current_run_status, health_report=nodes.health_report, level=level, round=round, final_check=True) if level == 1: detected_nodes, infected_nodes, missing_nodes = nodes.health_report.extract_node_info() missing_nodes.update(set(nodes.all_nodes).difference(detected_nodes)) + undetected_nodes = [] nodes.health_report.update_health_report(detected_nodes=detected_nodes, infected_nodes=infected_nodes, missing_nodes=missing_nodes) elif level == 2: detected_nodes, infected_nodes, missing_nodes = nodes.health_report.extract_hccl_demo_info() + undetected_nodes = set(nodes.all_nodes).difference(detected_nodes) + nodes.health_report.update_health_report(detected_nodes=detected_nodes, infected_nodes=infected_nodes, missing_nodes=missing_nodes) detected_nodes_l1, infected_nodes_l1, missing_nodes = nodes.health_report.extract_node_info() @@ -74,13 +69,17 @@ def monitor_ighs_status(system_mode, level, nodes, job_path="tmp/jobs", log_dir= healthy_nodes = sorted(list(healthy_nodes)) missing_nodes = sorted(list(missing_nodes)) infected_nodes = sorted(list(infected_nodes)) + nodes.update_node_status(healthy_nodes, infected_nodes, missing_nodes, 
undetected_nodes=undetected_nodes) + + watch_nodes = sorted(list(nodes.watch_nodes)) detected_nodes = sorted(list(detected_nodes)) if level == 1: - nodes.healthy_nodes = healthy_nodes + nodes.healthy_nodes = set(healthy_nodes) _logger.info(f"Infected {len(infected_nodes)} Node: {infected_nodes}") _logger.info(f"Missing {len(missing_nodes)} Node: {missing_nodes}") + _logger.info(f"Unverified {len(watch_nodes)} Node: {watch_nodes}") _logger.info(f"Healthy {len(healthy_nodes)} Node: {healthy_nodes}") _logger.info(f"Detected {len(detected_nodes)} Node: {detected_nodes}") @@ -120,7 +119,6 @@ def main(args): hostfile=config_data["system-info"]["hostfile"], namespace=config_data["system-info"]["namespace"], log_dir=args.logs_dir) - monitor = True elif config_data["system-info"]["type"] == "bare-metal": system_mode = BareMetalUtils(image=config_data["image"], @@ -128,7 +126,6 @@ def main(args): ssh_path=config_data["system-info"]["ssh-path"], tcp_interface=config_data["system-info"]["tcp-interface"], log_dir=args.logs_dir) - monitor = False else: _logger.error(f"system_mode: {system_mode} in {args.config} is not set correctly. system_mode has to be set to k8s or bare-metal") sys.exit(1) @@ -146,54 +143,82 @@ def main(args): if args.screen: start_time = datetime.datetime.now() - intel_gaudi_nodes = IGNodes(health_report=health_report) - intel_gaudi_nodes.all_nodes = system_mode.collect_nodes(gaudi_node_label=config_data["gaudi-node-label"]) + intel_gaudi_nodes = IGNodes(health_report=health_report) + intel_gaudi_nodes.all_nodes = system_mode.collect_nodes(gaudi_node_label=config_data["gaudi-node-label"]) + intel_gaudi_nodes.watch_nodes = set(intel_gaudi_nodes.all_nodes) + healthy_nodes, infected_nodes, missing_nodes = list(), list(), list() + occupied_nodes, missing_cards_nodes, misc_nodes = list(), list(), list() if config_data["level-1"]["run"]: _logger.info("Running Level 1 Checks: Card Diagnostics") if not os.path.exists(f"{health_report.f_dir}/L1"): os.makedirs(f"{health_report.f_dir}/L1") - system_mode.initialize_node_jobs(level=1, + nodes_initialized = system_mode.initialize_node_jobs(level=1, nodes=intel_gaudi_nodes, job_base_path=job_path) - healthy_nodes, infected_nodes, missing_nodes = monitor_ighs_status(system_mode=system_mode, - level=1, - nodes=intel_gaudi_nodes, - job_path=job_path, - log_dir=args.logs_dir, - timeout_s=config_data["level-1"]["timeout_s"], - monitor=monitor) - system_mode.diagnose_unhealthy_nodes(infected_nodes, missing_nodes) - - system_mode.clear_ighs_pods() + if nodes_initialized: + healthy_nodes, infected_nodes, missing_nodes = monitor_ighs_status(system_mode=system_mode, + level=1, + nodes=intel_gaudi_nodes, + timeout_s=config_data["level-1"]["timeout_s"]) + occupied_nodes, missing_cards_nodes, misc_nodes = system_mode.diagnose_unhealthy_nodes(infected_nodes, missing_nodes) + system_mode.clear_ighs_pods() + + summary = { + "level": 1, + "infected": infected_nodes, + "missing": missing_nodes, + "occupied": occupied_nodes, + "missing_cards": missing_cards_nodes, + "untested": misc_nodes, + "healthy": healthy_nodes + } + + with open(f"{args.logs_dir}/ighs_L1_summary.json", 'w', encoding ='utf8') as f: + json.dump(summary, f, indent=4) if config_data["level-2"]["run"]: _logger.info("Running Level 2 Checks: Pair HCCL_DEMO All Reduce") if not os.path.exists(f"{health_report.f_dir}/L2"): os.makedirs(f"{health_report.f_dir}/L2") + intel_gaudi_nodes.healthy_nodes = set() + intel_gaudi_nodes.watch_nodes = set(intel_gaudi_nodes.all_nodes) + for i in 
range(config_data["level-2"]["num-rounds"]): - system_mode.initialize_node_jobs(level=2, + nodes_initialized = system_mode.initialize_node_jobs(level=2, nodes=intel_gaudi_nodes, job_base_path=job_path, round=i) + if not nodes_initialized: + _logger.info(f"Round {i}/{config_data['level-2']['num-rounds']}: No other Nodes to screen. Exit screening early.") + break + healthy_nodes, infected_nodes, missing_nodes = monitor_ighs_status(system_mode=system_mode, level=2, nodes=intel_gaudi_nodes, - job_path=job_path, - log_dir=args.logs_dir, timeout_s=config_data["level-2"]["timeout_s"], - round=i, - monitor=monitor) - system_mode.diagnose_unhealthy_nodes(infected_nodes, missing_nodes) - + round=i) + occupied_nodes, missing_cards_nodes, misc_nodes = system_mode.diagnose_unhealthy_nodes(infected_nodes, missing_nodes) system_mode.clear_ighs_pods(job_type="mpijobs") - if len(infected_nodes) == 0 and len(missing_nodes) == 0: - _logger.info(f"Round {i}/{config_data['level-2']['num-rounds']}: No Infected or Missing Nodes found. Exit screening early.") + if len(intel_gaudi_nodes.watch_nodes) == 0: + _logger.info(f"Round {i}/{config_data['level-2']['num-rounds']}: No other Nodes to screen. Exit screening early.") break + summary = { + "level": 2, + "infected": infected_nodes, + "missing": missing_nodes, + "occupied": occupied_nodes, + "missing_cards": missing_cards_nodes, + "untested": misc_nodes, + "healthy": healthy_nodes + } + + with open(f"{args.logs_dir}/ighs_L2_summary.json", 'w', encoding ='utf8') as f: + json.dump(summary, f, indent=4) end_time = datetime.datetime.now() diff_time = (end_time - start_time) diff --git a/utils/intel_gaudi_health_screen/system_utils.py b/utils/intel_gaudi_health_screen/system_utils.py index fc1c852..99d0671 100644 --- a/utils/intel_gaudi_health_screen/system_utils.py +++ b/utils/intel_gaudi_health_screen/system_utils.py @@ -10,7 +10,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import time, os, shutil, yaml, glob, json +import time, os, shutil, yaml, glob, json, re import multiprocessing import logging @@ -79,7 +79,8 @@ def initialize_node_jobs(self, level, nodes, job_base_path="tmp/jobs", round=0): - update_val = { + nodes_initialized = False + update_val = { "metadata-name": "", "round": round, "container-image": self.image, @@ -100,13 +101,17 @@ def initialize_node_jobs(self, level, yaml_type = "mpijob" metadata_app = "ighs-hccl" - if len(nodes.healthy_nodes) > 0: - nodes_to_test = nodes.healthy_nodes - else: - nodes_to_test = nodes.all_nodes.copy() + healthy_nodes = list(nodes.healthy_nodes.copy()) + watch_nodes = list(nodes.watch_nodes.copy()) + + node_groups, nodes.groups_tracker = find_groups(healthy_nodes, watch_nodes, nodes.groups_tracker) + nodes.current_node_groups = node_groups + job_path = f"{job_base_path}/L2/r{round}" + + if len(node_groups) == 0 : + _logger.warn(f"No Node Groups to test found during initialization") + return nodes_initialized - node_groups, nodes.groups_tracker = find_groups(nodes_to_test, nodes.groups_tracker) - job_path = f"{job_base_path}/L2/r{round}" for i, node_group in enumerate(node_groups): if level == 1: @@ -135,6 +140,10 @@ def initialize_node_jobs(self, level, p = multiprocessing.Process(target=self.cp_ighs, args=(self.namespace,cwd, metadata_app)) p.start() + nodes_initialized = True + + return nodes_initialized + def cp_ighs(self, namespace, cwd, metadata_app): pods_done = dict() cmd = f"kubectl get pods -n {namespace} -l app={metadata_app} -o=custom-columns='NAME:.metadata.name,STATUS:.status.phase' --no-headers" @@ -238,10 +247,10 @@ def clear_ighs_pods(self, job_type="jobs"): if len(output) == 0: break - _logger.info(f"Attempt {attempts}: Pods are still up. Will wait 10 seconds to check again") - time.sleep(10) + _logger.info(f"Attempt {attempts}: Pods are still up. 
Will wait 5 seconds to check again") + time.sleep(5) - def check_screen_complete(self, current_run_status, health_report, level, round=0, final_check=False): + def check_screen_complete(self, current_run_status, health_report, level, round=0, job_path="tmp/jobs", final_check=False): if level == 1: metadata_app = "ighs" log_dir = f"{self.log_dir}/L{level}" @@ -310,7 +319,7 @@ def check_screen_complete(self, current_run_status, health_report, level, round= elif level == 2: num_nodes = 0 - # L2 runs MPIJobs that containers 2 or 3 Nodes + # L2 runs MPIJobs that contains 2 nodes for k,v in current_run_status.items(): num_nodes += v[1] @@ -356,6 +365,7 @@ def diagnose_unhealthy_nodes(self, infected_nodes, missing_nodes): if(len(misc_list)): _logger.info(f"{len(misc_list)} Unaccounted Nodes: {misc_list}") + return in_use_list, missing_cards_list, misc_list class BareMetalUtils(SystemUtils): @@ -388,6 +398,11 @@ def initialize_ssh(self): _logger.debug("Activating ssh-agent") cmd = f"ssh-agent -s" output = run_cmd(cmd) + _OUTPUT_PATTERN = re.compile(r'SSH_AUTH_SOCK=(?P[^;]+).*SSH_AGENT_PID=(?P\d+)', re.MULTILINE | re.DOTALL) + match = _OUTPUT_PATTERN.search(output) + data = match.groupdict() + os.environ['SSH_AUTH_SOCK'] = data['SSH_AUTH_SOCK'] + os.environ['SSH_AGENT_PID'] = data['SSH_AGENT_PID'] _logger.debug("Adding ighs private key to ssh-agent") cmd = f"ssh-add {self.ssh_path}/ighs_rsa" @@ -395,15 +410,16 @@ def initialize_ssh(self): def initialize_system(self): + _logger.info(f"Setting up ssh connection for hosts: {self.hosts}") + for h in self.hosts: + cmd = f"ssh-copy-id -o StrictHostKeyChecking=no -i {self.ssh_path}/ighs_rsa.pub {os.environ['USER']}@{h}" + output = run_cmd(cmd,verbose=True) + self.clear_ighs_pods() self.clear_ighs_pods(job_type="mpijobs") self.clear_jobs() self.clear_remote_jobs() - _logger.info(f"Setting up ssh connection for hosts: {self.hosts}") - for h in self.hosts: - cmd = f"ssh-copy-id -o StrictHostKeyChecking=no -i {self.ssh_path}/ighs_rsa.pub {os.environ['USER']}@{h}" - output = run_cmd(cmd) self.initialize_ssh() copy_files(src="../", dst=f"{self.remote_path}", exclude={"logs", "ssh", "tmp"}, hosts=self.hosts) @@ -418,7 +434,8 @@ def initialize_node_jobs(self, level, nodes, job_base_path="tmp/jobs", round=0): - update_val = { + nodes_initialized = False + update_val = { "metadata-name": "", "round": round, "container-image": self.image, @@ -433,16 +450,19 @@ def initialize_node_jobs(self, level, node_groups = nodes.all_nodes job_path = f"{job_base_path}/L1" elif level == 2: - if len(nodes.healthy_nodes) > 0: - nodes_to_test = [n.replace("ighs-","").replace(":48","") for n in nodes.healthy_nodes] - else: - nodes_to_test = nodes.all_nodes.copy() + healthy_nodes = list(nodes.healthy_nodes.copy()) + watch_nodes = list(nodes.watch_nodes.copy()) - node_groups, nodes.groups_tracker = find_groups(nodes_to_test, nodes.groups_tracker) + node_groups, nodes.groups_tracker = find_groups(healthy_nodes, watch_nodes, nodes.groups_tracker) + nodes.current_node_groups = node_groups job_path = f"{job_base_path}/L2/r{round}" nodes.launcher_nodes = list() nodes.worker_nodes = list() + if len(node_groups) == 0: + _logger.warn(f"No Node Groups to test found during initialization") + return nodes_initialized + self.update_yaml_job(source_file="config.yaml", out_dir="tmp", out_file="config.yaml", yaml_type="config") for i, node_group in enumerate(node_groups): if level == 1: @@ -497,11 +517,16 @@ def initialize_node_jobs(self, level, f"pdsh -w ^{job_base_path}/L2/r{round}/hostfile_workers 
{self.docker_compose_alias} -f {self.remote_path}/jobs/L2/r{round}/intel-gaudi-docker-compose-L2-worker.yaml build", f"pdsh -w ^{job_base_path}/L2/r{round}/hostfile_workers {self.docker_compose_alias} -f {self.remote_path}/jobs/L2/r{round}/intel-gaudi-docker-compose-L2-worker.yaml up -d --remove-orphans", f"pdsh -w ^{job_base_path}/L2/r{round}/hostfile_launchers {self.docker_compose_alias} -f {self.remote_path}/jobs/L2/r{round}/intel-gaudi-docker-compose-L2-launcher.yaml build", - f"pdsh -w ^{job_base_path}/L2/r{round}/hostfile_launchers {self.docker_compose_alias} -f {self.remote_path}/jobs/L2/r{round}/intel-gaudi-docker-compose-L2-launcher.yaml up --remove-orphans" + f"pdsh -w ^{job_base_path}/L2/r{round}/hostfile_launchers {self.docker_compose_alias} -f {self.remote_path}/jobs/L2/r{round}/intel-gaudi-docker-compose-L2-launcher.yaml up -d --remove-orphans" ] for cmd in cmd_list: - output = run_cmd(cmd).strip() + verbose = ("up" in cmd) + output = run_cmd(cmd, verbose=verbose).strip() + + nodes_initialized = True + + return nodes_initialized def update_yaml_job(self, source_file="template/bare-metal/intel-gaudi-docker-compose-L1.yaml", @@ -572,5 +597,95 @@ def clear_remote_jobs(self): cmd = f"{self.pdsh_cmd} rm -R /tmp/ighs/jobs/" output = run_cmd(cmd) + def check_screen_complete(self, current_run_status, health_report, level, round=0, job_path="tmp/jobs", final_check=False): + docker_status_cmd = "ps -a --format json --filter status=exited" + + if level == 1: + log_dir = f"{self.log_dir}/L{level}" + docker_compose_f = f"{self.remote_path}/jobs/L1/intel-gaudi-docker-compose-L1.yaml" + cmd = f"{self.docker_compose_cmd} -f {docker_compose_f} {docker_status_cmd}" + elif level == 2: + log_dir = f"{self.log_dir}/L{level}/r{round}" + docker_compose_f = f"{self.remote_path}/jobs/L2/r{round}/intel-gaudi-docker-compose-L2-launcher.yaml" + cmd = f"pdsh -w ^{job_path}/L2/r{round}/hostfile_launchers {self.docker_compose_alias} -f {docker_compose_f} {docker_status_cmd}" + + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + check_log_cmd = f"{self.docker_compose_alias} -f {docker_compose_f} logs" + + output = run_cmd(cmd).strip() + pods = output.split("\n") + + for p in pods: + if ":" not in p: + continue + + colon_index = p.index(":") + name = p[:colon_index] + data_txt = p[colon_index+1:] + + data = json.loads(data_txt) + + if data["State"] == "exited": + cmd = f"ssh {name} {check_log_cmd}" + output = run_cmd(cmd).strip().split("\n") + + start_analyze = False + for l in output: + if "START of Node Report" in l: + start_analyze = True + continue + elif "END of Node Report" in l: + start_analyze = False + continue + + #### analyze output + if start_analyze: + # Ignore Logger output level + bracket_index = l.index("{") + node_status_txt = l[bracket_index:] + status_dict = json.loads(node_status_txt) + + if not name in current_run_status: + if level == 1: + health_report.write_rows(data=status_dict["cards"], level=level) + current_run_status[name] = True + elif level == 2: + health_report.write_rows(data=[status_dict], level=level) + current_run_status[name] = (True, status_dict["num_nodes"]) + name = f"ighs-hccl-r{status_dict['round']}-{status_dict['group_id']}" + + with open(f"{log_dir}/{name}.json", 'w', encoding ='utf8') as f: + json.dump(status_dict, f, indent=4) + with open(f"{log_dir}/{name}.log", 'w', encoding ='utf8') as f: + f.write('\n'.join(output)) + elif level==2 and final_check: + cmd = f"ssh {name} {check_log_cmd}" + output = run_cmd(cmd).strip().split("\n") + + if not name in 
current_run_status:
+                    hccl_results = hccl_demo_check(job_id=name, health_report=health_report, hccl_log=output, write=False)
+                    f_name = f"ighs-hccl-r{hccl_results['round']}-{hccl_results['group_id']}"
+
+                    with open(f"{log_dir}/{f_name}.json", 'w', encoding='utf8') as f:
+                        json.dump(hccl_results, f, indent=4)
+                    with open(f"{log_dir}/{f_name}.log", 'w', encoding='utf8') as f:
+                        f.write('\n'.join(output))
+
+                    health_report.write_rows(data=[hccl_results], level=level)
+                    current_run_status[name] = (True, hccl_results["num_nodes"])
+
+        if level == 1:
+            num_nodes = len(current_run_status)
+        elif level == 2:
+            num_nodes = 0
+
+            # L2 runs MPIJobs that contain 2 nodes
+            for k,v in current_run_status.items():
+                num_nodes += v[1]
+
+        return num_nodes
+
     def diagnose_unhealthy_nodes(self, infected_nodes, missing_nodes):
-        pass
+        # screen.py unpacks three lists from this call, so return empty ones
+        return list(), list(), list()
diff --git a/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L1.yaml b/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L1.yaml
index fbee7d9..2c028b8 100644
--- a/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L1.yaml
+++ b/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L1.yaml
@@ -1,5 +1,3 @@
-version: '3.3'
-
 services:
   ighs_level1:
     image: ighs_level1
@@ -16,6 +14,7 @@ services:
       - IGHS_LEVEL=1
     cap_add:
       - SYS_NICE
+      - SYSLOG
     ipc: host
     network_mode: host
     working_dir: /tmp/ighs/intel_gaudi_health_screen
@@ -24,5 +23,5 @@ services:
       - /tmp/ighs/intel_gaudi_health_screen:/tmp/ighs/intel_gaudi_health_screen
       - /etc/localtime:/etc/localtime:ro
     command: >
-      bash -c "python screen.py --ighs-check node --logs-dir $${LOG_DIR} --node-name $${MY_NODE_NAME} --node-write-report && \
+      bash -c "python screen.py --ighs-check node --logs-dir $${LOG_DIR} --node-name $${MY_NODE_NAME} && \
               chmod 777 -R $${LOG_DIR}"
diff --git a/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L2-launcher.yaml b/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L2-launcher.yaml
index 454550e..80aee76 100644
--- a/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L2-launcher.yaml
+++ b/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L2-launcher.yaml
@@ -1,5 +1,3 @@
-version: '3.3'
-
 services:
   ighs_level2_launcher:
     image: ighs_level2
@@ -16,6 +14,7 @@ services:
       - IGHS_LEVEL=2
     cap_add:
       - SYS_NICE
+      - SYSLOG
     ipc: host
     network_mode: host
     working_dir: /tmp/ighs/intel_gaudi_health_screen
diff --git a/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L2-worker.yaml b/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L2-worker.yaml
index 8a99927..30ea074 100644
--- a/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L2-worker.yaml
+++ b/utils/intel_gaudi_health_screen/template/bare-metal/intel-gaudi-docker-compose-L2-worker.yaml
@@ -1,5 +1,3 @@
-version: '3.3'
-
 services:
   ighs_level2_worker:
     image: ighs_level2
@@ -16,6 +14,7 @@ services:
       - IGHS_LEVEL=2
     cap_add:
       - SYS_NICE
+      - SYSLOG
     ipc: host
     network_mode: host
     working_dir: /tmp/ighs/intel_gaudi_health_screen
diff --git a/utils/intel_gaudi_health_screen/template/bare-metal/run_hccl_demo.sh b/utils/intel_gaudi_health_screen/template/bare-metal/run_hccl_demo.sh
index 5d51d58..62ffd1a 100755
--- a/utils/intel_gaudi_health_screen/template/bare-metal/run_hccl_demo.sh
+++ b/utils/intel_gaudi_health_screen/template/bare-metal/run_hccl_demo.sh
@@ -12,6 +12,7 
@@ CMD="python ${WORK_DIR}/run_hccl_demo.py \ --test all_reduce \ --loop 1000 \ --size 32m \ +-clean \ -mpi "; mkdir -p $HOME_DIR/$LOG_DIR/L2/$ROUND/; @@ -35,6 +36,6 @@ $CMD \ 2>&1 | ts '[%Y-%m-%d %H:%M:%S]' | tee -a $HOME_DIR/$LOG_DIR/L2/$ROUND/$JOB_ID.log; cd ${HOME_DIR}; -python $HOME_DIR/screen.py --ighs-check hccl-demo --logs-dir $LOG_DIR --job-id $JOB_ID --target-nodes $TARGET_NODES --node-name $MY_NODE_NAME; +python $HOME_DIR/screen.py --ighs-check hccl-demo --logs-dir $LOG_DIR --job-id $JOB_ID --target-nodes $TARGET_NODES --round $ROUND; chmod 777 -R $HOME_DIR/$LOG_DIR diff --git a/utils/intel_gaudi_health_screen/template/k8s/intel-gaudi-health-screen-L1.yaml b/utils/intel_gaudi_health_screen/template/k8s/intel-gaudi-health-screen-L1.yaml index b4a17c1..2dab422 100644 --- a/utils/intel_gaudi_health_screen/template/k8s/intel-gaudi-health-screen-L1.yaml +++ b/utils/intel_gaudi_health_screen/template/k8s/intel-gaudi-health-screen-L1.yaml @@ -46,6 +46,10 @@ spec: volumeMounts: - name: mydir mountPath: /workdir + securityContext: + capabilities: + add: + - SYSLOG env: - name: IGHS_LEVEL value: "1" diff --git a/utils/intel_gaudi_health_screen/template/k8s/intel-gaudi-health-screen-L2_hccl-demo.yaml b/utils/intel_gaudi_health_screen/template/k8s/intel-gaudi-health-screen-L2_hccl-demo.yaml index 0a6b83f..6319743 100644 --- a/utils/intel_gaudi_health_screen/template/k8s/intel-gaudi-health-screen-L2_hccl-demo.yaml +++ b/utils/intel_gaudi_health_screen/template/k8s/intel-gaudi-health-screen-L2_hccl-demo.yaml @@ -28,6 +28,10 @@ spec: volumeMounts: - name: mydir mountPath: /workdir + securityContext: + capabilities: + add: + - SYSLOG env: - name: JOB_ID valueFrom: @@ -116,6 +120,10 @@ spec: - image: template-container-image name: ighs-worker imagePullPolicy: IfNotPresent + securityContext: + capabilities: + add: + - SYSLOG resources: limits: habana.ai/gaudi: 8 diff --git a/utils/intel_gaudi_health_screen/utilities.py b/utils/intel_gaudi_health_screen/utilities.py index a782d14..47f5458 100644 --- a/utils/intel_gaudi_health_screen/utilities.py +++ b/utils/intel_gaudi_health_screen/utilities.py @@ -57,7 +57,7 @@ def create_logger(logger_name, logger_file_name, f_path="", level=logging.INFO, file_path = f"{f_path}/{logger_file_name}.log" if f_path != "" else f"logs/{date_format}/{date_format}_{time_format}/{logger_file_name}.log" d_path = os.path.dirname(file_path) - print(f"d_path: {d_path} file_path: {file_path}") + _logger.debug(f"d_path: {d_path} file_path: {file_path}") if(not os.path.exists(d_path)): os.makedirs(d_path) diff --git a/utils/intel_gaudi_health_screen/version.txt b/utils/intel_gaudi_health_screen/version.txt index 7c32728..e3a4f19 100644 --- a/utils/intel_gaudi_health_screen/version.txt +++ b/utils/intel_gaudi_health_screen/version.txt @@ -1 +1 @@ -2.1.1 \ No newline at end of file +2.2.0 \ No newline at end of file