When measuring CI-pipeline duration with GitLab as source, allow for … #10833

Merged
148 changes: 100 additions & 48 deletions components/collector/src/source_collectors/gitlab/base.py
@@ -3,15 +3,14 @@
from abc import ABC
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from typing import cast
from typing import Self, cast

from dateutil.tz import tzutc

from shared.utils.date_time import now

from base_collectors import SourceCollector
from collector_utilities.date_time import parse_datetime
from collector_utilities.exceptions import CollectorError
from collector_utilities.date_time import minutes, parse_datetime
from collector_utilities.functions import add_query, match_string_or_regular_expression
from collector_utilities.type import URL, Job
from model import Entities, Entity, SourceResponses
@@ -139,53 +138,63 @@ def _lookback_datetime(self) -> datetime:


@dataclass
class Pipeline:
"""GitLab pipeline JSON. See https://docs.gitlab.com/ee/api/pipelines.html."""
class GitLabJSON:
"""Base class for GitLab response JSON."""

@classmethod
def from_json(cls, **kwargs) -> Self:
"""Override to ignore unknown fields so the caller does not need to weed the GitLab JSON."""
field_names = [field.name for field in fields(cls)]
return cls(**{key: value for key, value in kwargs.items() if key in field_names})


@dataclass
class Pipeline(GitLabJSON):
"""GitLab pipeline JSON. See https://docs.gitlab.com/ee/api/pipelines.html.

We don't use the pipeline details to determine the pipeline duration, but simply subtract the created_at timestamp
from the updated_at timestamp. The reason is that GitLab itself is pretty unclear about what exactly the pipeline duration is.
See https://gitlab.com/gitlab-org/gitlab/-/issues/19594.
"""

id: int
project_id: int
name: str
ref: str
status: str
source: str
created_at: str
updated_at: str
web_url: str
started_at: str | None = None
finished_at: str | None = None
duration: int | None = None # Seconds

def __init__(self, **kwargs) -> None:
"""Override to ignore unknown fields so the caller does not need to weed the GitLab pipeline JSON."""
field_names = [field.name for field in fields(self)]
for key, value in kwargs.items():
if key in field_names:
setattr(self, key, value)
updated_at: str = ""
schedule_description: str = "" # Pipeline schedule description for scheduled pipelines

@property
def start(self) -> datetime:
"""Return the pipeline start time.

The started_at field may be empty, be prepared. See https://gitlab.com/gitlab-org/gitlab/-/issues/210353.
"""
return parse_datetime(self.started_at or self.created_at)
"""Return the pipeline start time."""
return parse_datetime(self.created_at)

@property
def end(self) -> datetime:
"""Return the pipeline end time.

The finished_at field may be empty if the pipeline hasn't finished yet. Use the current time as a fallback.
"""
return parse_datetime(self.finished_at) if self.finished_at else datetime.now(tz=tzutc())
"""Return the pipeline end time."""
return parse_datetime(self.updated_at) if self.updated_at else datetime.now(tz=tzutc())

@property
def datetime(self) -> datetime:
"""Return the datetime of the pipeline."""
return parse_datetime(self.updated_at or self.created_at)

@property
def pipeline_duration(self) -> timedelta:
def duration(self) -> timedelta:
"""Return the duration of the pipeline."""
return self.end - self.start if self.duration is None else timedelta(seconds=self.duration)
return self.end - self.start


@dataclass
class PipelineSchedule(GitLabJSON):
"""Dataclass for GitLab pipeline schedule."""

id: int
description: str


class GitLabPipelineBase(GitLabProjectBase):
@@ -201,26 +210,69 @@ async def _landing_url(self, responses: SourceResponses) -> URL:
pipelines = await self._pipelines(responses)
return URL(pipelines[0].web_url)

async def _get_source_responses(self, *urls: URL) -> SourceResponses:
"""Extend to get pipeline schedule descriptions."""
# Get the pipeline schedule descriptions so the user can filter scheduled pipelines by schedule description
# The pipeline schedule descriptions are a mapping from pipeline IDs to schedule descriptions
self.pipeline_schedule_descriptions = await self._scheduled_pipeline_descriptions()
return await super()._get_source_responses(*urls)

async def _parse_entities(self, responses: SourceResponses) -> Entities:
"""Parse the entities from the responses."""
return Entities(
[
Entity(
key=str(pipeline.id),
name=pipeline.name,
ref=pipeline.ref,
status=pipeline.status,
trigger=pipeline.source,
schedule=pipeline.schedule_description,
created=pipeline.created_at,
updated=pipeline.updated_at,
duration=str(minutes(pipeline.duration)),
)
for pipeline in await self._pipelines(responses)
],
)

def _include_entity(self, entity: Entity) -> bool:
"""Return whether this entity should be considered."""
branches = self._parameter("branches")
matches_branches = match_string_or_regular_expression(entity["ref"], branches) if branches else True
schedule_descriptions = self._parameter("pipeline_schedules_to_include")
matches_schedule_description = (
match_string_or_regular_expression(entity["schedule"], schedule_descriptions)
if schedule_descriptions
else True
)
matches_status = entity["status"] in self._parameter("pipeline_statuses_to_include")
matches_trigger = entity["trigger"] in self._parameter("pipeline_triggers_to_include")
return matches_branches and matches_schedule_description and matches_status and matches_trigger

async def _pipelines(self, responses: SourceResponses) -> list[Pipeline]:
"""Get the pipelines from the responses."""
pipelines = []
try:
for response in responses:
json = await response.json()
# The GitLab pipelines endpoint returns a list or, if a pipeline id is passed, one pipeline. Harmonize:
json_list = json if isinstance(json, list) else [json]
pipelines.extend([Pipeline(**pipeline) for pipeline in json_list])
except StopAsyncIteration:
pass
if pipelines := [pipeline for pipeline in pipelines if self._include_pipeline(pipeline)]:
return pipelines
error_message = "No pipelines found within the lookback period"
raise CollectorError(error_message)

def _include_pipeline(self, pipeline: Pipeline) -> bool:
"""Return whether this pipeline should be considered."""
branches = self._parameter("branches")
matches_branches = match_string_or_regular_expression(pipeline.ref, branches) if branches else True
matches_status = pipeline.status in self._parameter("pipeline_statuses_to_include")
matches_source = pipeline.source in self._parameter("pipeline_triggers_to_include")
return matches_branches and matches_status and matches_source
for response in responses:
for pipeline_json in await response.json():
schedule_description = self.pipeline_schedule_descriptions.get(pipeline_json["id"], "")
pipeline_json["schedule_description"] = schedule_description
pipelines.append(Pipeline.from_json(**pipeline_json))
return pipelines

async def _scheduled_pipeline_descriptions(self) -> dict[int, str]:
"""Get the scheduled pipelines. Returns a mapping of scheduled pipeline IDs to their descriptions."""
scheduled_pipeline_descriptions = {}
for schedule in await self._pipeline_schedules():
scheduled_pipelines_api = await self._gitlab_api_url(f"pipeline_schedules/{schedule.id}/pipelines")
for response in await super()._get_source_responses(scheduled_pipelines_api):
for scheduled_pipeline in await response.json():
scheduled_pipeline_descriptions[scheduled_pipeline["id"]] = schedule.description
return scheduled_pipeline_descriptions

async def _pipeline_schedules(self) -> list[PipelineSchedule]:
"""Get the pipeline schedules."""
pipeline_schedules = []
for response in await super()._get_source_responses(await self._gitlab_api_url("pipeline_schedules")):
pipeline_schedules.extend([PipelineSchedule.from_json(**schedule) for schedule in await response.json()])
return pipeline_schedules
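
For reference, a minimal, self-contained sketch of the duration calculation the reworked Pipeline dataclass performs: the duration is simply updated_at minus created_at, with the current time as a fallback when updated_at is missing. The PipelineSketch name and the stdlib-only timestamp parsing are illustrative stand-ins, not the project's own helpers.

from dataclasses import dataclass, fields
from datetime import datetime, timezone


@dataclass
class PipelineSketch:
    """Illustrative stand-in for the Pipeline dataclass in this diff."""

    created_at: str
    updated_at: str = ""

    @classmethod
    def from_json(cls, **kwargs) -> "PipelineSketch":
        """Ignore unknown GitLab fields, mirroring GitLabJSON.from_json above."""
        field_names = {field.name for field in fields(cls)}
        return cls(**{key: value for key, value in kwargs.items() if key in field_names})

    @property
    def duration(self):
        """Return updated_at minus created_at, falling back to the current time."""
        start = datetime.fromisoformat(self.created_at.replace("Z", "+00:00"))
        end = (
            datetime.fromisoformat(self.updated_at.replace("Z", "+00:00"))
            if self.updated_at
            else datetime.now(tz=timezone.utc)
        )
        return end - start


pipeline = PipelineSketch.from_json(
    id=1, created_at="2022-09-21T01:05:14.197Z", updated_at="2022-09-21T01:15:14.175Z"
)
print(pipeline.duration)  # Just under ten minutes
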
@@ -1,9 +1,7 @@
"""GitLab CI-pipeline duration collector."""

from datetime import timedelta

from collector_utilities.date_time import minutes
from collector_utilities.type import URL, Value
from collector_utilities.exceptions import CollectorError
from collector_utilities.type import Value
from model import Entities, SourceResponses

from .base import GitLabPipelineBase
@@ -12,14 +10,9 @@
class GitLabPipelineDuration(GitLabPipelineBase):
"""GitLab CI-pipeline duration collector."""

async def _get_source_responses(self, *urls: URL) -> SourceResponses:
"""Override to get the pipeline details."""
responses = await super()._get_source_responses(*urls)
pipelines = await self._pipelines(responses)
api_url = await self._gitlab_api_url(f"pipelines/{pipelines[0].id}")
return await super()._get_source_responses(api_url)

async def _parse_value(self, responses: SourceResponses, included_entities: Entities) -> Value:
"""Parse the value from the responses."""
durations = [pipeline.pipeline_duration for pipeline in await self._pipelines(responses)]
return str(minutes(max(durations, default=timedelta())))
if durations := [int(entity["duration"]) for entity in included_entities]:
return str(max(durations))
error_message = "No pipelines found within the lookback period"
raise CollectorError(error_message)
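
For reference, a small sketch of the new value calculation in _parse_value, assuming entities whose "duration" attribute already holds whole minutes (as produced via the minutes() helper in base.py); the sample data below is invented for illustration.

# Each included entity carries its pipeline duration in minutes
included_entities = [{"duration": "10"}, {"duration": "3"}, {"duration": "7"}]

if durations := [int(entity["duration"]) for entity in included_entities]:
    print(str(max(durations)))  # "10": the value is the longest pipeline duration in minutes
else:
    # The real collector raises a CollectorError here instead
    raise ValueError("No pipelines found within the lookback period")
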
@@ -12,6 +12,7 @@

from base_collectors import SourceCollector, TimePassedCollector
from collector_utilities.date_time import days_ago, parse_datetime
from collector_utilities.exceptions import CollectorError
from collector_utilities.type import URL, Response, Value
from model import Entities, SourceMeasurement, SourceResponses

@@ -81,8 +82,10 @@ class GitLabPipelineUpToDateness(TimePassedCollector, GitLabPipelineBase):
async def _parse_source_response_date_time(self, response: Response) -> datetime:
"""Override to get the date and time of the pipeline."""
pipelines = await self._pipelines(SourceResponses(responses=[response]))
datetimes = [pipeline.datetime for pipeline in pipelines]
return self.minimum(datetimes)
if datetimes := [pipeline.datetime for pipeline in pipelines]:
return self.minimum(datetimes)
error_message = "No pipelines found within the lookback period"
raise CollectorError(error_message)

@staticmethod
def minimum(date_times: Sequence[datetime]) -> datetime:
14 changes: 14 additions & 0 deletions components/collector/tests/source_collectors/gitlab/base.py
@@ -1,12 +1,26 @@
"""GitLab unit test base classes."""

from datetime import datetime
from typing import Final

from dateutil.tz import tzutc

from tests.source_collectors.source_collector_test_case import SourceCollectorTestCase


class FakeResponse:
"""Fake GitLab response."""

links: Final[dict] = {}

def __init__(self, fake_json) -> None:
self.fake_json = fake_json

async def json(self):
"""Return the fake JSON."""
return self.fake_json


class GitLabTestCase(SourceCollectorTestCase):
"""Base class for testing GitLab collectors."""

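
For reference, a minimal sketch of how the relocated FakeResponse stub can serve canned GitLab JSON to an async caller; the payload and the demo coroutine name are made up for illustration.

import asyncio
from typing import Final


class FakeResponse:
    """Fake GitLab response, as defined in the test base module above."""

    links: Final[dict] = {}

    def __init__(self, fake_json) -> None:
        self.fake_json = fake_json

    async def json(self):
        """Return the fake JSON."""
        return self.fake_json


async def demo() -> None:
    """Read back the canned pipeline schedule JSON, as a collector under test would."""
    response = FakeResponse([{"id": 1, "description": "nightly build"}])
    schedules = await response.json()
    assert schedules[0]["description"] == "nightly build"


asyncio.run(demo())
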
@@ -1,101 +1,72 @@
"""Unit tests for the GitLab CI-pipeline duration collector."""

from datetime import datetime
from typing import Final
from unittest.mock import Mock, patch

from dateutil.tz import tzutc

from .base import GitLabTestCase


class FakeResponse:
"""Fake GitLab response."""

links: Final[dict] = {}

def __init__(self, fake_json) -> None:
self.fake_json = fake_json

async def json(self):
"""Return the fake JSON."""
return self.fake_json
from .base import FakeResponse, GitLabTestCase


class GitLabPipelineDurationTest(GitLabTestCase):
"""Unit tests for the CI-pipeline duration metric."""

METRIC_TYPE = "pipeline_duration"
METRIC_ADDITION = "min"
NOW = datetime(2022, 9, 22, 1, 30, 14, 197, tzinfo=tzutc())
NOW = datetime(2022, 9, 21, 1, 30, 14, 197, tzinfo=tzutc())
MOCK_DATETIME = Mock(now=Mock(return_value=NOW))

def setUp(self) -> None:
"""Extend to set up fixtures."""
super().setUp()
self.landing_url = "https://gitlab/project/-/pipelines/1"
self.pipeline_schedules_json = [{"id": "pipeline schedule id", "description": "pipeline description"}]
self.scheduled_pipelines_json = [{"id": "pipeline id"}]
self.pipeline_json = [
{
"id": "pipeline id",
"iid": "iid",
"project_id": "project id",
"name": "Pipeline name",
"created_at": "2022-09-21T01:05:14.197Z",
"updated_at": "2022-09-21T01:05:50.175Z",
"updated_at": "2022-09-21T01:15:14.175Z",
"ref": "branch",
"status": "success",
"source": "push",
"web_url": self.landing_url,
},
]
self.pipeline_detail_json: dict[str, str | int] = {
"id": "pipeline id",
"iid": "iid",
"project_id": "project id",
"created_at": "2022-09-21T01:05:14.197Z",
"updated_at": "2022-09-21T01:05:50.175Z",
"ref": "branch",
"status": "success",
"source": "push",
"web_url": self.landing_url,
}

async def collect(self):
"""Override to pass the GitLab pipeline JSON responses."""
return await super().collect(
get_request_side_effect=[
FakeResponse(self.pipeline_schedules_json), # To fetch all pipeline schedules
FakeResponse(self.scheduled_pipelines_json), # To fetch all pipelines for the pipeline schedule
FakeResponse(self.pipeline_json), # To fetch all pipelines
FakeResponse(self.pipeline_detail_json), # To get the pipeline details for the most recent pipeline
]
)

async def test_duration(self):
"""Test that the duration is returned."""
self.pipeline_detail_json["duration"] = 600
response = await self.collect()
self.assert_measurement(response, value="10", landing_url=self.landing_url)

async def test_duration_without_duration_field(self):
"""Test that start and finish timestamps are used when the pipeline has no duration."""
self.pipeline_detail_json["started_at"] = "2022-09-21T01:15:14.197Z"
self.pipeline_detail_json["finished_at"] = "2022-09-21T01:20:14.197Z"
response = await self.collect()
self.assert_measurement(response, value="5", landing_url=self.landing_url)

@patch("source_collectors.gitlab.base.datetime", MOCK_DATETIME)
async def test_duration_without_duration_and_finished(self):
"""Test that start and now are used when the pipeline has no duration and no finish datetime."""
self.pipeline_detail_json["started_at"] = "2022-09-22T01:15:14.197Z"
response = await self.collect()
self.assert_measurement(response, value="15", landing_url=self.landing_url)

async def test_duration_without_duration_and_started(self):
"""Test that the created datetime is used if the started datetime is missing, but finished is not."""
self.pipeline_detail_json["finished_at"] = "2022-09-21T01:08:14.197Z"
async def test_duration_without_updated(self):
"""Test that start and now are used when the pipeline has no updated datetime."""
del self.pipeline_json[0]["updated_at"]
response = await self.collect()
self.assert_measurement(response, value="3", landing_url=self.landing_url)
self.assert_measurement(response, value="25", landing_url=self.landing_url)

async def test_duration_when_no_match(self):
"""Test that an error is returned when no pipelines match."""
self.set_source_parameter("branches", "missing")
self.set_source_parameter("branches", ["missing"])
response = await self.collect()
self.assert_measurement(response, connection_error="No pipelines found within the lookback period")
self.assert_measurement(response, parse_error="No pipelines found within the lookback period")

async def test_filter_by_pipeline_description(self):
"""Test that pipelines can be filtered by pipeline description."""
self.set_source_parameter("pipeline_schedules_to_include", ["pipeline description"])
response = await self.collect()
self.assert_measurement(response, value="10", landing_url=self.landing_url)