When measuring CI-pipeline duration with GitLab as source, allow for filtering by pipeline description.

Closes #10426.
fniessink committed Feb 25, 2025
1 parent e3b9a80 commit 9594a05
Showing 8 changed files with 173 additions and 114 deletions.
141 changes: 94 additions & 47 deletions components/collector/src/source_collectors/gitlab/base.py
@@ -3,15 +3,14 @@
from abc import ABC
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from typing import cast
from typing import Self, cast

from dateutil.tz import tzutc

from shared.utils.date_time import now

from base_collectors import SourceCollector
from collector_utilities.date_time import parse_datetime
from collector_utilities.exceptions import CollectorError
from collector_utilities.date_time import minutes, parse_datetime
from collector_utilities.functions import add_query, match_string_or_regular_expression
from collector_utilities.type import URL, Job
from model import Entities, Entity, SourceResponses
@@ -139,53 +138,58 @@ def _lookback_datetime(self) -> datetime:


@dataclass
class Pipeline:
class GitLabJSON:
"""Base class for GitLab response JSON."""

@classmethod
def from_json(cls, **kwargs) -> Self:
"""Override to ignore unknown fields so the caller does not need to weed the GitLab JSON."""
field_names = [field.name for field in fields(cls)]
return cls(**{key: value for key, value in kwargs.items() if key in field_names})


@dataclass
class Pipeline(GitLabJSON):
"""GitLab pipeline JSON. See https://docs.gitlab.com/ee/api/pipelines.html."""

id: int
project_id: int
name: str
ref: str
status: str
source: str
created_at: str
updated_at: str
web_url: str
started_at: str | None = None
finished_at: str | None = None
duration: int | None = None # Seconds

def __init__(self, **kwargs) -> None:
"""Override to ignore unknown fields so the caller does not need to weed the GitLab pipeline JSON."""
field_names = [field.name for field in fields(self)]
for key, value in kwargs.items():
if key in field_names:
setattr(self, key, value)
updated_at: str = ""
schedule_description: str = "" # Pipeline schedule description for scheduled pipelines

@property
def start(self) -> datetime:
"""Return the pipeline start time.
The started_at field may be empty, be prepared. See https://gitlab.com/gitlab-org/gitlab/-/issues/210353.
"""
return parse_datetime(self.started_at or self.created_at)
"""Return the pipeline start time."""
return parse_datetime(self.created_at)

@property
def end(self) -> datetime:
"""Return the pipeline end time.
The finished_at field may be empty if the pipeline hasn't finished yet. Use the current time as fallback.
"""
return parse_datetime(self.finished_at) if self.finished_at else datetime.now(tz=tzutc())
"""Return the pipeline end time."""
return parse_datetime(self.updated_at) if self.updated_at else datetime.now(tz=tzutc())

@property
def datetime(self) -> datetime:
"""Return the datetime of the pipeline."""
return parse_datetime(self.updated_at or self.created_at)

@property
def pipeline_duration(self) -> timedelta:
def duration(self) -> timedelta:
"""Return the duration of the pipeline."""
return self.end - self.start if self.duration is None else timedelta(seconds=self.duration)
return self.end - self.start


@dataclass
class PipelineSchedule(GitLabJSON):
"""Dataclass for GitLab pipeline schedule."""

id: int
description: str


class GitLabPipelineBase(GitLabProjectBase):
@@ -201,26 +205,69 @@ async def _landing_url(self, responses: SourceResponses) -> URL:
pipelines = await self._pipelines(responses)
return URL(pipelines[0].web_url)

async def _get_source_responses(self, *urls: URL) -> SourceResponses:
"""Extend to get pipeline schedule descriptions."""
# Get the pipeline schedule descriptions so the user can filter scheduled pipelines by schedule description
# The pipeline schedule descriptions attribute maps pipeline IDs to schedule descriptions
self.pipeline_schedule_descriptions = await self._scheduled_pipeline_descriptions()
return await super()._get_source_responses(*urls)

async def _parse_entities(self, responses: SourceResponses) -> Entities:
"""Parse the entities from the responses."""
return Entities(
[
Entity(
key=str(pipeline.id),
name=pipeline.name,
ref=pipeline.ref,
status=pipeline.status,
trigger=pipeline.source,
schedule=pipeline.schedule_description,
created=pipeline.created_at,
updated=pipeline.updated_at,
duration=str(minutes(pipeline.duration)),
)
for pipeline in await self._pipelines(responses)
],
)

def _include_entity(self, entity: Entity) -> bool:
"""Return whether this entity should be considered."""
branches = self._parameter("branches")
matches_branches = match_string_or_regular_expression(entity["ref"], branches) if branches else True
schedule_descriptions = self._parameter("pipeline_schedules_to_include")
matches_schedule_description = (
match_string_or_regular_expression(entity["schedule"], schedule_descriptions)
if schedule_descriptions
else True
)
matches_status = entity["status"] in self._parameter("pipeline_statuses_to_include")
matches_trigger = entity["trigger"] in self._parameter("pipeline_triggers_to_include")
return matches_branches and matches_schedule_description and matches_status and matches_trigger

async def _pipelines(self, responses: SourceResponses) -> list[Pipeline]:
"""Get the pipelines from the responses."""
pipelines = []
try:
for response in responses:
json = await response.json()
# The GitLab pipelines endpoint returns a list or, if a pipeline id is passed, one pipeline. Harmonize:
json_list = json if isinstance(json, list) else [json]
pipelines.extend([Pipeline(**pipeline) for pipeline in json_list])
except StopAsyncIteration:
pass
if pipelines := [pipeline for pipeline in pipelines if self._include_pipeline(pipeline)]:
return pipelines
error_message = "No pipelines found within the lookback period"
raise CollectorError(error_message)

def _include_pipeline(self, pipeline: Pipeline) -> bool:
"""Return whether this pipeline should be considered."""
branches = self._parameter("branches")
matches_branches = match_string_or_regular_expression(pipeline.ref, branches) if branches else True
matches_status = pipeline.status in self._parameter("pipeline_statuses_to_include")
matches_source = pipeline.source in self._parameter("pipeline_triggers_to_include")
return matches_branches and matches_status and matches_source
for response in responses:
for pipeline_json in await response.json():
schedule_description = self.pipeline_schedule_descriptions.get(pipeline_json["id"], "")
pipeline_json["schedule_description"] = schedule_description
pipelines.append(Pipeline.from_json(**pipeline_json))
return pipelines

async def _scheduled_pipeline_descriptions(self) -> dict[int, str]:
"""Get the scheduled pipelines. Returns a mapping of scheduled pipeline IDs to their descriptions."""
scheduled_pipeline_descriptions = {}
for schedule in await self._pipeline_schedules():
scheduled_pipelines_api = await self._gitlab_api_url(f"pipeline_schedules/{schedule.id}/pipelines")
for response in await super()._get_source_responses(scheduled_pipelines_api):
for scheduled_pipeline in await response.json():
scheduled_pipeline_descriptions[scheduled_pipeline["id"]] = schedule.description
return scheduled_pipeline_descriptions

async def _pipeline_schedules(self) -> list[PipelineSchedule]:
"""Get the pipeline schedules."""
pipeline_schedules = []
for response in await super()._get_source_responses(await self._gitlab_api_url("pipeline_schedules")):
pipeline_schedules.extend([PipelineSchedule.from_json(**schedule) for schedule in await response.json()])
return pipeline_schedules
@@ -1,9 +1,7 @@
"""GitLab CI-pipeline duration collector."""

from datetime import timedelta

from collector_utilities.date_time import minutes
from collector_utilities.type import URL, Value
from collector_utilities.exceptions import CollectorError
from collector_utilities.type import Value
from model import Entities, SourceResponses

from .base import GitLabPipelineBase
Expand All @@ -12,14 +10,9 @@
class GitLabPipelineDuration(GitLabPipelineBase):
"""GitLab CI-pipeline duration collector."""

async def _get_source_responses(self, *urls: URL) -> SourceResponses:
"""Override to get the pipeline details."""
responses = await super()._get_source_responses(*urls)
pipelines = await self._pipelines(responses)
api_url = await self._gitlab_api_url(f"pipelines/{pipelines[0].id}")
return await super()._get_source_responses(api_url)

async def _parse_value(self, responses: SourceResponses, included_entities: Entities) -> Value:
"""Parse the value from the responses."""
durations = [pipeline.pipeline_duration for pipeline in await self._pipelines(responses)]
return str(minutes(max(durations, default=timedelta())))
if durations := [int(entity["duration"]) for entity in included_entities]:
return str(max(durations))
error_message = "No pipelines found within the lookback period"
raise CollectorError(error_message)
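
With this change the duration is no longer fetched per pipeline via a detail call; each entity already carries its duration in whole minutes as a string, and the metric value is the longest one. A minimal sketch of that calculation with made-up entities, using a plain exception in place of Quality-time's CollectorError:

from datetime import datetime, timedelta, timezone


def duration_in_minutes(created_at: str, updated_at: str | None) -> int:
    """Stand-in for the collector's duration logic: updated_at minus created_at, falling back to now."""
    start = datetime.fromisoformat(created_at)
    end = datetime.fromisoformat(updated_at) if updated_at else datetime.now(tz=timezone.utc)
    return (end - start) // timedelta(minutes=1)


# Made-up entities as produced by _parse_entities(), after filtering by _include_entity():
included_entities = [
    {"duration": str(duration_in_minutes("2022-09-21T01:05:14+00:00", "2022-09-21T01:15:14+00:00"))},
    {"duration": "3"},
]

if durations := [int(entity["duration"]) for entity in included_entities]:
    value = str(max(durations))
    assert value == "10"
else:
    raise RuntimeError("No pipelines found within the lookback period")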
@@ -12,6 +12,7 @@

from base_collectors import SourceCollector, TimePassedCollector
from collector_utilities.date_time import days_ago, parse_datetime
from collector_utilities.exceptions import CollectorError
from collector_utilities.type import URL, Response, Value
from model import Entities, SourceMeasurement, SourceResponses

@@ -81,8 +82,10 @@ class GitLabPipelineUpToDateness(TimePassedCollector, GitLabPipelineBase):
async def _parse_source_response_date_time(self, response: Response) -> datetime:
"""Override to get the date and time of the pipeline."""
pipelines = await self._pipelines(SourceResponses(responses=[response]))
datetimes = [pipeline.datetime for pipeline in pipelines]
return self.minimum(datetimes)
if datetimes := [pipeline.datetime for pipeline in pipelines]:
return self.minimum(datetimes)
error_message = "No pipelines found within the lookback period"
raise CollectorError(error_message)

@staticmethod
def minimum(date_times: Sequence[datetime]) -> datetime:
14 changes: 14 additions & 0 deletions components/collector/tests/source_collectors/gitlab/base.py
@@ -1,12 +1,26 @@
"""GitLab unit test base classes."""

from datetime import datetime
from typing import Final

from dateutil.tz import tzutc

from tests.source_collectors.source_collector_test_case import SourceCollectorTestCase


class FakeResponse:
"""Fake GitLab response."""

links: Final[dict] = {}

def __init__(self, fake_json) -> None:
self.fake_json = fake_json

async def json(self):
"""Return the fake JSON."""
return self.fake_json


class GitLabTestCase(SourceCollectorTestCase):
"""Base class for testing GitLab collectors."""

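
FakeResponse, now shared via the test base module, mimics just enough of an HTTP response for these tests: an empty links mapping (presumably so the collector does not try to follow pagination links) and an awaitable json() method. A small usage sketch with made-up JSON:

import asyncio
from typing import Final


class FakeResponse:
    """Fake GitLab response."""

    links: Final[dict] = {}

    def __init__(self, fake_json) -> None:
        self.fake_json = fake_json

    async def json(self):
        """Return the fake JSON."""
        return self.fake_json


async def main() -> None:
    # The collector tests queue one FakeResponse per expected GitLab API call (see get_request_side_effect
    # in the duration test below); here we just show that json() is awaitable.
    response = FakeResponse([{"id": 101, "status": "success"}])
    assert await response.json() == [{"id": 101, "status": "success"}]


asyncio.run(main())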
@@ -1,101 +1,72 @@
"""Unit tests for the GitLab CI-pipeline duration collector."""

from datetime import datetime
from typing import Final
from unittest.mock import Mock, patch

from dateutil.tz import tzutc

from .base import GitLabTestCase


class FakeResponse:
"""Fake GitLab response."""

links: Final[dict] = {}

def __init__(self, fake_json) -> None:
self.fake_json = fake_json

async def json(self):
"""Return the fake JSON."""
return self.fake_json
from .base import FakeResponse, GitLabTestCase


class GitLabPipelineDurationTest(GitLabTestCase):
"""Unit tests for the CI-pipeline duration metric."""

METRIC_TYPE = "pipeline_duration"
METRIC_ADDITION = "min"
NOW = datetime(2022, 9, 22, 1, 30, 14, 197, tzinfo=tzutc())
NOW = datetime(2022, 9, 21, 1, 30, 14, 197, tzinfo=tzutc())
MOCK_DATETIME = Mock(now=Mock(return_value=NOW))

def setUp(self) -> None:
"""Extend to set up fixtures."""
super().setUp()
self.landing_url = "https://gitlab/project/-/pipelines/1"
self.pipeline_schedules_json = [{"id": "pipeline schedule id", "description": "pipeline description"}]
self.scheduled_pipelines_json = [{"id": "pipeline id"}]
self.pipeline_json = [
{
"id": "pipeline id",
"iid": "iid",
"project_id": "project id",
"name": "Pipeline name",
"created_at": "2022-09-21T01:05:14.197Z",
"updated_at": "2022-09-21T01:05:50.175Z",
"updated_at": "2022-09-21T01:15:14.175Z",
"ref": "branch",
"status": "success",
"source": "push",
"web_url": self.landing_url,
},
]
self.pipeline_detail_json: dict[str, str | int] = {
"id": "pipeline id",
"iid": "iid",
"project_id": "project id",
"created_at": "2022-09-21T01:05:14.197Z",
"updated_at": "2022-09-21T01:05:50.175Z",
"ref": "branch",
"status": "success",
"source": "push",
"web_url": self.landing_url,
}

async def collect(self):
"""Override to pass the GitLab pipeline JSON responses."""
return await super().collect(
get_request_side_effect=[
FakeResponse(self.pipeline_schedules_json), # To fetch all pipeline schedules
FakeResponse(self.scheduled_pipelines_json), # To fetch all pipelines for the pipeline schedule
FakeResponse(self.pipeline_json), # To fetch all pipelines
FakeResponse(self.pipeline_detail_json), # To get the pipeline details for the most recent pipeline
]
)

async def test_duration(self):
"""Test that the duration is returned."""
self.pipeline_detail_json["duration"] = 600
response = await self.collect()
self.assert_measurement(response, value="10", landing_url=self.landing_url)

async def test_duration_without_duration_field(self):
"""Test that start and finish timestamps are used when the pipeline has no duration."""
self.pipeline_detail_json["started_at"] = "2022-09-21T01:15:14.197Z"
self.pipeline_detail_json["finished_at"] = "2022-09-21T01:20:14.197Z"
response = await self.collect()
self.assert_measurement(response, value="5", landing_url=self.landing_url)

@patch("source_collectors.gitlab.base.datetime", MOCK_DATETIME)
async def test_duration_without_duration_and_finished(self):
"""Test that start and now are used when the pipeline has no duration and no finish datetime."""
self.pipeline_detail_json["started_at"] = "2022-09-22T01:15:14.197Z"
response = await self.collect()
self.assert_measurement(response, value="15", landing_url=self.landing_url)

async def test_duration_without_duration_and_started(self):
"""Test that the created datetime is used if the started datetime is missing, but finished is not."""
self.pipeline_detail_json["finished_at"] = "2022-09-21T01:08:14.197Z"
async def test_duration_without_updated(self):
"""Test that start and now are used when the pipeline has no updated datetime."""
del self.pipeline_json[0]["updated_at"]
response = await self.collect()
self.assert_measurement(response, value="3", landing_url=self.landing_url)
self.assert_measurement(response, value="25", landing_url=self.landing_url)

async def test_duration_when_no_match(self):
"""Test that an error is returned when no pipelines match."""
self.set_source_parameter("branches", "missing")
self.set_source_parameter("branches", ["missing"])
response = await self.collect()
self.assert_measurement(response, connection_error="No pipelines found within the lookback period")
self.assert_measurement(response, parse_error="No pipelines found within the lookback period")

async def test_filter_by_pipeline_description(self):
"""Test that pipelines can be filtered by pipeline description."""
self.set_source_parameter("pipeline_schedules_to_include", ["pipeline description"])
response = await self.collect()
self.assert_measurement(response, value="10", landing_url=self.landing_url)