Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
krayevidi committed May 16, 2024
0 parents commit 98b8f5a
Showing 42 changed files with 1,526 additions and 0 deletions.
66 changes: 66 additions & 0 deletions .github/workflows/workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
name: Test

on:
- push
- pull_request

jobs:
test:
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
python-version:
- "3.11"
- "3.12"
django:
- "4.0"
- "5.0"

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Update pip
run: python -m pip install --upgrade pip

- name: Install Django ${{ matrix.django }}
run: pip install "Django~=${{ matrix.django }}"

- name: Install requirements
run: pip install -r requirements-ci.txt

- name: Install package
run: pip install -e .

- name: Run tests
run: python manage.py test

publish:
name: Build and publish Python 🐍 distributions 📦 to PyPI
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: 3.12

- name: Install req packages
run: python -m pip install -U setuptools wheel

- name: Build a binary wheel and a source tarball
run: python setup.py sdist bdist_wheel

- name: Publish Package on PyPI
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
uses: pypa/gh-action-pypi-publish@release/v1.8
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
*.pyc
.idea/
.pycharm_helpers/
.ipython/
dist/
*.egg-info/
build/
*.sw*
.coverage
.bash_history
docker-compose.override.yaml
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 1.0.0 (2024-05-16)

* Initial release
21 changes: 21 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM python:3.12-slim-bookworm

ENV PYTHONUNBUFFERED 1
ENV LC_ALL=C.UTF-8

RUN useradd -m app

USER app
WORKDIR /app

ADD requirements-ci.txt /app/
ADD requirements-test.txt /app/

ENV PATH /home/app/venv/bin:$PATH

RUN python3 -m venv ~/venv && \
pip install -r requirements-test.txt

ADD . /app/

ENV DJANGO_SETTINGS_MODULE dev.settings
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 RegioHelden GmbH

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
3 changes: 3 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
include LICENSE
include README.md
include CHANGELOG.md
125 changes: 125 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# django-temporalio
___

