Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add type annotations and docstrings to devlib #715

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ devlib/bin/scripts/shutils
doc/_build/
build/
dist/
.venv/
.vscode/
venv/
.history/
129 changes: 101 additions & 28 deletions devlib/_target_runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 ARM Limited
# Copyright 2024-2025 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -20,12 +20,22 @@
import logging
import os
import time

from platform import machine
from typing import Optional, cast, Protocol, TYPE_CHECKING, TypedDict, Union
from typing_extensions import NotRequired, LiteralString
if TYPE_CHECKING:
from _typeshed import StrPath, BytesPath
from devlib.platform import Platform
else:
StrPath = str
BytesPath = bytes

from devlib.exception import (TargetStableError, HostError)
from devlib.target import LinuxTarget
from devlib.target import LinuxTarget, Target
from devlib.utils.misc import get_subprocess, which
from devlib.utils.ssh import SshConnection
from devlib.utils.annotation_helpers import SubprocessCommand, SshUserConnectionSettings


class TargetRunner:
Expand All @@ -40,15 +50,22 @@ class TargetRunner:
"""

def __init__(self,
target):
target: Target) -> None:
self.target = target

self.logger = logging.getLogger(self.__class__.__name__)

def __enter__(self):
"""
Enter the context for this runner.
:return: This runner instance.
:rtype: TargetRunner
"""
return self

def __exit__(self, *_):
"""
Exit the context for this runner.
"""
pass


Expand Down Expand Up @@ -77,20 +94,20 @@ class SubprocessTargetRunner(TargetRunner):
"""

def __init__(self,
runner_cmd,
target,
connect=True,
boot_timeout=60):
runner_cmd: SubprocessCommand,
target: Target,
connect: bool = True,
boot_timeout: int = 60):
super().__init__(target=target)

self.boot_timeout = boot_timeout
self.boot_timeout: int = boot_timeout

self.logger.info('runner_cmd: %s', runner_cmd)

try:
self.runner_process = get_subprocess(runner_cmd)
except Exception as ex:
raise HostError(f'Error while running "{runner_cmd}": {ex}') from ex
raise HostError(f'Error while running "{runner_cmd!r}": {ex}') from ex

if connect:
self.wait_boot_complete()
Expand All @@ -107,16 +124,16 @@ def __exit__(self, *_):

self.terminate()

def wait_boot_complete(self):
def wait_boot_complete(self) -> None:
"""
Wait for target OS to finish boot up and become accessible over SSH in at most
``SubprocessTargetRunner.boot_timeout`` seconds.
Wait for the target OS to finish booting and become accessible within
:attr:`boot_timeout` seconds.
:raises TargetStableError: In case of timeout.
:raises TargetStableError: If the target is inaccessible after the timeout.
"""

start_time = time.time()
elapsed = 0
elapsed: float = 0.0
while self.boot_timeout >= elapsed:
try:
self.target.connect(timeout=self.boot_timeout - elapsed)
Expand All @@ -132,9 +149,9 @@ def wait_boot_complete(self):
self.terminate()
raise TargetStableError(f'Target is inaccessible for {self.boot_timeout} seconds!')

def terminate(self):
def terminate(self) -> None:
"""
Terminate ``SubprocessTargetRunner.runner_process``.
Terminate the subprocess associated with this runner.
"""

self.logger.debug('Killing target runner...')
Expand All @@ -150,7 +167,7 @@ class NOPTargetRunner(TargetRunner):
:type target: Target
"""

def __init__(self, target):
def __init__(self, target: Target) -> None:
super().__init__(target=target)

def __enter__(self):
Expand All @@ -159,11 +176,63 @@ def __enter__(self):
def __exit__(self, *_):
pass

def terminate(self):
def terminate(self) -> None:
"""
Nothing to terminate for NOP target runners.
Defined to be compliant with other runners (e.g., ``SubprocessTargetRunner``).
"""
pass


QEMUTargetUserSettings = TypedDict("QEMUTargetUserSettings", {
'kernel_image': str,
'arch': NotRequired[str],
'cpu_type': NotRequired[str],
'initrd_image': str,
'mem_size': NotRequired[int],
'num_cores': NotRequired[int],
'num_threads': NotRequired[int],
'cmdline': NotRequired[str],
'enable_kvm': NotRequired[bool],
})

QEMUTargetRunnerSettings = TypedDict("QEMUTargetRunnerSettings", {
'kernel_image': str,
'arch': str,
'cpu_type': str,
'initrd_image': str,
'mem_size': int,
'num_cores': int,
'num_threads': int,
'cmdline': str,
'enable_kvm': bool,
})


SshConnectionSettings = TypedDict("SshConnectionSettings", {
'username': str,
'password': str,
'keyfile': Optional[Union[LiteralString, StrPath, BytesPath]],
'host': str,
'port': int,
'timeout': float,
'platform': 'Platform',
'sudo_cmd': str,
'strict_host_check': bool,
'use_scp': bool,
'poll_transfers': bool,
'start_transfer_poll_delay': int,
'total_transfer_timeout': int,
'transfer_poll_period': int,
})


class QEMUTargetRunnerTargetFactory(Protocol):
"""
Protocol for Lambda function for creating :class:`Target` based object.
"""
def __call__(self, *, connect: bool, conn_cls, connection_settings: SshConnectionSettings) -> Target:
...


class QEMUTargetRunner(SubprocessTargetRunner):
Expand All @@ -177,7 +246,7 @@ class QEMUTargetRunner(SubprocessTargetRunner):
* ``arch``: Architecture type. Defaults to ``aarch64``.
* ``cpu_types``: List of CPU ids for QEMU. The list only contains ``cortex-a72`` by
* ``cpu_type``: List of CPU ids for QEMU. The list only contains ``cortex-a72`` by
default. This parameter is valid for Arm architectures only.
* ``initrd_image``: This points to the location of initrd image (e.g.,
Expand Down Expand Up @@ -212,21 +281,25 @@ class QEMUTargetRunner(SubprocessTargetRunner):
"""

