A few improvements to bazel ci scripts (#2076)
`aggregate_incompatible_flags_test_result.py`:

- Stripped out the timestamps that BuildKite adds to job logs (a minimal
stripping sketch follows this list).
- Added support for collecting incompatible flag test results for
https://buildkite.com/bazel/bcr-bazel-compatibility-test
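
As an illustration of the timestamp stripping, here is a minimal, standalone sketch that uses the same substitution the script applies to each log line; the sample escape sequence (`\x1b_bk;t=<ms>\x07`) and the log line contents are assumptions for illustration, not taken from a real log:

```python
import re

# Hypothetical BuildKite log line: a timestamp marker wrapped in escape
# characters, followed by the text the aggregation script actually parses.
raw_line = "\x1b_bk;t=1729700000000\x07failing: --incompatible_foo (rules_cc@0.0.9)"

# Same substitution the script applies to each line before parsing it.
clean = re.sub(r"\x1b.*?\x07", "", raw_line.strip())
print(clean)  # failing: --incompatible_foo (rules_cc@0.0.9)
```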

`bazelci.py`:
- Added support for overriding the Bazel version in the task config before
task expansion (a worked example is sketched below).
- Added support for `concurrency` and `concurrency_group` to limit CI
resource usage (see the step sketch below).
- Added retries to avoid 429 Too Many Requests errors while fetching a
large number of Buildkite job logs (see the backoff sketch below).
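
For the Bazel version override, the new `--overwrite_bazel_version` flag feeds `maybe_overwrite_bazel_version`, which rewrites each task before expansion and remembers the original version under `old_bazel` so the runner can warn about the override. A worked example, using the function body introduced in this commit with an assumed, illustrative config shape:

```python
# Function body as introduced in this commit; the config dict is illustrative.
def maybe_overwrite_bazel_version(bazel_version, config):
    if not bazel_version:
        return
    for task in config.get("tasks", {}):
        # Remember the configured version so the runner can warn about the override.
        config["tasks"][task]["old_bazel"] = config["tasks"][task].get("bazel")
        config["tasks"][task]["bazel"] = bazel_version

config = {"tasks": {"ubuntu2004": {"bazel": "7.3.1", "build_targets": ["//..."]}}}
maybe_overwrite_bazel_version("8.0.0rc2", config)
# config["tasks"]["ubuntu2004"] now holds
# {"bazel": "8.0.0rc2", "old_bazel": "7.3.1", "build_targets": ["//..."]}
```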
meteorcloudy authored Oct 23, 2024
1 parent 7fe019f commit 2d33fc6
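
The concurrency settings are attached to generated Buildkite steps only when both values are present, as `create_step` does in the diff below. A simplified sketch, where the step fields and the example values are illustrative rather than the real step layout:

```python
# Simplified stand-in for create_step; only the concurrency handling mirrors the diff.
def create_step_sketch(label, command, concurrency=None, concurrency_group=None):
    step = {"label": label, "command": command}
    if concurrency and concurrency_group:
        step["concurrency"] = concurrency              # max jobs running at once
        step["concurrency_group"] = concurrency_group  # shared queue these jobs wait in
    return step

step = create_step_sketch(
    ":bazel: BCR compatibility test",
    "python3 bazelci.py runner --task=ubuntu2004",
    concurrency=10,
    concurrency_group="bcr-bazel-compatibility-test",
)
```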
Showing 2 changed files with 137 additions and 57 deletions.
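
The 429 handling in `_open_url` sleeps for the `RateLimit-Reset` value when the server provides it and otherwise backs off exponentially. A standalone sketch of the same strategy; the function name and plain-URL signature are simplifications for illustration:

```python
import time
import urllib.error
import urllib.request

def fetch_with_backoff(url, retries=5):
    for attempt in range(retries):
        try:
            return urllib.request.urlopen(url).read().decode("utf-8", "ignore")
        except urllib.error.HTTPError as ex:
            if ex.code != 429:  # only rate-limit errors are retried
                raise
            reset = ex.headers.get("RateLimit-Reset")
            # Sleep for the server-suggested delay, or back off exponentially.
            time.sleep(int(reset) if reset else 2 ** attempt)
    raise RuntimeError(f"Gave up fetching {url} after {retries} retries")
```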
124 changes: 85 additions & 39 deletions buildkite/aggregate_incompatible_flags_test_result.py
@@ -31,6 +31,26 @@

FLAG_LINE_PATTERN = re.compile(r"\s*(?P<flag>--\S+)\s*")

MODULE_VERSION_PATTERN = re.compile(r'(?P<module_version>[a-z](?:[a-z0-9._-]*[a-z0-9])?@[^\s]+)')

BAZEL_TEAM_OWNED_MODULES = frozenset([
"bazel-skylib",
"rules_android",
"rules_android_ndk",
"rules_cc",
"rules_java",
"rules_license",
"rules_pkg",
"rules_platform",
"rules_shell",
"rules_testing",
])

PROJECT = "module" if PIPELINE == "bcr-bazel-compatibility-test" else "project"

MAX_LOG_FETCHER_THREADS = 30
LOG_FETCHER_SEMAPHORE = threading.Semaphore(MAX_LOG_FETCHER_THREADS)

class LogFetcher(threading.Thread):
def __init__(self, job, client):
threading.Thread.__init__(self)
@@ -39,7 +59,8 @@ def __init__(self, job, client):
self.log = None

def run(self):
self.log = self.client.get_build_log(self.job)
with LOG_FETCHER_SEMAPHORE:
self.log = self.client.get_build_log(self.job)


def process_build_log(failed_jobs_per_flag, already_failing_jobs, log, job):
@@ -59,6 +80,10 @@ def handle_failing_flags(line):
if index_success == -1 or index_failure == -1:
raise bazelci.BuildkiteException("Cannot recognize log of " + job["web_url"])
for line in log[index_failure:].split("\n"):
# Strip out BuildKite timestamp prefix
line = re.sub(r'\x1b.*?\x07', '', line.strip())
if not line:
break
handle_failing_flags(line)
log = log[0 : log.rfind("+++ Result")]

@@ -67,6 +92,12 @@ def handle_failing_flags(line):
already_failing_jobs.append(job)


def extract_module_version(line):
match = MODULE_VERSION_PATTERN.search(line)
if match:
return match.group("module_version")


def extract_flag(line):
match = FLAG_LINE_PATTERN.match(line)
if match:
@@ -77,19 +108,28 @@ def get_html_link_text(content, link):
return f'<a href="{link}" target="_blank">{content}</a>'


def is_project_owned_by_bazel_team(project):
if bazelci.is_downstream_pipeline() and project in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[project].get(
"owned_by_bazel"
):
# Check the downstream projects definition.
return True
elif project.split("@")[0] in BAZEL_TEAM_OWNED_MODULES:
# Parse the module name and check if it's bazel team owned.
return True
return False

# Check if any of the given jobs needs to be migrated by the Bazel team
def needs_bazel_team_migrate(jobs):
for job in jobs:
pipeline, _ = get_pipeline_and_platform(job)
if pipeline in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[pipeline].get(
"owned_by_bazel"
):
project = get_project_name(job)
if is_project_owned_by_bazel_team(project):
return True
return False


def print_flags_ready_to_flip(failed_jobs_per_flag, incompatible_flags):
info_text1 = ["#### The following flags didn't break any passing projects"]
info_text1 = [f"#### The following flags didn't break any passing {PROJECT}s"]
for flag in sorted(list(incompatible_flags.keys())):
if flag not in failed_jobs_per_flag:
html_link_text = get_html_link_text(":github:", incompatible_flags[flag])
@@ -99,7 +139,7 @@ def print_flags_ready_to_flip(failed_jobs_per_flag, incompatible_flags):
info_text1 = []

info_text2 = [
"#### The following flags didn't break any passing Bazel team owned/co-owned projects"
f"#### The following flags didn't break any passing Bazel team owned/co-owned {PROJECT}s"
]
for flag, jobs in failed_jobs_per_flag.items():
if flag not in incompatible_flags:
@@ -128,7 +168,7 @@ def print_already_fail_jobs(already_failing_jobs):


def print_projects_need_to_migrate(failed_jobs_per_flag):
info_text = ["#### The following projects need migration"]
info_text = [f"#### The following {PROJECT}s need migration"]
jobs_need_migration = {}
for jobs in failed_jobs_per_flag.values():
for job in jobs.values():
@@ -141,14 +181,14 @@ def print_projects_need_to_migrate(failed_jobs_per_flag):

projects = set()
for job in job_list:
project, _ = get_pipeline_and_platform(job)
project = get_project_name(job)
projects.add(project)
project_num = len(projects)

s1 = "" if project_num == 1 else "s"
s2 = "s" if project_num == 1 else ""
info_text.append(
f"<details><summary>{project_num} project{s1} need{s2} migration, click to see details</summary><ul>"
f"<details><summary>{project_num} {PROJECT}{s1} need{s2} migration, click to see details</summary><ul>"
)

entries = merge_and_format_jobs(job_list, " <li><strong>{}</strong>: {}</li>")
@@ -179,62 +219,68 @@ def print_flags_need_to_migrate(failed_jobs_per_flag, incompatible_flags):
if jobs:
github_url = incompatible_flags[flag]
info_text = [f"* **{flag}** " + get_html_link_text(":github:", github_url)]
jobs_per_pipeline = merge_jobs(jobs.values())
for pipeline, platforms in jobs_per_pipeline.items():
jobs_per_project = merge_jobs(jobs.values())
for project, platforms in jobs_per_project.items():
bazel_mark = ""
if pipeline in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[
pipeline
].get("owned_by_bazel"):
if is_project_owned_by_bazel_team(project):
bazel_mark = ":bazel:"
platforms_text = ", ".join(platforms)
info_text.append(f" - {bazel_mark}**{pipeline}**: {platforms_text}")
info_text.append(f" - {bazel_mark}**{project}**: {platforms_text}")
# Use flag as the context so that each flag gets a different info box.
print_info(flag, "error", info_text)
printed_flag_boxes = True
if not printed_flag_boxes:
return
info_text = [
"#### Downstream projects need to migrate for the following flags:",
"#### Projects need to migrate for the following flags:",
"Projects marked with :bazel: need to be migrated by the Bazel team.",
]
print_info("flags_need_to_migrate", "error", info_text)


def merge_jobs(jobs):
jobs_per_pipeline = collections.defaultdict(list)
jobs_per_project = collections.defaultdict(list)
for job in sorted(jobs, key=lambda s: s["name"].lower()):
pipeline, platform = get_pipeline_and_platform(job)
jobs_per_pipeline[pipeline].append(get_html_link_text(platform, job["web_url"]))
return jobs_per_pipeline
project = get_project_name(job)
platform_label = get_platform_emoji_name(job)
jobs_per_project[project].append(get_html_link_text(platform_label, job["web_url"]))
return jobs_per_project


def merge_and_format_jobs(jobs, line_pattern):
# Merges all jobs for a single pipeline into one line.
# Merges all jobs for a single project into one line.
# Example:
# pipeline (platform1)
# pipeline (platform2)
# pipeline (platform3)
# project (platform1)
# project (platform2)
# project (platform3)
# with line_pattern ">> {}: {}" becomes
# >> pipeline: platform1, platform2, platform3
jobs_per_pipeline = merge_jobs(jobs)
# >> project: platform1, platform2, platform3
jobs_per_project = merge_jobs(jobs)
return [
line_pattern.format(pipeline, ", ".join(platforms))
for pipeline, platforms in jobs_per_pipeline.items()
line_pattern.format(project, ", ".join(platforms))
for project, platforms in jobs_per_project.items()
]


def get_pipeline_and_platform(job):
def get_project_name(job):
# Strip out platform label from job name
name = job["name"].replace(get_platform_emoji_name(job), "")
if bazelci.is_downstream_pipeline():
# This is for downstream pipeline, parse the pipeline name
return name.partition("-")[0].partition("(")[0].strip()
else:
# This is for BCR compatibility test pipeline, parse the module name + version
return extract_module_version(name)


def get_platform_emoji_name(job):
# Search for the platform label in the job name.
name = job["name"]
platform = ""
for p in bazelci.PLATFORMS.values():
platform_label = p.get("emoji-name")
if platform_label in name:
platform = platform_label
name = name.replace(platform_label, "")
break

name = name.partition("-")[0].partition("(")[0].strip()
return name, platform
return platform_label
raise bazelci.BuildkiteException("Cannot detect platform name for: " + job["web_url"])


def print_info(context, style, info):
Expand Down Expand Up @@ -264,8 +310,8 @@ def analyze_logs(build_number, client):

threads = []
for job in build_info["jobs"]:
# Some irrelevant job has no "state" field
if "state" in job:
# Some irrelevant job has no "state" or "raw_log_url" field
if "state" in job and "raw_log_url" in job:
thread = LogFetcher(job, client)
threads.append(thread)
thread.start()
70 changes: 52 additions & 18 deletions buildkite/bazelci.py
@@ -699,16 +699,28 @@ def _get_buildkite_token(self):
project=("bazel-public" if THIS_IS_TRUSTED else "bazel-untrusted"),
)

def _open_url(self, url, params=[]):
try:
params_str = "".join("&{}={}".format(k, v) for k, v in params)
return (
urllib.request.urlopen("{}?access_token={}{}".format(url, self._token, params_str))
.read()
.decode("utf-8", "ignore")
)
except urllib.error.HTTPError as ex:
raise BuildkiteException("Failed to open {}: {} - {}".format(url, ex.code, ex.reason))
def _open_url(self, url, params=[], retries=5):
params_str = "".join("&{}={}".format(k, v) for k, v in params)
full_url = "{}?access_token={}{}".format(url, self._token, params_str)

for attempt in range(retries):
try:
response = urllib.request.urlopen(full_url)
return response.read().decode("utf-8", "ignore")
except urllib.error.HTTPError as ex:
# Handle specific error codes
if ex.code == 429: # Too Many Requests
retry_after = ex.headers.get("RateLimit-Reset")
if retry_after:
wait_time = int(retry_after)
else:
wait_time = (2 ** attempt) # Exponential backoff if no RateLimit-Reset header

time.sleep(wait_time)
else:
raise BuildkiteException("Failed to open {}: {} - {}".format(url, ex.code, ex.reason))

raise BuildkiteException(f"Failed to open {url} after {retries} retries.")

def get_pipeline_info(self):
"""Get details for a pipeline given its organization slug
@@ -984,7 +996,7 @@ def get_expanded_task(task, combination):
return expanded_task


def fetch_configs(http_url, file_config):
def fetch_configs(http_url, file_config, bazel_version=None):
"""
If specified fetches the build configuration from file_config or http_url, else tries to
read it from .bazelci/presubmit.yml.
@@ -993,7 +1005,7 @@ def fetch_configs(http_url, file_config):
if file_config is not None and http_url is not None:
raise BuildkiteException("file_config and http_url cannot be set at the same time")

return load_config(http_url, file_config)
return load_config(http_url, file_config, bazel_version=bazel_version)


def expand_task_config(config):
@@ -1023,7 +1035,15 @@ def expand_task_config(config):
config["tasks"].update(expanded_tasks)


def load_config(http_url, file_config, allow_imports=True):
def maybe_overwrite_bazel_version(bazel_version, config):
if not bazel_version:
return
for task in config.get("tasks", {}):
config["tasks"][task]["old_bazel"] = config["tasks"][task].get("bazel")
config["tasks"][task]["bazel"] = bazel_version


def load_config(http_url, file_config, allow_imports=True, bazel_version=None):
if http_url:
config = load_remote_yaml_file(http_url)
else:
@@ -1041,6 +1061,7 @@ def load_config(http_url, file_config, allow_imports=True):
if "tasks" not in config:
config["tasks"] = {}

maybe_overwrite_bazel_version(bazel_version, config)
expand_task_config(config)

imports = config.pop("imports", None)
@@ -1049,7 +1070,7 @@ def load_config(http_url, file_config, allow_imports=True):
raise BuildkiteException("Nested imports are not allowed")

for i in imports:
imported_tasks = load_imported_tasks(i, http_url, file_config)
imported_tasks = load_imported_tasks(i, http_url, file_config, bazel_version)
config["tasks"].update(imported_tasks)

if len(config["tasks"]) > MAX_TASK_NUMBER:
@@ -1066,7 +1087,7 @@ def load_remote_yaml_file(http_url):
return yaml.safe_load(reader(resp))


def load_imported_tasks(import_name, http_url, file_config):
def load_imported_tasks(import_name, http_url, file_config, bazel_version):
if "/" in import_name:
raise BuildkiteException("Invalid import '%s'" % import_name)

@@ -1077,7 +1098,7 @@ def load_imported_tasks(import_name, http_url, file_config):
else:
file_config = new_path

imported_config = load_config(http_url=http_url, file_config=file_config, allow_imports=False)
imported_config = load_config(http_url=http_url, file_config=file_config, allow_imports=False, bazel_version=bazel_version)

namespace = import_name.partition(".")[0]
tasks = {}
@@ -2777,7 +2798,7 @@ def terminate_background_process(process):
process.kill()


def create_step(label, commands, platform, shards=1, soft_fail=None):
def create_step(label, commands, platform, shards=1, soft_fail=None, concurrency=None, concurrency_group=None):
if "docker-image" in PLATFORMS[platform]:
step = create_docker_step(
label,
@@ -2823,6 +2844,10 @@ def create_step(label, commands, platform, shards=1, soft_fail=None):
step["retry"]["automatic"].append({"exit_status": 128, "limit": 1})
step["retry"]["automatic"].append({"exit_status": 1, "limit": 1})

if concurrency and concurrency_group:
step["concurrency"] = concurrency
step["concurrency_group"] = concurrency_group

return step


@@ -4455,6 +4480,7 @@ def main(argv=None):
runner.add_argument("--task", action="store", type=str, default="")
runner.add_argument("--file_config", type=str)
runner.add_argument("--http_config", type=str)
runner.add_argument("--overwrite_bazel_version", type=str, help="Overwrite the bazel version in the config file.")
runner.add_argument("--git_repository", type=str)
runner.add_argument(
"--git_commit", type=str, help="Reset the git repository to this commit after cloning it"
@@ -4533,7 +4559,9 @@ def main(argv=None):
elif args.git_repository:
clone_git_repository(args.git_repository, args.git_commit)

configs = fetch_configs(args.http_config, args.file_config)
# Maybe overwrite the Bazel version for each task; this has to happen before config expansion.
bazel_version = args.overwrite_bazel_version
configs = fetch_configs(args.http_config, args.file_config, bazel_version)
tasks = configs.get("tasks", {})
task_config = tasks.get(args.task)
if not task_config:
@@ -4553,6 +4581,12 @@ def main(argv=None):
if "BUILDKITE_MESSAGE" in os.environ:
os.environ["BUILDKITE_MESSAGE"] = os.environ["BUILDKITE_MESSAGE"][:1000]

# Give user a warning that the bazel version in the config file has been overridden.
old_bazel = task_config.get("old_bazel")
if old_bazel:
new_bazel = task_config.get("bazel")
print_collapsed_group(f":bazel: Bazel version overridden from {old_bazel} to {new_bazel}")

execute_commands(
task_config=task_config,
platform=platform,