A small Django app that provides helpers for integrating [Temporal.io](https://temporal.io/) with Django.

## Features

- Registry: Provides a registry that holds mappings between queue names and registered activities and workflows.
- Management Commands: Includes management commands to manage Temporal.io workers and sync schedules.

## Installation

You can install `django_temporalio` using pip:

```bash
$ pip install django-temporalio
```

Add `django_temporalio` to your `INSTALLED_APPS`:

```python
INSTALLED_APPS = [
...
'django_temporalio.apps.DjangoTemporalioConfig',
...
]
```

Add the following settings to your `settings.py`:

```python
from temporalio.worker import WorkerConfig

DJANGO_TEMPORALIO = {
"URL": "localhost:7233",
"WORKER_CONFIGS": {
"main": WorkerConfig(
task_queue="MAIN_TASK_QUEUE",
...
),
...
},
}
```

## Usage

### Workflow and Activity Registry

The registry is a singleton that holds mappings between queue names and registered activities and workflows.
You can register activities and workflows using the `register` method.

Activities and workflows should be declared in `workflows.py` and `activities.py` modules respectively.

```python
from temporalio import activity, workflow
from django_temporalio.registry import queue_activities, queue_workflows

@queue_activities.register("HIGH_PRIORITY_TASK_QUEUE", "MAIN_TASK_QUEUE")
@activity.defn
def my_activity():
pass

@queue_workflows.register("HIGH_PRIORITY_TASK_QUEUE", "MAIN_TASK_QUEUE")
@workflow.defn
class MyWorkflow:
pass
```

### Schedule Registry

You can register schedules using the `register` method.

Schedules should be declared in `schedules.py` module.

```python
from django_temporalio.registry import schedules
from temporalio.client import Schedule


schedules.register("do-cool-stuff-every-hour", Schedule(...))
```

### Management Commands

To see a queue's registered activities and workflows:

```bash
$ ./manage.py show_temporalio_queue_registry
```

To start a worker defined in the settings (for production):

```bash
$ ./manage.py start_temporalio_worker <worker_name>
```

To start a worker for development (starts a worker for each registered queue, WORKER_CONFIGS setting is ignored):

```bash
$ ./manage.py start_temporalio_worker --all
```

To sync schedules with Temporal.io:

```bash
$ ./manage.py sync_temporalio_schedules
```

To see what sync operation would do without actually syncing:

```bash
$ ./manage.py sync_temporal_schedules --dry-run
```

## Configuration

You can configure the app using the following settings:

DJANGO_TEMPORALIO: A dictionary containing the following keys:

- URL: The Temporal.io host to connect to, defaults to `http://localhost:7233`
- NAMESPACE: The Temporal.io namespace to use, defaults to `default`
- WORKER_CONFIGS: A dictionary containing worker configurations.
The key is the worker name and the value is a `WorkerConfig` instance.
Empty file added dev/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions dev/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from temporalio import activity

from dev.temporalio import TestTaskQueues
from django_temporalio.registry import queue_activities


@queue_activities.register(TestTaskQueues.MAIN)
@activity.defn
async def test_activity():
pass
7 changes: 7 additions & 0 deletions dev/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# -*- coding: UTF-8 -*-
from django.apps import AppConfig


class DevConfig(AppConfig):
name = "dev"
verbose_name = "Dev"
30 changes: 30 additions & 0 deletions dev/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from temporalio.client import (
Schedule,
ScheduleActionStartWorkflow,
ScheduleCalendarSpec,
ScheduleRange,
ScheduleSpec,
)

from dev.temporalio import TestTaskQueues
from django_temporalio.registry import schedules

schedules.register(
"do-cool-stuff-every-hour",
Schedule(
action=ScheduleActionStartWorkflow(
"TestWorkflow",
id="do-cool-stuff-every-hour",
task_queue=TestTaskQueues.MAIN,
),
spec=ScheduleSpec(
calendars=[
ScheduleCalendarSpec(
hour=[ScheduleRange(0, 23)],
minute=[ScheduleRange(0)],
second=[ScheduleRange(0)],
),
],
),
),
)
12 changes: 12 additions & 0 deletions dev/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import os
from enum import StrEnum

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

SECRET_KEY = "secret-key"
DEBUG = True

INSTALLED_APPS = [
"dev.apps.DevConfig",
"django_temporalio.apps.DjangoTemporalioConfig",
]
6 changes: 6 additions & 0 deletions dev/temporalio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import StrEnum


class TestTaskQueues(StrEnum):
MAIN = "MAIN_TASK_QUEUE"
HIGH_PRIORITY = "HIGH_PRIORITY_TASK_QUEUE"
Empty file added dev/tests/__init__.py
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from io import StringIO
from unittest import TestCase, mock

from django.core.management import call_command


class ShowTemporalioQueueRegistryTestCase(TestCase):
"""
Test case for show_temporalio_queue_registry management command.
"""

def test_command(self):
registry = {
"TEST_QUEUE_1": mock.Mock(
workflows=[mock.Mock(__name__="TestWorkflow_1")],
activities=[mock.Mock(__name__="test_activity_1")],
),
"TEST_QUEUE_2": mock.Mock(
workflows=[mock.Mock(__name__="TestWorkflow_2")],
activities=[mock.Mock(__name__="test_activity_2")],
),
}

with mock.patch(
"django_temporalio.management.commands.show_temporalio_queue_registry.get_queue_registry",
return_value=registry,
) as get_queue_registry_mock, StringIO() as stdout:
call_command("show_temporalio_queue_registry", stdout=stdout)

get_queue_registry_mock.assert_called_once_with()
self.assertEqual(
stdout.getvalue(),
"TEST_QUEUE_1\n"
" workflows:\n"
" unittest.mock.TestWorkflow_1\n"
" activities:\n"
" unittest.mock.test_activity_1\n"
"TEST_QUEUE_2\n"
" workflows:\n"
" unittest.mock.TestWorkflow_2\n"
" activities:\n"
" unittest.mock.test_activity_2\n",
)
140 changes: 140 additions & 0 deletions dev/tests/management_commands/test_start_temporalio_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from io import StringIO
from unittest import TestCase, mock

from django.core.management import call_command, CommandError
from django.test import override_settings
from temporalio.worker import WorkerConfig

from django_temporalio.conf import SETTINGS_KEY


class StartTemporalioWorkerTestCase(TestCase):
"""
Test case for start_temporalio_worker management command.
"""

@classmethod
def setUpClass(cls):
worker_configs: dict[str, WorkerConfig] = {
"worker_1": WorkerConfig(
task_queue="TEST_QUEUE_1",
),
"worker_2": WorkerConfig(
task_queue="TEST_QUEUE_2",
),
}

cls._overridden_context = override_settings(
**{SETTINGS_KEY: {"WORKER_CONFIGS": worker_configs}}
)
cls._overridden_context.enable()
cls.addClassCleanup(cls._overridden_context.disable)

def setUp(self):
self.worker_run_mock = mock.AsyncMock()
worker_patcher = mock.patch(
"django_temporalio.management.commands.start_temporalio_worker.Worker",
return_value=mock.Mock(run=self.worker_run_mock),
)
self.worker_mock = worker_patcher.start()
self.addCleanup(worker_patcher.stop)

self.client_mock = mock.Mock()
init_client_patcher = mock.patch(
"django_temporalio.management.commands.start_temporalio_worker.init_client",
return_value=self.client_mock,
)
init_client_patcher.start()
self.addCleanup(init_client_patcher.stop)

get_queue_registry_patcher = mock.patch(
"django_temporalio.management.commands.start_temporalio_worker.get_queue_registry",
return_value={
"TEST_QUEUE_1": mock.MagicMock(
workflows=["workflow_1"], activities=["activity_1"]
),
"TEST_QUEUE_2": mock.MagicMock(
workflows=["workflow_2"], activities=["activity_2"]
),
},
)
get_queue_registry_patcher.start()
self.addCleanup(get_queue_registry_patcher.stop)

self.stdout = StringIO()
self.addCleanup(self.stdout.close)

def test_flag_all(self):
"""
Test command execution with --all flag.
"""
call_command("start_temporalio_worker", all=True, stdout=self.stdout)

self.worker_mock.assert_has_calls(
[
mock.call(
self.client_mock,
task_queue="TEST_QUEUE_1",
workflows=["workflow_1"],
activities=["activity_1"],
),
mock.call(
self.client_mock,
task_queue="TEST_QUEUE_2",
workflows=["workflow_2"],
activities=["activity_2"],
),
],
any_order=True,
)
self.worker_run_mock.assert_has_calls([mock.call(), mock.call()])
self.assertEqual(
self.stdout.getvalue(),
"Starting dev Temporal.io workers for queues: TEST_QUEUE_1, TEST_QUEUE_2\n"
"(press ctrl-c to stop)...\n",
)

def test_start_worker(self):
"""
Test command execution with worker name argument.
"""
call_command("start_temporalio_worker", "worker_1", stdout=self.stdout)

self.worker_mock.assert_called_once_with(
self.client_mock,
task_queue="TEST_QUEUE_1",
workflows=["workflow_1"],
activities=["activity_1"],
)
self.worker_run_mock.assert_called_once()
self.assertEqual(
self.stdout.getvalue(),
"Starting 'worker_1' worker for 'TEST_QUEUE_1' queue\n"
"(press ctrl-c to stop)...\n",
)

def test_start_invalid_worker(self):
"""
Test that an error is raised when not declared worker name is provided.
"""
with self.assertRaises(CommandError) as cm:
call_command("start_temporalio_worker", "worker_3", stdout=self.stdout)

self.worker_mock.assert_not_called()
self.assertEqual(
str(cm.exception),
"Error: argument worker_name: invalid choice: 'worker_3' (choose from 'worker_1', 'worker_2')",
)

def test_no_arguments(self):
"""
Test that an error is raised when no arguments are provided.
"""
with self.assertRaises(SystemExit):
call_command("start_temporalio_worker", stderr=self.stdout)

self.worker_mock.assert_not_called()
self.assertEqual(
self.stdout.getvalue(),
"You must provide either a worker name or --all flag.\n",
)
119 changes: 119 additions & 0 deletions dev/tests/management_commands/test_sync_temporal_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from io import StringIO
from unittest import TestCase, mock

from django.core.management import call_command


async def async_iterable(*items):
for item in items:
yield item


class SyncTemporalioSchedulesTestCase(TestCase):
"""
Test case for sync_temporalio_schedules management command.
"""

def setUp(self, *args, **kwargs):
self.schedule_handle_mock = mock.AsyncMock()
self.client_mock = mock.AsyncMock(
list_schedules=mock.AsyncMock(
return_value=async_iterable(
mock.Mock(id="schedule_1"),
mock.Mock(id="schedule_2"),
mock.Mock(id="schedule_3"),
mock.Mock(id="schedule_4"),
mock.Mock(id="schedule_5"),
)
),
get_schedule_handle=mock.Mock(return_value=self.schedule_handle_mock),
)
init_client_patcher = mock.patch(
"django_temporalio.management.commands.sync_temporalio_schedules.init_client",
return_value=self.client_mock,
)
init_client_patcher.start()
self.addCleanup(init_client_patcher.stop)

get_registry_patcher = mock.patch(
"django_temporalio.management.commands.sync_temporalio_schedules.schedules.get_registry",
return_value={
"schedule_1": "schedule_instance_1",
"schedule_2": "schedule_instance_2",
"schedule_6": "schedule_instance_6",
},
)
self.get_registry_mock = get_registry_patcher.start()
self.addCleanup(get_registry_patcher.stop)

self.stdout = StringIO()
self.addCleanup(self.stdout.close)

def _test_sync_schedules(self, verbosity=0):
call_command(
"sync_temporalio_schedules", verbosity=verbosity, stdout=self.stdout
)

self.get_registry_mock.assert_called_once_with()
self.client_mock.assert_has_calls(
[
mock.call.list_schedules(),
# get handle to initiate delete
mock.call.get_schedule_handle("schedule_3"),
mock.call.get_schedule_handle("schedule_4"),
mock.call.get_schedule_handle("schedule_5"),
# get handle to initiate update
mock.call.get_schedule_handle("schedule_1"),
mock.call.get_schedule_handle("schedule_2"),
mock.call.create_schedule("schedule_6", "schedule_instance_6"),
]
)
self.schedule_handle_mock.assert_has_calls(
[
mock.call.delete(),
mock.call.update(mock.ANY),
]
)

def test_sync_schedules(self):
self._test_sync_schedules()
self.assertEqual(
"Syncing schedules...\n" "removed 3, updated 2, created 1\n",
self.stdout.getvalue(),
)

def test_sync_schedules_verbose_output(self):
self._test_sync_schedules(verbosity=2)
self.assertEqual(
self.stdout.getvalue(),
"Syncing schedules...\n"
"Removed 'schedule_3'\n"
"Removed 'schedule_4'\n"
"Removed 'schedule_5'\n"
"Updated 'schedule_1'\n"
"Updated 'schedule_2'\n"
"Created 'schedule_6'\n"
"removed 3, updated 2, created 1\n",
)

def test_sync_schedules_dry_run(self):
call_command("sync_temporalio_schedules", dry_run=True, stdout=self.stdout)

self.get_registry_mock.assert_called_once_with()
self.client_mock.assert_has_calls(
[
mock.call.list_schedules(),
]
)
self.schedule_handle_mock.assert_not_called()
self.assertEqual(
self.stdout.getvalue(),
"Syncing schedules [DRY RUN]...\n"
"Removed 'schedule_3'\n"
"Removed 'schedule_4'\n"
"Removed 'schedule_5'\n"
"Updated 'schedule_1'\n"
"Updated 'schedule_2'\n"
"Created 'schedule_6'\n"
"removed 3, updated 2, created 1\n",
)
Empty file added dev/tests/registry/__init__.py
Empty file.
40 changes: 40 additions & 0 deletions dev/tests/registry/test_get_queue_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from unittest import TestCase, mock

from django_temporalio.registry import get_queue_registry, QueueRegistryItem


class GetQueueRegistryTestCase(TestCase):
@mock.patch("django_temporalio.registry.queue_activities.get_registry")
@mock.patch("django_temporalio.registry.queue_workflows.get_registry")
def test_get_queue_registry(
self, get_workflows_registry_mock, get_activities_registry_mock
):
"""
Test that the queue registry is correctly built from the workflows and activities registries.
"""
get_workflows_registry_mock.return_value = {
"TEST_QUEUE_1": ["TestWorkflow_1"],
"TEST_QUEUE_2": ["TestWorkflow_2"],
}
get_activities_registry_mock.return_value = {
"TEST_QUEUE_1": ["activity_1"],
"TEST_QUEUE_2": ["activity_2"],
}

registry = get_queue_registry()

get_workflows_registry_mock.assert_called_once_with()
get_activities_registry_mock.assert_called_once_with()
self.assertEqual(
registry,
{
"TEST_QUEUE_1": QueueRegistryItem(
workflows=["TestWorkflow_1"],
activities=["activity_1"],
),
"TEST_QUEUE_2": QueueRegistryItem(
workflows=["TestWorkflow_2"],
activities=["activity_2"],
),
},
)
117 changes: 117 additions & 0 deletions dev/tests/registry/test_queue_activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from unittest import TestCase, mock

from django.utils.module_loading import autodiscover_modules
from temporalio import activity

from dev.temporalio import TestTaskQueues
from django_temporalio.registry import queue_activities


@activity.defn
def test_activity():
pass


class QueueActivityRegistryTestCase(TestCase):
"""
Test case for queue_activities registry.
"""

def tearDown(self):
queue_activities.clear_registry()

@mock.patch(
"django_temporalio.registry.autodiscover_modules", wraps=autodiscover_modules
)
@mock.patch(
"django_temporalio.registry.queue_activities.register",
wraps=queue_activities.register,
)
def test_get_registry(self, mock_register, mock_autodiscover_modules):
"""
Test that activities defined in activities.py are automatically registered when the registry is accessed.
"""
registry = queue_activities.get_registry()

mock_register.assert_called_once_with(TestTaskQueues.MAIN)
mock_autodiscover_modules.assert_called_once_with("activities")
self.assertEqual(len(registry), 1)
self.assertIn(TestTaskQueues.MAIN, registry)
activities = registry[TestTaskQueues.MAIN]
self.assertEqual(len(activities), 1)
self.assertEqual(
f"{activities[0].__module__}.{activities[0].__name__}",
"dev.activities.test_activity",
)

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_register(self, _):
"""
Test that an activity can be registered.
"""
queue_activities.register(TestTaskQueues.MAIN)(test_activity)

registry = queue_activities.get_registry()
self.assertIn(TestTaskQueues.MAIN, registry)
self.assertIn(test_activity, registry[TestTaskQueues.MAIN])

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_register_multiple_queues(self, _):
"""
Test that an activity can be registered with multiple queues.
"""
queue_activities.register(
TestTaskQueues.MAIN,
TestTaskQueues.HIGH_PRIORITY,
)(test_activity)

registry = queue_activities.get_registry()
self.assertIn(TestTaskQueues.MAIN, registry)
self.assertIn(TestTaskQueues.HIGH_PRIORITY, registry)
self.assertIn(test_activity, registry[TestTaskQueues.MAIN])
self.assertIn(test_activity, registry[TestTaskQueues.HIGH_PRIORITY])

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_registry_uniqueness(self, _):
"""
Test that an activity can only be registered once.
"""
queue_activities.register(TestTaskQueues.MAIN)(test_activity)
queue_activities.register(TestTaskQueues.MAIN)(test_activity)

registry = queue_activities.get_registry()
self.assertIn(TestTaskQueues.MAIN, registry)
activities = registry[TestTaskQueues.MAIN]
self.assertEqual(len(activities), 1)
self.assertEqual(activities[0], test_activity)

def test_register_no_queue(self):
"""
Test that an exception is raised when an activity is registered without a queue.
"""
with self.assertRaises(ValueError):
queue_activities.register()

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_register_failure_on_missing_temporal_decorators(self, _):
"""
Test that an exception is raised when an activity function is not decorated with Temporal.io decorator.
"""
with self.assertRaises(queue_activities.MissingTemporalDecorator):

@queue_activities.register(TestTaskQueues.MAIN)
def test_activity():
pass

self.assertDictEqual(queue_activities.get_registry(), {})

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_clear_registry(self, _):
"""
Test that the registry can be cleared.
"""
queue_activities.register(TestTaskQueues.MAIN)(test_activity)

queue_activities.clear_registry()

self.assertDictEqual(queue_activities.get_registry(), {})
118 changes: 118 additions & 0 deletions dev/tests/registry/test_queue_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from unittest import TestCase, mock

from temporalio import workflow

from dev.temporalio import TestTaskQueues
from django_temporalio.registry import queue_workflows, autodiscover_modules


@workflow.defn
class TestWorkflow:
@workflow.run
async def run(self):
pass


class QueueWorkflowRegistryTestCase(TestCase):
"""
Test case for queue_workflows registry.
"""

def tearDown(self):
queue_workflows.clear_registry()

@mock.patch(
"django_temporalio.registry.autodiscover_modules", wraps=autodiscover_modules
)
@mock.patch(
"django_temporalio.registry.queue_workflows.register",
wraps=queue_workflows.register,
)
def test_get_registry(self, mock_register, mock_autodiscover_modules):
"""
Test that workflows defined in workflows.py are automatically registered when the registry is accessed.
"""
registry = queue_workflows.get_registry()

mock_register.assert_called_once_with(TestTaskQueues.MAIN)
mock_autodiscover_modules.assert_called_once_with("workflows")
self.assertEqual(len(registry), 1)
self.assertIn(TestTaskQueues.MAIN, registry)
workflows = registry[TestTaskQueues.MAIN]
self.assertEqual(len(workflows), 1)
self.assertEqual(
"dev.workflows.TestWorkflow",
f"{workflows[0].__module__}.{workflows[0].__name__}",
)

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_register(self, _):
"""
Test that a workflow can be registered.
"""
queue_workflows.register(TestTaskQueues.MAIN)(TestWorkflow)

registry = queue_workflows.get_registry()
self.assertIn(TestTaskQueues.MAIN, registry)
self.assertIn(TestWorkflow, registry[TestTaskQueues.MAIN])

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_register_multiple_queues(self, _):
"""
Test that a workflow can be registered with multiple queues.
"""
queue_workflows.register(
TestTaskQueues.MAIN,
TestTaskQueues.HIGH_PRIORITY,
)(TestWorkflow)

registry = queue_workflows.get_registry()
self.assertIn(TestTaskQueues.MAIN, registry)
self.assertIn(TestTaskQueues.HIGH_PRIORITY, registry)
self.assertIn(TestWorkflow, registry[TestTaskQueues.MAIN])
self.assertIn(TestWorkflow, registry[TestTaskQueues.HIGH_PRIORITY])

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_registry_uniqueness(self, _):
"""
Test that a workflow can only be registered once.
"""
queue_workflows.register(TestTaskQueues.MAIN)(TestWorkflow)
queue_workflows.register(TestTaskQueues.MAIN)(TestWorkflow)

registry = queue_workflows.get_registry()
self.assertIn(TestTaskQueues.MAIN, registry)
workflows = registry[TestTaskQueues.MAIN]
self.assertEqual(len(workflows), 1)
self.assertEqual(workflows[0], TestWorkflow)

def test_register_no_queue(self):
"""
Test that an exception is raised when a workflow is registered without a queue.
"""
with self.assertRaises(ValueError):
queue_workflows.register()

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_register_failure_on_missing_temporal_decorators(self, _):
"""
Test that an exception is raised when a workflow class is not decorated with Temporal.io decorator.
"""
with self.assertRaises(queue_workflows.MissingTemporalDecorator):

@queue_workflows.register(TestTaskQueues.MAIN)
class TestWorkflow:
pass

self.assertDictEqual(queue_workflows.get_registry(), {})

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_clear_registry(self, _):
"""
Test that the registry can be cleared.
"""
queue_workflows.register(TestTaskQueues.MAIN)(TestWorkflow)

queue_workflows.clear_registry()

self.assertDictEqual(queue_workflows.get_registry(), {})
88 changes: 88 additions & 0 deletions dev/tests/registry/test_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from unittest import TestCase, mock

from django.utils.module_loading import autodiscover_modules
from temporalio.client import (
ScheduleActionStartWorkflow,
Schedule,
ScheduleSpec,
ScheduleCalendarSpec,
ScheduleRange,
)

from dev.temporalio import TestTaskQueues
from django_temporalio.registry import schedules


class ScheduleRegistryTestCase(TestCase):
"""
Test case for schedules registry.
"""

@classmethod
def setUpClass(cls):
cls.schedule_id = "test-schedule"
cls.schedule = Schedule(
action=ScheduleActionStartWorkflow(
"TestWorkflow",
id="do-something-every-hour",
task_queue=TestTaskQueues.MAIN,
),
spec=ScheduleSpec(
calendars=[
ScheduleCalendarSpec(
hour=[ScheduleRange(0, 23)],
minute=[ScheduleRange(0)],
second=[ScheduleRange(0)],
),
],
),
)

def tearDown(self):
schedules.clear_registry()

@mock.patch(
"django_temporalio.registry.autodiscover_modules", wraps=autodiscover_modules
)
@mock.patch.object(schedules, "register", wraps=schedules.register)
def test_get_registry(self, mock_register, mock_autodiscover_modules):
"""
Test that schedules defined in schedules.py are automatically registered when the registry is accessed.
"""
registry = schedules.get_registry()

mock_register.assert_called_once()
mock_autodiscover_modules.assert_called_once_with("schedules")
self.assertEqual(len(registry), 1)
self.assertIn("do-cool-stuff-every-hour", registry)

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_register(self, _):
"""
Test that a schedule can be registered.
"""
schedules.register(self.schedule_id, self.schedule)

registry = schedules.get_registry()
self.assertIn(self.schedule_id, registry)
self.assertEqual(registry[self.schedule_id], self.schedule)

def test_already_registered_exception(self):
"""
Test that an exception is raised when attempting to register a schedule with the same ID.
"""
schedules.register(self.schedule_id, self.schedule)

with self.assertRaises(schedules.AlreadyRegistered):
schedules.register(self.schedule_id, self.schedule)

@mock.patch("django_temporalio.registry.autodiscover_modules")
def test_clear_registry(self, _):
"""
Test that the registry can be cleared.
"""
schedules.register(self.schedule_id, self.schedule)

schedules.clear_registry()

self.assertEqual(len(schedules.get_registry()), 0)
18 changes: 18 additions & 0 deletions dev/tests/test_init_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from unittest import IsolatedAsyncioTestCase, mock

from django_temporalio.client import init_client
from django_temporalio.conf import settings


class InitClientTestCase(IsolatedAsyncioTestCase):
"""
Test case for init_client function.
"""

async def test_init_client(self):
with mock.patch("django_temporalio.client.Client.connect") as connect_mock:
await init_client()

connect_mock.assert_called_once_with(
target_host=settings.URL, namespace=settings.NAMESPACE
)
50 changes: 50 additions & 0 deletions dev/tests/test_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from unittest import TestCase

from django.conf import settings as django_settings
from django.test.utils import override_settings

from django_temporalio.conf import (
SETTINGS_KEY,
DEFAULTS,
settings as temporalio_settings,
)


class SettingsTestCase(TestCase):
"""
Test case for django_temporalio.conf.settings.
"""

def test_default_settings(self):
self.assertFalse(hasattr(django_settings, SETTINGS_KEY))
self.assertEqual(temporalio_settings.URL, DEFAULTS["URL"])
self.assertEqual(temporalio_settings.NAMESPACE, DEFAULTS["NAMESPACE"])
self.assertEqual(temporalio_settings.WORKER_CONFIGS, DEFAULTS["WORKER_CONFIGS"])

def test_user_settings(self):
user_settings = {
"URL": "http://temporal:7233",
"NAMESPACE": "main",
"WORKER_CONFIGS": {"main": "config"},
}
with override_settings(**{SETTINGS_KEY: user_settings}):
self.assertEqual(temporalio_settings.URL, user_settings["URL"])
self.assertEqual(temporalio_settings.NAMESPACE, user_settings["NAMESPACE"])
self.assertEqual(
temporalio_settings.WORKER_CONFIGS, user_settings["WORKER_CONFIGS"]
)

def test_fallback_to_defaults(self):
user_settings = {
"NAMESPACE": "main",
}
with override_settings(**{SETTINGS_KEY: user_settings}):
self.assertEqual(temporalio_settings.URL, DEFAULTS["URL"])
self.assertEqual(temporalio_settings.NAMESPACE, user_settings["NAMESPACE"])
self.assertEqual(
temporalio_settings.WORKER_CONFIGS, DEFAULTS["WORKER_CONFIGS"]
)

def test_invalid_setting(self):
with self.assertRaises(AttributeError):
temporalio_settings.SOMETHING
12 changes: 12 additions & 0 deletions dev/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from temporalio import workflow

from dev.temporalio import TestTaskQueues
from django_temporalio.registry import queue_workflows


@queue_workflows.register(TestTaskQueues.MAIN)
@workflow.defn
class TestWorkflow:
@workflow.run
async def run(self):
pass
7 changes: 7 additions & 0 deletions django_temporalio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
__title__ = "django-temporalio"
__description__ = "Temporal.io integration for Django"
__version__ = "1.0.0"
__url__ = "https://github.com/RegioHelden/django-temporalio"
__author__ = "RegioHelden GmbH"
__author_email__ = "opensource@regiohelden.de"
__license__ = "MIT"
6 changes: 6 additions & 0 deletions django_temporalio/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class DjangoTemporalioConfig(AppConfig):
name = "django_temporalio"
verbose_name = "Django Temporal.io helpers"
13 changes: 13 additions & 0 deletions django_temporalio/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from temporalio.client import Client

from django_temporalio.conf import settings


async def init_client():
"""
Connect to Temporal.io server and return a client instance.
"""
return await Client.connect(
target_host=settings.URL,
namespace=settings.NAMESPACE,
)
56 changes: 56 additions & 0 deletions django_temporalio/conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Settings for django-temporalio are all namespaced in the DJANGO_TEMPORALIO setting.
For example your project's `settings.py` file might look like this:
DJANGO_TEMPORALIO = {
'URL': 'http://localhost:7233',
}
This module provides the `settings` object, that is used to access
django-temporalio settings, checking for user settings first, then falling
back to the defaults.
"""

from django.conf import settings as django_settings
from django.core.signals import setting_changed
from django.dispatch import receiver

SETTINGS_KEY = "DJANGO_TEMPORALIO"
DEFAULTS = {
"URL": "http://localhost:7233",
"NAMESPACE": "default",
"WORKER_CONFIGS": {},
}


class Settings:
def __init__(self):
self.defaults = DEFAULTS

@property
def user_settings(self):
if not hasattr(self, "_user_settings"):
self._user_settings = getattr(django_settings, SETTINGS_KEY, {})
return self._user_settings

def __getattr__(self, attr):
if attr not in self.defaults:
raise AttributeError(f"Invalid setting: '{attr}'")

if attr in self.user_settings:
return self.user_settings[attr]

return self.defaults[attr]

def reload(self):
if hasattr(self, "_user_settings"):
delattr(self, "_user_settings")


settings = Settings()


@receiver(setting_changed)
def reload_settings(*args, **kwargs):
if kwargs["setting"] == SETTINGS_KEY:
settings.reload()
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from django.core.management.base import BaseCommand

from django_temporalio.registry import get_queue_registry


class Command(BaseCommand):
help = "Show django-temporalio queue registry."
indent = 2

def handle(self, *args, **options):
for queue_name, item in get_queue_registry().items():
self.stdout.write(f"{queue_name}")
for label, entities in [
("workflows", item.workflows),
("activities", item.activities),
]:
if not entities:
continue

self.stdout.write(f"{' ' * self.indent}{label}:")
for entity in entities:
self.stdout.write(
f"{' ' * self.indent * 2}{entity.__module__}.{entity.__name__}",
)
95 changes: 95 additions & 0 deletions django_temporalio/management/commands/start_temporalio_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio
import contextlib
import sys

from django.core.management import BaseCommand
from temporalio.worker import Worker

from django_temporalio.client import init_client
from django_temporalio.conf import settings
from django_temporalio.registry import get_queue_registry


class Command(BaseCommand):
help = "Starts Temporal.io worker."

def add_arguments(self, parser):
parser.add_argument(
"worker_name",
nargs="?",
choices=settings.WORKER_CONFIGS.keys(),
help="The name of the worker to start.",
)
parser.add_argument(
"-a",
"--all",
action="store_true",
default=False,
help=(
"Start a worker per queue registered in the django-temporalio registry. "
"Meant for development purposes."
),
)

async def start_dev_workers(self):
client = await init_client()
tasks = []
queues = []

for queue_name, item in get_queue_registry().items():
worker = Worker(
client,
task_queue=queue_name,
workflows=item.workflows,
activities=item.activities,
)
tasks.append(worker.run())
queues.append(queue_name)

self.stdout.write(
f"Starting dev Temporal.io workers for queues: {', '.join(queues)}\n"
f"(press ctrl-c to stop)...",
)
await asyncio.gather(*tasks)

async def start_worker(self, name):
worker_config = settings.WORKER_CONFIGS[name]
queue_name = worker_config["task_queue"]
registry = get_queue_registry().get(queue_name)

if not registry:
self.stderr.write(
f"Failed to start '{name}' worker.\n"
f"No activities/workflows registered for queue '{queue_name}'.",
)
sys.exit(1)

client = await init_client()
worker = Worker(
client,
**worker_config,
workflows=registry.workflows,
activities=registry.activities,
)
self.stdout.write(
f"Starting '{name}' worker for '{queue_name}' queue\n"
f"(press ctrl-c to stop)...",
)
await worker.run()

def handle(self, *args, **options):
worker_name = options["worker_name"]
run_all = options["all"]

if not worker_name and not run_all:
self.stderr.write("You must provide either a worker name or --all flag.")
sys.exit(2)

with contextlib.suppress(KeyboardInterrupt):
asyncio.run(
(
self.start_dev_workers()
if run_all
else self.start_worker(worker_name)
),
)
65 changes: 65 additions & 0 deletions django_temporalio/management/commands/sync_temporalio_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import asyncio

from django.core.management.base import BaseCommand
from temporalio.client import ScheduleUpdate

from django_temporalio.client import init_client
from django_temporalio.registry import schedules


class Command(BaseCommand):
verbose = False
dry_run = False
help = "Syncs Temporal.io schedules."

def add_arguments(self, parser):
parser.add_argument(
"-d",
"--dry-run",
action="store_true",
default=False,
help="Prints what would be done without actually doing it.",
)

def log(self, msg: str):
if self.verbose or self.dry_run:
self.stdout.write(msg)

async def sync_schedules(self):
client = await init_client()
current_schedule_ids = {s.id async for s in await client.list_schedules()}
registry = schedules.get_registry()
removed_schedule_ids = sorted(current_schedule_ids - set(registry))
updated_schedule_ids = []
new_schedule_ids = []

for schedule_id in removed_schedule_ids:
if not self.dry_run:
handle = client.get_schedule_handle(schedule_id)
await handle.delete()
self.log(f"Removed '{schedule_id}'")

for schedule_id, schedule in registry.items():
if schedule_id in current_schedule_ids:
if not self.dry_run:
handle = client.get_schedule_handle(schedule_id)
await handle.update(lambda _: ScheduleUpdate(schedule=schedule))
updated_schedule_ids.append(schedule_id)
self.log(f"Updated '{schedule_id}'")
else:
if not self.dry_run:
await client.create_schedule(schedule_id, schedule)
new_schedule_ids.append(schedule_id)
self.log(f"Created '{schedule_id}'")

self.stdout.write(
f"removed {len(removed_schedule_ids)}, "
f"updated {len(updated_schedule_ids)}, "
f"created {len(new_schedule_ids)}"
)

def handle(self, *args, **options):
self.verbose = int(options["verbosity"]) > 1
self.dry_run = options["dry_run"]
self.stdout.write(f"Syncing schedules{' [DRY RUN]' if self.dry_run else ''}...")
asyncio.run(self.sync_schedules())
111 changes: 111 additions & 0 deletions django_temporalio/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from dataclasses import dataclass, field
from functools import wraps
from typing import Callable, Sequence, Type

from django.utils.module_loading import autodiscover_modules
from temporalio.client import Schedule


class ScheduleRegistry:
_registry: dict[str, Schedule]

class AlreadyRegistered(Exception):
pass

def __init__(self):
self._init_registry()

def _init_registry(self):
self._registry = {}

def register(self, schedule_id: str, schedule: Schedule):
if schedule_id in self._registry:
raise self.AlreadyRegistered(
f"Schedule with ID '{schedule_id}' is already registered.",
)
self._registry[schedule_id] = schedule

def get_registry(self):
autodiscover_modules("schedules")
return self._registry

def clear_registry(self):
self._init_registry()


class QueueRegistry:
module_name: str
check_attr: str
_registry: dict[str, list]
_registered_object_ids: set

class MissingTemporalDecorator(Exception):
pass

def __init__(self, module_name: str, check_attr: str):
self.module_name = module_name
self.check_attr = check_attr
self._init_registry()

def _init_registry(self):
self._registry = {}
self._registered_object_ids = set()

@staticmethod
def _make_id(obj: Callable):
return f"{obj.__module__}.{obj.__name__}"

def register(self, *queue_names: str):
if not queue_names:
raise ValueError("At least one queue name must be provided.")

@wraps(*queue_names)
def decorator(obj):
if not hasattr(obj, self.check_attr):
raise self.MissingTemporalDecorator(
f"'{self._make_id(obj)}' must be decorated with 'defn' Temporal.io decorator.\n"
f"See https://github.com/temporalio/sdk-python/blob/main/README.md",
)

if (obj_id := self._make_id(obj)) not in self._registered_object_ids:
self._registered_object_ids.add(obj_id)
for queue_name in queue_names:
self._registry.setdefault(queue_name, []).append(obj)

return obj

return decorator

def clear_registry(self):
self._init_registry()

def get_registry(self):
autodiscover_modules(self.module_name)
return self._registry


schedules = ScheduleRegistry()
queue_workflows = QueueRegistry("workflows", "__temporal_workflow_definition")
queue_activities = QueueRegistry("activities", "__temporal_activity_definition")


@dataclass
class QueueRegistryItem:
workflows: Sequence[Type] = field(default_factory=list)
activities: Sequence[Callable] = field(default_factory=list)


def get_queue_registry():
"""
merges the workflows and activities registries
"""
result: dict[str, QueueRegistryItem] = {
queue_name: QueueRegistryItem(
workflows=workflows,
)
for queue_name, workflows in queue_workflows.get_registry().items()
}

for queue_name, activities in queue_activities.get_registry().items():
result.setdefault(queue_name, QueueRegistryItem()).activities = activities
return result
12 changes: 12 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
services:
app:
build: .
user: app
command: /app/manage.py test
volumes:
- .:/app:cached
environment:
SHELL: /bin/bash
IPYTHONDIR: /app/.ipython
HISTFILE: /app/.bash_history
restart: "no"
10 changes: 10 additions & 0 deletions manage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env python
import os
import sys

if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dev.settings")

from django.core.management import execute_from_command_line

execute_from_command_line(sys.argv)
2 changes: 2 additions & 0 deletions requirements-ci.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
temporalio==1.5.1
setuptools==66.1.1
4 changes: 4 additions & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-r requirements-ci.txt
Django==5.0.4
bumpversion==0.6.0
build==1.2.1
11 changes: 11 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[bumpversion]
current_version = 1.0.0
commit = True
tag = True

[bumpversion:file:django_temporalio/__init__.py]
search = __version__ = "{current_version}"
replace = __version__ = "{new_version}"

[metadata]
description-file = README.md
50 changes: 50 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: UTF-8 -*-
import os

from setuptools import setup, find_packages

import django_temporalio


def read_file(filename):
try:
return open(os.path.join(os.path.dirname(__file__), filename)).read()
except IOError:
return ""


setup(
name=django_temporalio.__title__,
packages=find_packages(exclude=["dev*"]),
version=django_temporalio.__version__,
description=django_temporalio.__description__,
author=django_temporalio.__author__,
author_email=django_temporalio.__author_email__,
long_description=(read_file("README.md") + "\n\n" + read_file("CHANGELOG.md")),
long_description_content_type="text/markdown",
install_requires=[
"django>=4.0",
"temporalio>=1.5.1",
],
license=django_temporalio.__license__,
url=django_temporalio.__url__,
download_url="",
keywords=[
"django",
"temporal.io",
"temporal",
],
include_package_data=True,
python_requires=">=3.11",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Environment :: Web Environment",
"Framework :: Django",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Topic :: Internet :: WWW/HTTP",
],
)

0 comments on commit 98b8f5a

Please sign in to comment.