Skip to content

Commit

Permalink
Add Cli jobs manager (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariotaddeucci authored Nov 17, 2024
1 parent 94eae3a commit fa60686
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 6 deletions.
File renamed without changes.
11 changes: 11 additions & 0 deletions examples/jobs/simple_app/simple_app.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[gyjd.job]
name = "new_simple_app"
description = "A simple app that does nothing"
script = "simple_app.py"
python_version = "3.11"
dependencies = ["gyjd", "requests"]
tags = ["simple", "app"]

[gyjd.job.schedule.cron.default]
expression = "0 0 * * *"
timezone = "UTC"
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ requires-python = ">=3.11"
dependencies = []

[project.optional-dependencies]
compiler = ["nuitka"]
cli = ["typer", "uv"]

[project.scripts]
gyjd = "gyjd.__main__:app"
gyjd = "gyjd.cli.__main__:app"

[tool.hatch.build]
exclude = ["/tests", "/docs", "/examples"]
Expand Down
Empty file added src/gyjd/cli/__init__.py
Empty file.
43 changes: 43 additions & 0 deletions src/gyjd/cli/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import shutil
import subprocess
from pathlib import Path
from typing import Annotated

import typer

from gyjd.cli.apps.jobs.app import app as jobs_app

app = typer.Typer(no_args_is_help=True)
app.add_typer(jobs_app, name="jobs", help="CLI for managing jobs.")


@app.command(name="compile", help="Compile a Python file to an executable.", no_args_is_help=True)
def compile(
filename: Annotated[
Path,
typer.Option(help="Python file to compile."),
],
):
output_dir = "dist"

commnad = [
"uvx",
"nuitka",
"--follow-imports",
"--onefile",
f"--output-dir={output_dir}",
"--assume-yes-for-downloads",
str(filename),
]

subprocess.run(commnad, stdout=None, stderr=None, text=True)

for entry in os.listdir(output_dir):
entry_uri = os.path.join(output_dir, entry)
if not os.path.isfile(entry_uri):
shutil.rmtree(entry_uri)


if __name__ == "__main__":
app()
Empty file added src/gyjd/cli/apps/__init__.py
Empty file.
Empty file.
47 changes: 47 additions & 0 deletions src/gyjd/cli/apps/jobs/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import subprocess
from pathlib import Path
from typing import Annotated

import typer

app = typer.Typer(no_args_is_help=True, help="CLI for managing jobs.")


@app.command(help="Start dagster server.")
def server(
scripts_path: Annotated[
Path,
typer.Option(
exists=True,
file_okay=False,
dir_okay=True,
resolve_path=True,
help="Path that contains the scripts to run.",
default="scripts",
),
],
):
repos_dir = Path(__file__).parent / "repos"

dependencies = ["dagster-webserver", "uv"]

command = ["uvx", "--with", ",".join(dependencies), "dagster", "dev"]

for repo in repos_dir.glob("*.py"):
command.extend(["-f", str(repo)])

envs = os.environ.copy()
envs["GYJD_SCRIPTS_PATH"] = str(scripts_path.absolute())

process = subprocess.run(command, stdout=None, stderr=None, text=True, env=envs)

print(f"Exit Code: {process.returncode}")


@app.command(help="Create python script job.")
def create_script(
name: Annotated[str, typer.Argument(help="Name of the script.")],
python_version: Annotated[str, typer.Option(help="Python version to use.", prompt=True, default="3.11")],
):
print("Deleting user: Hiro Hamada")
122 changes: 122 additions & 0 deletions src/gyjd/cli/apps/jobs/repos/gyjd_scripts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import os
import subprocess
import sys
from pathlib import Path

import toml
from dagster import Field, OpExecutionContext, ScheduleDefinition, Shape, job, op, repository


@op(
config_schema=Shape(
{
"script_path": Field(str, description="Caminho para o script Python"),
"python_version": Field(str, description="Versão do Python"),
"dependencies": Field([str], description="Dependências do script"),
}
)
)
def run_python_script(context: OpExecutionContext):
script_path = context.op_config["script_path"]
python_version = context.op_config["python_version"]
dependencies = context.op_config["dependencies"]

context.log.info(f"Starting script: {script_path}")

command = [sys.executable, "-m", "uv", "run", "--no-project"]

if python_version:
command.extend(["--python", python_version])

if dependencies:
command.extend(["--with", ",".join(dependencies)])

command.extend(["--script", str(script_path)])

envs = os.environ.copy()

logger_prefix = "gyjd-jobs -"

envs["LOG_FORMATTER"] = f"{logger_prefix} - %(levelname)s - %(message)s"

process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=envs,
)

if process.stdout:
for line in process.stdout:
line = line.strip()
if line.startswith(logger_prefix):
level, message = line.removeprefix(logger_prefix).strip().split(" - ", 1)
getattr(context.log, level.lower())(message)

exit_code = process.wait()
if exit_code != 0:
raise Exception(f"Script finished with exit code: {exit_code}")

context.log.info(f"Script finished with exit code: {exit_code}")


# Função para criar um job para cada script
def create_job_for_script(script_name: str, script_path: str):
@job(name=script_name)
def dynamic_job():
run_python_script.configured(
{
"script_path": script_path,
"python_version": "3.11",
"dependencies": ["gyjd", "requests<3"],
},
name=f"{script_name}_op",
)()

return dynamic_job


def generate_definitions(scripts_path: Path):
for config_path in scripts_path.glob("**/*.toml"):
with open(config_path) as f:
try:
script_config = toml.load(f).get("gyjd", {}).get("job", {})
except toml.TomlDecodeError:
continue

if "script" not in script_config:
continue

script_path: Path = config_path.parent / script_config["script"]
script_name = script_config.get("name", script_path.stem)

job = create_job_for_script(script_name, str(script_path))

yield job

schedules = script_config.get("schedule", {})

for schedule_name, schedule_config in schedules.get("cron", {}).items():
cron_expression = schedule_config.get("expression")
if not cron_expression:
continue

cron_timezone = schedule_config.get("timezone", "UTC")

yield ScheduleDefinition(
job=job,
cron_schedule=cron_expression,
execution_timezone=cron_timezone,
name=f"{script_name}_{schedule_name}_schedule",
)


# Repositório contendo os jobs e schedules
@repository(
name="scripts_repository",
description="Repositório com jobs e schedules para scripts Python em ambientes isolados utilizando uv coordenadamente",
)
def scripts_repository():
scripts_path = Path(os.environ["GYJD_SCRIPTS_PATH"])
return list(generate_definitions(scripts_path=scripts_path))
4 changes: 2 additions & 2 deletions src/gyjd/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
@dataclass
class LoggerConfig:
name: str = "gyjd"
level: str = "INFO"
level: str = "$Env:LOG_LEVEL|INFO"
default_to_console: bool = True
format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
formatter: str = "$Env:LOG_FORMATTER|%(asctime)s - %(name)s - %(levelname)s - %(message)s"


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion src/gyjd/core/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def get_default_logger(config: LoggerConfig):
logger = logging.getLogger(config.name)
if not logger.handlers and config.default_to_console:
handler = logging.StreamHandler()
formatter = logging.Formatter(config.format)
formatter = logging.Formatter(config.formatter)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(getattr(logging, config.level.upper()))
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit fa60686

Please sign in to comment.