Skip to content

Commit

Permalink
Monitor queued jobs (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
gerardsegarra authored May 14, 2024
1 parent 36750c3 commit ac99cf0
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 77 deletions.
28 changes: 26 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,33 @@ env:
REGISTRY: ghcr.io

jobs:
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.11"]

steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r tests/requirements.txt
- name: Run pre-commit
uses: pre-commit/[email protected]
- name: Test with pytest
run: |
pytest --cov=src tests/
build:
runs-on: ubuntu-latest
name: Build
needs: test
permissions:
contents: read
packages: write
Expand All @@ -36,7 +60,7 @@ jobs:
with:
images: ${{ env.REGISTRY }}/${{ github.repository_owner }}/github-workflows-monitoring
flavor: |
latest=true
latest=${{ github.event_name != 'pull_request' }}
tags: |
type=ref,event=branch
type=ref,event=pr
Expand All @@ -45,7 +69,7 @@ jobs:
- name: Docker build and push
uses: docker/build-push-action@v4
with:
push: ${{ github.event_name != 'pull_request' }}
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
context: .
32 changes: 0 additions & 32 deletions .github/workflows/tests.yaml

This file was deleted.

1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ install_requires =
Flask>=2.2,<3
Flask-APScheduler==1.13.1
datadog==0.49.1
gql[all]==3.5.0

[flake8]
max-line-length = 120
Expand Down
33 changes: 32 additions & 1 deletion src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from const import GithubHeaders, LOGGING_CONFIG
from github import GithubJob
from jobs import JobEventsHandler
from utils import dict_to_logfmt
from utils import dict_to_logfmt, parse_datetime
from query_graphql import query_jobs

dictConfig(LOGGING_CONFIG)

Expand Down Expand Up @@ -125,6 +126,36 @@ def process_workflow_job():
return True


@scheduler.task("interval", id="monitor_jobs", seconds=15)
def monitor_jobs():
    """Poll GitHub GraphQL every 15s for the status of tracked queued jobs.

    Jobs whose CheckRun status is no longer QUEUED are removed from the
    queued map, stamped with their final timestamps, and report a final
    queued-time metric; jobs still queued report their current time in
    queue on every tick.
    """
    with scheduler.app.app_context():
        queued_nodes = [job.node_id for job in job_handler.queued.values()]
        # Nothing queued: skip the GraphQL round-trip entirely.
        if not queued_nodes:
            return
        jobs_data = query_jobs(queued_nodes)

        app.logger.info(f"Processing data for jobs {job_handler.queued.keys()}")

        for job_data in jobs_data["nodes"]:
            job = job_handler.queued.get(job_data["id"])
            if job_data["status"] != "QUEUED":
                # Job left the queue: stop tracking it and record final times.
                job = job_handler.queued.pop(job_data["id"], None)
                app.logger.info(
                    f"Job {job_data['id']} is no longer queued {job_data['status']}"
                )
                if job:
                    job.status = job_data["status"].lower()
                    job.in_progress_at = parse_datetime(job_data["startedAt"])
                    job.completed_at = parse_datetime(job_data["completedAt"])
                    # Stops the in_progress webhook fallover from re-sending
                    # the queued-time metric for this job.
                    job.final_queued_time_updated = True
            if job:
                app.logger.info(
                    f"Sending metric for {job_data['id']} with status {job_data['status']}, "
                    f"duration {job.seconds_in_queue}"
                )
                job.send_queued_metric()
            else:
                app.logger.info(f"No job for {job_data['id']}")


@scheduler.task("interval", id="monitor_queued", seconds=30)
def monitor_queued_jobs():
"""Return the job that has been queued and not starting for long time."""
Expand Down
8 changes: 8 additions & 0 deletions src/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,13 @@ def runner_group_name(self):
def runner_public(self):
return self.runner_group_name == "GitHub Actions"

@property
def runner_buildjet(self):
    """True when any of the job's runner labels has the ``buildjet`` prefix."""
    for label in self.labels:
        if label.startswith("buildjet"):
            return True
    return False

@property
def labels(self):
    """Runner label list from the ``workflow_job`` webhook payload."""
    workflow_job = self.data["workflow_job"]
    return workflow_job["labels"]

def __str__(self):
    # Compact "<id@name>" form for interpolating jobs into log messages.
    return f"<{self.id}@{self.name}>"
34 changes: 22 additions & 12 deletions src/jobs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Dict
import metrics

from datetime import datetime
Expand All @@ -16,6 +17,8 @@ def __init__(self, github_job: GithubJob) -> None:
self._update_attributes(github_job)

self.node_id = self.github_job.node_id
self.labels = "-".join(sorted(self.github_job.labels))
self.final_queued_time_updated = False

@property
def seconds_in_queue(self):
Expand All @@ -27,7 +30,7 @@ def seconds_in_queue(self):

def _update_attributes(self, github_job: GithubJob):
self.github_job: GithubJob = github_job
self.status = github_job.action
self.status = self.github_job.action

if self.github_job.action == "queued":
self.queued_at = self.github_job.time_start
Expand All @@ -42,11 +45,22 @@ def _update_attributes(self, github_job: GithubJob):
def update(self, github_job: GithubJob):
    """Refresh the tracked status/timestamps from a newer webhook payload."""
    self._update_attributes(github_job)

def send_queued_metric(self):
    """Report this job's time-in-queue (with its tag set) to Datadog."""
    source = self.github_job
    payload = {
        "seconds_in_queue": self.seconds_in_queue,
        "job_name": source.job_name,
        "status": self.status,
        "repository": source.repository,
        "runner_group_name": source.runner_group_name,
        "public": source.runner_public,
        "buildjet": source.runner_buildjet,
    }
    metrics.send_queued_job(**payload)


class JobEventsHandler:
    """Tracks workflow-job webhook events, bucketed by lifecycle stage."""

    def __init__(self) -> None:
        # Both maps key jobs by GraphQL node_id (see _get_event_job_id),
        # so entries can be matched against GraphQL query results.
        self.queued: Dict[str, Job] = dict()
        self.in_progress: Dict[str, Job] = dict()

def process_event(self, event: dict):
status = event["action"]
Expand All @@ -64,7 +78,7 @@ def process_event(self, event: dict):
pass

def _get_event_job_id(self, event: dict):
return event["workflow_job"]["id"]
return event["workflow_job"]["node_id"]

def _create_job(self, githubJob: GithubJob) -> Job:
    """Wrap a parsed webhook payload in a fresh Job tracker."""
    new_job = Job(github_job=githubJob)
    return new_job
Expand All @@ -81,14 +95,10 @@ def _process_in_progress_event(self, event: dict):
job = self._create_job(GithubJob(event))
else:
job.update(GithubJob(event))
metrics.send_queued_job(
seconds_in_queue=job.seconds_in_queue,
job_name=job.github_job.job_name,
repository=job.github_job.repository,
runner=job.github_job.runner_name,
run_id=job.github_job.run_id,
public=job.github_job.runner_public,
)
# This is a fallover in case the job was not processed during the tracking time.
if not job.final_queued_time_updated:
job.final_queued_time_updated = True
job.send_queued_metric()

self.in_progress[job_id] = job

Expand Down
44 changes: 33 additions & 11 deletions src/metrics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import logging
import re

from datadog import initialize, statsd
from flask import current_app

options = {
"statsd_host": "datadog-agent.datadog.svc.cluster.local",
Expand All @@ -7,23 +11,41 @@

initialize(**options)

logger = logging.getLogger(__name__)


# Characters Datadog accepts in tag values; everything else is replaced.
TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE)
TAG_INVALID_CHARS_SUBS = "_"


def normalize_tags(tag_list):
    """Replace characters Datadog rejects in each tag with underscores."""
    sanitize = TAG_INVALID_CHARS_RE.sub
    normalized = []
    for tag in tag_list:
        normalized.append(sanitize(TAG_INVALID_CHARS_SUBS, tag))
    return normalized


def send_queued_job(
    seconds_in_queue: int,
    job_name: str,
    status: str,
    repository: str,
    public: bool,
    buildjet: bool,
    runner_group_name: str,
):
    """Send the seconds-in-queue distribution metric for one job to Datadog.

    Tags are normalized (invalid characters replaced) before submission.
    Callers pass every argument by keyword (see Job.send_queued_metric).
    """
    tags = [
        f"repository:{repository}",
        f"job_name:{job_name}",
        f"status:{status}",
        f"public:{public}",
        f"buildjet:{buildjet}",
        f"runner_group_name:{runner_group_name}",
    ]

    tags = normalize_tags(tags)

    current_app.logger.info(f"Sending {seconds_in_queue} tags {tags}")

    statsd.distribution(
        "midokura.github_runners.jobs.seconds_in_queue.distribution",
        seconds_in_queue,
        tags=tags,
    )
47 changes: 47 additions & 0 deletions src/query_graphql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os

from typing import List

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

# GitHub GraphQL endpoint, authenticated with a personal access token.
# NOTE(review): if GH_PAT is unset this header becomes "bearer None" and every
# query will fail with an auth error — confirm the env var is always provided.
headers = {"Authorization": f"bearer {os.getenv('GH_PAT')}"}
transport = AIOHTTPTransport(url="https://api.github.com/graphql", headers=headers)

# Module-level shared client. fetch_schema_from_transport=True makes gql
# download the schema on first use, adding one extra round-trip at that point.
client = Client(transport=transport, fetch_schema_from_transport=True)


# Provide a GraphQL query
def query_jobs(node_id_list: List[str]):
    """Fetch CheckRun status/timestamps for the given GraphQL node ids.

    Returns the raw GraphQL response, a dict with a ``nodes`` list where
    each entry carries id, name, status, startedAt, completedAt, repository
    owner/name, and workflowRun event/runNumber.
    """
    # Avoid a pointless authenticated round-trip when there is nothing to
    # query; GraphQL would return an empty node list for empty ids anyway.
    if not node_id_list:
        return {"nodes": []}

    query = gql(
        """
    query getCheckRuns($node_id_list: [ID!]!) {
      nodes(ids: $node_id_list) {
        ... on CheckRun {
          id
          name
          status
          startedAt
          completedAt
          repository {
            owner {
              login
            }
            name
          }
          checkSuite {
            workflowRun {
              event
              runNumber
            }
          }
        }
      }
    }
    """
    )
    params = {"node_id_list": node_id_list}

    return client.execute(query, variable_values=params)
Loading

0 comments on commit ac99cf0

Please sign in to comment.