def __init__(self,
qemu_settings,
connection_settings=None,
make_target=LinuxTarget,
**args):
qemu_settings: QEMUTargetUserSettings,
connection_settings: Optional[SshUserConnectionSettings] = None,
make_target: QEMUTargetRunnerTargetFactory = cast(QEMUTargetRunnerTargetFactory, LinuxTarget),
**args) -> None:

self.connection_settings = {
default_connection_settings = {
'host': '127.0.0.1',
'port': 8022,
'username': 'root',
'password': 'root',
'strict_host_check': False,
}
self.connection_settings = {**self.connection_settings, **(connection_settings or {})}

qemu_args = {
self.connection_settings: SshConnectionSettings = cast(SshConnectionSettings, {
**default_connection_settings,
**(connection_settings or {})
})

qemu_default_args = {
'arch': 'aarch64',
'cpu_type': 'cortex-a72',
'mem_size': 512,
Expand All @@ -235,7 +308,7 @@ def __init__(self,
'cmdline': 'console=ttyAMA0',
'enable_kvm': True,
}
qemu_args = {**qemu_args, **qemu_settings}
qemu_args: QEMUTargetRunnerSettings = cast(QEMUTargetRunnerSettings, {**qemu_default_args, **qemu_settings})

qemu_executable = f'qemu-system-{qemu_args["arch"]}'
qemu_path = which(qemu_executable)
Expand Down
71 changes: 60 additions & 11 deletions devlib/collector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2015 ARM Limited
# Copyright 2015-2025 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,27 +16,62 @@
import logging

from devlib.utils.types import caseless_string
from typing import TYPE_CHECKING, Optional, List
if TYPE_CHECKING:
from devlib.target import Target


class CollectorBase(object):
"""
The ``Collector`` API provide a consistent way of collecting arbitrary data from
a target. Data is collected via an instance of a class derived from :class:`CollectorBase`.
def __init__(self, target):
:param target: The devlib Target from which data will be collected.
"""
def __init__(self, target: 'Target'):
self.target = target
self.logger = logging.getLogger(self.__class__.__name__)
self.output_path = None

def reset(self):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self.output_path: Optional[str] = None

def reset(self) -> None:
"""
This can be used to configure a collector for collection. This must be invoked
before :meth:`start()` is called to begin collection.
"""
pass

def start(self):
def start(self) -> None:
"""
Starts collecting from the target.
"""
pass

def stop(self):
"""
Stops collecting from target. Must be called after
:func:`start()`.
"""
pass

def set_output(self, output_path):
def set_output(self, output_path: str) -> None:
"""
Configure the output path for the particular collector. This will be either
a directory or file path which will be used when storing the data. Please see
the individual Collector documentation for more information.
:param output_path: The path (file or directory) to which data will be saved.
"""
self.output_path = output_path

def get_data(self):
def get_data(self) -> 'CollectorOutput':
"""
The collected data will be return via the previously specified output_path.
This method will return a :class:`CollectorOutput` object which is a subclassed
list object containing individual ``CollectorOutputEntry`` objects with details
about the individual output entry.
:raises RuntimeError: If ``output_path`` has not been set.
"""
return CollectorOutput()

def __enter__(self):
Expand All @@ -47,11 +82,25 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
self.stop()


class CollectorOutputEntry(object):
"""
This object is designed to allow for the output of a collector to be processed
generically. The object will behave as a regular string containing the path to
underlying output path and can be used directly in ``os.path`` operations.
.. attribute:: CollectorOutputEntry.path
The file path for the corresponding output item.
.. attribute:: CollectorOutputEntry.path_kind
path_kinds = ['file', 'directory']
:param path: The file path of the collected output data.
:param path_kind: The type of output. Must be one of ``file`` or ``directory``.
"""
path_kinds: List[str] = ['file', 'directory']

def __init__(self, path, path_kind):
def __init__(self, path: str, path_kind: str):
self.path = path

path_kind = caseless_string(path_kind)
Expand Down
Loading