diff --git a/python/mypy.ini b/python/mypy.ini index e95010bb1307..a2e5681ee78d 100644 --- a/python/mypy.ini +++ b/python/mypy.ini @@ -29,10 +29,6 @@ ignore_errors = true ignore_errors = true # TODO (eavanvalkenburg): remove this: https://github.com/microsoft/semantic-kernel/issues/7134 -[mypy-semantic_kernel.connectors.openapi_plugin.*] -ignore_errors = true -# TODO (eavanvalkenburg): remove this: https://github.com/microsoft/semantic-kernel/issues/7135 - [mypy-semantic_kernel.connectors.utils.*] ignore_errors = true # TODO (eavanvalkenburg): remove this: https://github.com/microsoft/semantic-kernel/issues/7136 diff --git a/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation.py b/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation.py index 0894781fde61..d3c95d1ae0a0 100644 --- a/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation.py +++ b/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation.py @@ -2,7 +2,7 @@ import re from typing import Any, Final -from urllib.parse import urlencode, urljoin, urlparse, urlunparse +from urllib.parse import ParseResult, urlencode, urljoin, urlparse, urlunparse from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_expected_response import ( RestApiOperationExpectedResponse, @@ -49,7 +49,7 @@ def __init__( self, id: str, method: str, - server_url: str, + server_url: str | ParseResult, path: str, summary: str | None = None, description: str | None = None, @@ -60,11 +60,11 @@ def __init__( """Initialize the RestApiOperation.""" self.id = id self.method = method.upper() - self.server_url = server_url + self.server_url = urlparse(server_url) if isinstance(server_url, str) else server_url self.path = path self.summary = summary self.description = description - self.parameters = params + self.parameters = params if params else [] self.request_body = request_body self.responses = responses @@ -163,7 +163,7 @@ def get_parameters( enable_payload_spacing: bool = False, ) -> list["RestApiOperationParameter"]: """Get the parameters for the operation.""" - params = list(operation.parameters) + params = list(operation.parameters) if operation.parameters is not None else [] if operation.request_body is not None: params.extend( self.get_payload_parameters( @@ -221,8 +221,8 @@ def _get_parameters_from_payload_metadata( ) -> list["RestApiOperationParameter"]: parameters: list[RestApiOperationParameter] = [] for property in properties: - parameter_name = self._get_property_name(property, root_property_name, enable_namespacing) - if not property.properties: + parameter_name = self._get_property_name(property, root_property_name or False, enable_namespacing) + if not hasattr(property, "properties") or not property.properties: parameters.append( RestApiOperationParameter( name=parameter_name, @@ -234,9 +234,16 @@ def _get_parameters_from_payload_metadata( schema=property.schema, ) ) - parameters.extend( - self._get_parameters_from_payload_metadata(property.properties, enable_namespacing, parameter_name) - ) + else: + # Handle property.properties as a single instance or a list + if isinstance(property.properties, RestApiOperationPayloadProperty): + nested_properties = [property.properties] + else: + nested_properties = property.properties + + parameters.extend( + self._get_parameters_from_payload_metadata(nested_properties, enable_namespacing, parameter_name) + ) return parameters def get_payload_parameters( @@ -246,7 +253,7 @@ def get_payload_parameters( if use_parameters_from_metadata: if operation.request_body is None: raise Exception( - f"Payload parameters cannot be retrieved from the `{operation.Id}` " + f"Payload parameters cannot be retrieved from the `{operation.id}` " f"operation payload metadata because it is missing." ) if operation.request_body.media_type == RestApiOperation.MEDIA_TYPE_TEXT_PLAIN: @@ -256,7 +263,7 @@ def get_payload_parameters( return [ self.create_payload_artificial_parameter(operation), - self.create_content_type_artificial_parameter(operation), + self.create_content_type_artificial_parameter(), ] def get_default_response( @@ -276,14 +283,25 @@ def get_default_return_parameter(self, preferred_responses: list[str] | None = N if preferred_responses is None: preferred_responses = self._preferred_responses - rest_operation_response = self.get_default_response(self.responses, preferred_responses) + responses = self.responses if self.responses is not None else {} + + rest_operation_response = self.get_default_response(responses, preferred_responses) + + schema_type = None + if rest_operation_response is not None and rest_operation_response.schema is not None: + schema_type = rest_operation_response.schema.get("type") if rest_operation_response: return KernelParameterMetadata( name="return", description=rest_operation_response.description, - type_=rest_operation_response.schema.get("type") if rest_operation_response.schema else None, + type_=schema_type, schema_data=rest_operation_response.schema, ) - return None + return KernelParameterMetadata( + name="return", + description="Default return parameter", + type_="string", + schema_data={"type": "string"}, + ) diff --git a/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation_expected_response.py b/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation_expected_response.py index 2cc251cbe048..3b77af349594 100644 --- a/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation_expected_response.py +++ b/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation_expected_response.py @@ -6,7 +6,7 @@ @experimental_class class RestApiOperationExpectedResponse: - def __init__(self, description: str, media_type: str, schema: str | None = None): + def __init__(self, description: str, media_type: str, schema: dict[str, str] | None = None): """Initialize the RestApiOperationExpectedResponse.""" self.description = description self.media_type = media_type diff --git a/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation_run_options.py b/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation_run_options.py index efc7d7434948..332a446bf609 100644 --- a/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation_run_options.py +++ b/python/semantic_kernel/connectors/openapi_plugin/models/rest_api_operation_run_options.py @@ -7,7 +7,7 @@ class RestApiOperationRunOptions: """The options for running the REST API operation.""" - def __init__(self, server_url_override=None, api_host_url=None): + def __init__(self, server_url_override=None, api_host_url=None) -> None: """Initialize the REST API operation run options.""" self.server_url_override: str = server_url_override self.api_host_url: str = api_host_url diff --git a/python/semantic_kernel/connectors/openapi_plugin/openapi_manager.py b/python/semantic_kernel/connectors/openapi_plugin/openapi_manager.py index 4986072f4dcf..bc195dec1bef 100644 --- a/python/semantic_kernel/connectors/openapi_plugin/openapi_manager.py +++ b/python/semantic_kernel/connectors/openapi_plugin/openapi_manager.py @@ -46,12 +46,14 @@ def create_functions_from_openapi( list[KernelFunctionFromMethod]: the operations as functions """ parser = OpenApiParser() - parsed_doc = parser.parse(openapi_document_path) + if (parsed_doc := parser.parse(openapi_document_path)) is None: + raise FunctionExecutionException(f"Error parsing OpenAPI document: {openapi_document_path}") operations = parser.create_rest_api_operations(parsed_doc, execution_settings=execution_settings) auth_callback = None if execution_settings and execution_settings.auth_callback: auth_callback = execution_settings.auth_callback + openapi_runner = OpenApiRunner( parsed_openapi_document=parsed_doc, auth_callback=auth_callback, @@ -129,11 +131,13 @@ async def run_openapi_operation( description=f"{p.description or p.name}", default_value=p.default_value or "", is_required=p.is_required, - type_=p.type if p.type is not None else TYPE_MAPPING.get(p.type, None), + type_=p.type if p.type is not None else TYPE_MAPPING.get(p.type, "object"), schema_data=( p.schema if p.schema is not None and isinstance(p.schema, dict) - else {"type": f"{p.type}"} if p.type else None + else {"type": f"{p.type}"} + if p.type + else None ), ) for p in rest_operation_params diff --git a/python/semantic_kernel/connectors/openapi_plugin/openapi_parser.py b/python/semantic_kernel/connectors/openapi_plugin/openapi_parser.py index 05ce5c4c821c..85f13a096908 100644 --- a/python/semantic_kernel/connectors/openapi_plugin/openapi_parser.py +++ b/python/semantic_kernel/connectors/openapi_plugin/openapi_parser.py @@ -118,13 +118,19 @@ def _get_payload_properties(self, operation_id, schema, required_properties, lev def _create_rest_api_operation_payload( self, operation_id: str, request_body: dict[str, Any] - ) -> RestApiOperationPayload: + ) -> RestApiOperationPayload | None: if request_body is None or request_body.get("content") is None: return None - media_type = next((mt for mt in OpenApiParser.SUPPORTED_MEDIA_TYPES if mt in request_body.get("content")), None) + + content = request_body.get("content") + if content is None: + return None + + media_type = next((mt for mt in OpenApiParser.SUPPORTED_MEDIA_TYPES if mt in content), None) if media_type is None: raise Exception(f"Neither of the media types of {operation_id} is supported.") - media_type_metadata = request_body.get("content")[media_type] + + media_type_metadata = content[media_type] payload_properties = self._get_payload_properties( operation_id, media_type_metadata["schema"], media_type_metadata["schema"].get("required", set()) ) diff --git a/python/semantic_kernel/connectors/openapi_plugin/openapi_runner.py b/python/semantic_kernel/connectors/openapi_plugin/openapi_runner.py index 11ddd06452d2..951a2c4d69fc 100644 --- a/python/semantic_kernel/connectors/openapi_plugin/openapi_runner.py +++ b/python/semantic_kernel/connectors/openapi_plugin/openapi_runner.py @@ -3,7 +3,8 @@ import json import logging from collections import OrderedDict -from collections.abc import Callable, Mapping +from collections.abc import Awaitable, Callable, Mapping +from inspect import isawaitable from typing import Any from urllib.parse import urlparse, urlunparse @@ -34,13 +35,13 @@ class OpenApiRunner: def __init__( self, parsed_openapi_document: Mapping[str, str], - auth_callback: Callable[[dict[str, str]], dict[str, str]] | None = None, + auth_callback: Callable[..., dict[str, str] | Awaitable[dict[str, str]]] | None = None, http_client: httpx.AsyncClient | None = None, enable_dynamic_payload: bool = True, enable_payload_namespacing: bool = False, ): """Initialize the OpenApiRunner.""" - self.spec = Spec.from_dict(parsed_openapi_document) + self.spec = Spec.from_dict(parsed_openapi_document) # type: ignore self.auth_callback = auth_callback self.http_client = http_client self.enable_dynamic_payload = enable_dynamic_payload @@ -99,11 +100,17 @@ def build_json_object(self, properties, arguments, property_namespace=None): ) return result - def build_operation_payload(self, operation: RestApiOperation, arguments: KernelArguments) -> tuple[str, str]: + def build_operation_payload( + self, operation: RestApiOperation, arguments: KernelArguments + ) -> tuple[str, str] | tuple[None, None]: """Build the operation payload.""" if operation.request_body is None and self.payload_argument_name not in arguments: return None, None - return self.build_json_payload(operation.request_body, arguments) + + if operation.request_body is not None: + return self.build_json_payload(operation.request_body, arguments) + + return None, None def get_argument_name_for_payload(self, property_name, property_namespace=None): """Get argument name for the payload.""" @@ -111,7 +118,9 @@ def get_argument_name_for_payload(self, property_name, property_namespace=None): return property_name return f"{property_namespace}.{property_name}" if property_namespace else property_name - def _get_first_response_media_type(self, responses: OrderedDict[str, RestApiOperationExpectedResponse]) -> str: + def _get_first_response_media_type( + self, responses: OrderedDict[str, RestApiOperationExpectedResponse] | None + ) -> str: if responses: first_response = next(iter(responses.values())) return first_response.media_type if first_response.media_type else self.media_type_application_json @@ -123,30 +132,36 @@ async def run_operation( arguments: KernelArguments | None = None, options: RestApiOperationRunOptions | None = None, ) -> str: - """Run the operation.""" + """Runs the operation defined in the OpenAPI manifest.""" + if not arguments: + arguments = KernelArguments() url = self.build_operation_url( operation=operation, arguments=arguments, - server_url_override=options.server_url_override, - api_host_url=options.api_host_url, + server_url_override=options.server_url_override if options else None, + api_host_url=options.api_host_url if options else None, ) headers = operation.build_headers(arguments=arguments) payload, _ = self.build_operation_payload(operation=operation, arguments=arguments) - """Runs the operation defined in the OpenAPI manifest""" - if headers is None: - headers = {} - if self.auth_callback: - headers_update = await self.auth_callback(headers=headers) - headers.update(headers_update) + headers_update = self.auth_callback(**headers) + if isawaitable(headers_update): + headers_update = await headers_update + # at this point, headers_update is a valid dictionary + headers.update(headers_update) # type: ignore if APP_INFO: headers.update(APP_INFO) headers = prepend_semantic_kernel_to_user_agent(headers) if "Content-Type" not in headers: - headers["Content-Type"] = self._get_first_response_media_type(operation.responses) + responses = ( + operation.responses + if isinstance(operation.responses, OrderedDict) + else OrderedDict(operation.responses or {}) + ) + headers["Content-Type"] = self._get_first_response_media_type(responses) async def fetch(): async def make_request(client: httpx.AsyncClient): diff --git a/python/tests/unit/connectors/openapi/test_openapi_manager.py b/python/tests/unit/connectors/openapi/test_openapi_manager.py new file mode 100644 index 000000000000..63fef583c1ff --- /dev/null +++ b/python/tests/unit/connectors/openapi/test_openapi_manager.py @@ -0,0 +1,224 @@ +# Copyright (c) Microsoft. All rights reserved. + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_parameter import ( + RestApiOperationParameter, + RestApiOperationParameterLocation, +) +from semantic_kernel.connectors.openapi_plugin.openapi_manager import ( + _create_function_from_operation, +) +from semantic_kernel.exceptions import FunctionExecutionException +from semantic_kernel.functions.kernel_function_decorator import kernel_function +from semantic_kernel.functions.kernel_parameter_metadata import KernelParameterMetadata +from semantic_kernel.kernel import Kernel + + +@pytest.mark.asyncio +async def test_run_openapi_operation_success(kernel: Kernel): + runner = AsyncMock() + operation = MagicMock() + operation.id = "test_operation" + operation.summary = "Test Summary" + operation.description = "Test Description" + operation.get_parameters.return_value = [ + RestApiOperationParameter( + name="param1", type="string", location=RestApiOperationParameterLocation.QUERY, is_required=True + ) + ] + + execution_parameters = MagicMock() + execution_parameters.server_url_override = "https://override.com" + execution_parameters.enable_dynamic_payload = True + execution_parameters.enable_payload_namespacing = False + + plugin_name = "TestPlugin" + document_uri = "https://document.com" + + run_operation_mock = AsyncMock(return_value="Operation Result") + runner.run_operation = run_operation_mock + + with patch.object( + operation, + "get_default_return_parameter", + return_value=KernelParameterMetadata( + name="return", + description="Return description", + default_value=None, + type_="string", + type_object=None, + is_required=False, + schema_data={"type": "string"}, + ), + ): + + @kernel_function(description=operation.summary, name=operation.id) + async def run_openapi_operation(kernel, **kwargs): + return await _create_function_from_operation( + runner, operation, plugin_name, execution_parameters, document_uri + )(kernel, **kwargs) + + kwargs = {"param1": "value1"} + + result = await run_openapi_operation(kernel, **kwargs) + assert str(result) == "Operation Result" + run_operation_mock.assert_called_once() + + +@pytest.mark.asyncio +async def test_run_openapi_operation_missing_required_param(kernel: Kernel): + runner = AsyncMock() + operation = MagicMock() + operation.id = "test_operation" + operation.summary = "Test Summary" + operation.description = "Test Description" + operation.get_parameters.return_value = [ + RestApiOperationParameter( + name="param1", type="string", location=RestApiOperationParameterLocation.QUERY, is_required=True + ) + ] + + execution_parameters = MagicMock() + execution_parameters.server_url_override = "https://override.com" + execution_parameters.enable_dynamic_payload = True + execution_parameters.enable_payload_namespacing = False + + plugin_name = "TestPlugin" + document_uri = "https://document.com" + + with patch.object( + operation, + "get_default_return_parameter", + return_value=KernelParameterMetadata( + name="return", + description="Return description", + default_value=None, + type_="string", + type_object=None, + is_required=False, + schema_data={"type": "string"}, + ), + ): + + @kernel_function(description=operation.summary, name=operation.id) + async def run_openapi_operation(kernel, **kwargs): + return await _create_function_from_operation( + runner, operation, plugin_name, execution_parameters, document_uri + )(kernel, **kwargs) + + kwargs = {} + + with pytest.raises( + FunctionExecutionException, + match="Parameter param1 is required but not provided in the arguments", + ): + await run_openapi_operation(kernel, **kwargs) + + +@pytest.mark.asyncio +async def test_run_openapi_operation_runner_exception(kernel: Kernel): + runner = AsyncMock() + operation = MagicMock() + operation.id = "test_operation" + operation.summary = "Test Summary" + operation.description = "Test Description" + operation.get_parameters.return_value = [ + RestApiOperationParameter( + name="param1", type="string", location=RestApiOperationParameterLocation.QUERY, is_required=True + ) + ] + + execution_parameters = MagicMock() + execution_parameters.server_url_override = "https://override.com" + execution_parameters.enable_dynamic_payload = True + execution_parameters.enable_payload_namespacing = False + + plugin_name = "TestPlugin" + document_uri = "https://document.com" + + run_operation_mock = AsyncMock(side_effect=Exception("Runner Exception")) + runner.run_operation = run_operation_mock + + with patch.object( + operation, + "get_default_return_parameter", + return_value=KernelParameterMetadata( + name="return", + description="Return description", + default_value=None, + type_="string", + type_object=None, + is_required=False, + schema_data={"type": "string"}, + ), + ): + + @kernel_function(description=operation.summary, name=operation.id) + async def run_openapi_operation(kernel, **kwargs): + return await _create_function_from_operation( + runner, operation, plugin_name, execution_parameters, document_uri + )(kernel, **kwargs) + + kwargs = {"param1": "value1"} + + with pytest.raises(FunctionExecutionException, match="Error running OpenAPI operation: test_operation"): + await run_openapi_operation(kernel, **kwargs) + + +@pytest.mark.asyncio +async def test_run_openapi_operation_alternative_name(kernel: Kernel): + runner = AsyncMock() + operation = MagicMock() + operation.id = "test_operation" + operation.summary = "Test Summary" + operation.description = "Test Description" + operation.get_parameters.return_value = [ + RestApiOperationParameter( + name="param1", + type="string", + location=RestApiOperationParameterLocation.QUERY, + is_required=True, + alternative_name="alt_param1", + ) + ] + + execution_parameters = MagicMock() + execution_parameters.server_url_override = "https://override.com" + execution_parameters.enable_dynamic_payload = True + execution_parameters.enable_payload_namespacing = False + + plugin_name = "TestPlugin" + document_uri = "https://document.com" + + run_operation_mock = AsyncMock(return_value="Operation Result") + runner.run_operation = run_operation_mock + + with patch.object( + operation, + "get_default_return_parameter", + return_value=KernelParameterMetadata( + name="return", + description="Return description", + default_value=None, + type_="string", + type_object=None, + is_required=False, + schema_data={"type": "string"}, + ), + ): + + @kernel_function(description=operation.summary, name=operation.id) + async def run_openapi_operation(kernel, **kwargs): + return await _create_function_from_operation( + runner, operation, plugin_name, execution_parameters, document_uri + )(kernel, **kwargs) + + kwargs = {"alt_param1": "value1"} + + result = await run_openapi_operation(kernel, **kwargs) + assert str(result) == "Operation Result" + run_operation_mock.assert_called_once() + assert runner.run_operation.call_args[0][1]["param1"] == "value1" diff --git a/python/tests/unit/connectors/openapi/test_openapi_parser.py b/python/tests/unit/connectors/openapi/test_openapi_parser.py new file mode 100644 index 000000000000..71548537e30a --- /dev/null +++ b/python/tests/unit/connectors/openapi/test_openapi_parser.py @@ -0,0 +1,51 @@ +# Copyright (c) Microsoft. All rights reserved. + + +import pytest + +from semantic_kernel.connectors.openapi_plugin.openapi_manager import OpenApiParser +from semantic_kernel.exceptions.function_exceptions import PluginInitializationError + + +def test_parse_parameters_missing_in_field(): + parser = OpenApiParser() + parameters = [{"name": "param1", "schema": {"type": "string"}}] + with pytest.raises(PluginInitializationError, match="Parameter param1 is missing 'in' field"): + parser._parse_parameters(parameters) + + +def test_get_payload_properties_schema_none(): + parser = OpenApiParser() + properties = parser._get_payload_properties("operation_id", None, []) + assert properties == [] + + +def test_get_payload_properties_hierarchy_max_depth_exceeded(): + parser = OpenApiParser() + schema = { + "properties": { + "prop1": { + "type": "object", + "properties": { + "prop2": { + "type": "object", + "properties": { + # Nested properties to exceed max depth + }, + } + }, + } + } + } + with pytest.raises( + Exception, + match=f"Max level {OpenApiParser.PAYLOAD_PROPERTIES_HIERARCHY_MAX_DEPTH} of traversing payload properties of `operation_id` operation is exceeded.", # noqa: E501 + ): + parser._get_payload_properties("operation_id", schema, [], level=11) + + +def test_create_rest_api_operation_payload_media_type_none(): + parser = OpenApiParser() + request_body = {"content": {"application/xml": {"schema": {"type": "object"}}}} + with pytest.raises(Exception, match="Neither of the media types of operation_id is supported."): + parser._create_rest_api_operation_payload("operation_id", request_body) diff --git a/python/tests/unit/connectors/openapi/test_openapi_runner.py b/python/tests/unit/connectors/openapi/test_openapi_runner.py new file mode 100644 index 000000000000..43955661d6d2 --- /dev/null +++ b/python/tests/unit/connectors/openapi/test_openapi_runner.py @@ -0,0 +1,307 @@ +# Copyright (c) Microsoft. All rights reserved. + +from collections import OrderedDict +from unittest.mock import AsyncMock, MagicMock, Mock + +import pytest + +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation import RestApiOperation +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_payload import RestApiOperationPayload +from semantic_kernel.connectors.openapi_plugin.openapi_manager import OpenApiRunner +from semantic_kernel.exceptions import FunctionExecutionException + + +def test_build_full_url(): + runner = OpenApiRunner({}) + base_url = "http://example.com" + query_string = "param1=value1¶m2=value2" + expected_url = "http://example.com?param1=value1¶m2=value2" + assert runner.build_full_url(base_url, query_string) == expected_url + + +def test_build_operation_url(): + runner = OpenApiRunner({}) + operation = MagicMock() + operation.build_operation_url.return_value = "http://example.com" + operation.build_query_string.return_value = "param1=value1" + arguments = {} + expected_url = "http://example.com?param1=value1" + assert runner.build_operation_url(operation, arguments) == expected_url + + +def test_build_json_payload_dynamic_payload(): + runner = OpenApiRunner({}, enable_dynamic_payload=True) + payload_metadata = RestApiOperationPayload( + media_type="application/json", + properties=["property1", "property2"], + description=None, + schema=None, + ) + arguments = {"property1": "value1", "property2": "value2"} + + runner.build_json_object = MagicMock(return_value={"property1": "value1", "property2": "value2"}) + + content, media_type = runner.build_json_payload(payload_metadata, arguments) + + runner.build_json_object.assert_called_once_with(payload_metadata.properties, arguments) + assert content == '{"property1": "value1", "property2": "value2"}' + assert media_type == "application/json" + + +def test_build_json_payload_no_metadata(): + runner = OpenApiRunner({}, enable_dynamic_payload=True) + arguments = {} + + with pytest.raises( + FunctionExecutionException, match="Payload can't be built dynamically due to the missing payload metadata." + ): + runner.build_json_payload(None, arguments) + + +def test_build_json_payload_static_payload(): + runner = OpenApiRunner({}, enable_dynamic_payload=False) + arguments = {runner.payload_argument_name: '{"key": "value"}'} + + content, media_type = runner.build_json_payload(None, arguments) + + assert content == '{"key": "value"}' + assert media_type == '{"key": "value"}' + + +def test_build_json_payload_no_payload(): + runner = OpenApiRunner({}, enable_dynamic_payload=False) + arguments = {} + + with pytest.raises( + FunctionExecutionException, match=f"No payload is provided by the argument '{runner.payload_argument_name}'." + ): + runner.build_json_payload(None, arguments) + + +def test_build_json_object(): + runner = OpenApiRunner({}) + properties = [MagicMock()] + properties[0].name = "prop1" + properties[0].type = "string" + properties[0].is_required = True + properties[0].properties = [] + arguments = {"prop1": "value1"} + result = runner.build_json_object(properties, arguments) + assert result == {"prop1": "value1"} + + +def test_build_json_object_missing_required_argument(): + runner = OpenApiRunner({}) + properties = [MagicMock()] + properties[0].name = "prop1" + properties[0].type = "string" + properties[0].is_required = True + properties[0].properties = [] + arguments = {} + with pytest.raises(FunctionExecutionException, match="No argument is found for the 'prop1' payload property."): + runner.build_json_object(properties, arguments) + + +def test_build_json_object_recursive(): + runner = OpenApiRunner({}) + + nested_property1 = Mock() + nested_property1.name = "property1.nested_property1" + nested_property1.type = "string" + nested_property1.is_required = True + nested_property1.properties = [] + + nested_property2 = Mock() + nested_property2.name = "property2.nested_property2" + nested_property2.type = "integer" + nested_property2.is_required = False + nested_property2.properties = [] + + nested_properties = [nested_property1, nested_property2] + + property1 = Mock() + property1.name = "property1" + property1.type = "object" + property1.properties = nested_properties + property1.is_required = True + + property2 = Mock() + property2.name = "property2" + property2.type = "string" + property2.is_required = False + property2.properties = [] + + properties = [property1, property2] + + arguments = { + "property1.nested_property1": "nested_value1", + "property1.nested_property2": 123, + "property2": "value2", + } + + result = runner.build_json_object(properties, arguments) + + expected_result = {"property1": {"property1.nested_property1": "nested_value1"}, "property2": "value2"} + + assert result == expected_result + + +def test_build_json_object_recursive_missing_required_argument(): + runner = OpenApiRunner({}) + + nested_property1 = MagicMock() + nested_property1.name = "nested_property1" + nested_property1.type = "string" + nested_property1.is_required = True + + nested_property2 = MagicMock() + nested_property2.name = "nested_property2" + nested_property2.type = "integer" + nested_property2.is_required = False + + nested_properties = [nested_property1, nested_property2] + + property1 = MagicMock() + property1.name = "property1" + property1.type = "object" + property1.properties = nested_properties + property1.is_required = True + + property2 = MagicMock() + property2.name = "property2" + property2.type = "string" + property2.is_required = False + + properties = [property1, property2] + + arguments = { + "property1.nested_property2": 123, + "property2": "value2", + } + + with pytest.raises( + FunctionExecutionException, match="No argument is found for the 'nested_property1' payload property." + ): + runner.build_json_object(properties, arguments) + + +def test_build_operation_payload_no_request_body(): + runner = OpenApiRunner({}) + operation = MagicMock() + operation.request_body = None + arguments = {} + assert runner.build_operation_payload(operation, arguments) == (None, None) + + +def test_get_argument_name_for_payload_no_namespacing(): + runner = OpenApiRunner({}, enable_payload_namespacing=False) + assert runner.get_argument_name_for_payload("prop1") == "prop1" + + +def test_get_argument_name_for_payload_with_namespacing(): + runner = OpenApiRunner({}, enable_payload_namespacing=True) + assert runner.get_argument_name_for_payload("prop1", "namespace") == "namespace.prop1" + + +def test_build_operation_payload_with_request_body(): + runner = OpenApiRunner({}) + + request_body = RestApiOperationPayload( + media_type="application/json", + properties=["property1", "property2"], + description=None, + schema=None, + ) + operation = Mock(spec=RestApiOperation) + operation.request_body = request_body + + arguments = {"property1": "value1", "property2": "value2"} + + runner.build_json_payload = MagicMock( + return_value=('{"property1": "value1", "property2": "value2"}', "application/json") + ) + + payload, media_type = runner.build_operation_payload(operation, arguments) + + runner.build_json_payload.assert_called_once_with(request_body, arguments) + assert payload == '{"property1": "value1", "property2": "value2"}' + assert media_type == "application/json" + + +def test_build_operation_payload_without_request_body(): + runner = OpenApiRunner({}) + + operation = Mock(spec=RestApiOperation) + operation.request_body = None + + arguments = {runner.payload_argument_name: '{"property1": "value1"}'} + + runner.build_json_payload = MagicMock(return_value=('{"property1": "value1"}', "application/json")) + + payload, media_type = runner.build_operation_payload(operation, arguments) + + runner.build_json_payload.assert_not_called() + assert payload is None + assert media_type is None + + +def test_build_operation_payload_no_request_body_no_payload_argument(): + runner = OpenApiRunner({}) + + operation = Mock(spec=RestApiOperation) + operation.request_body = None + + arguments = {} + + payload, media_type = runner.build_operation_payload(operation, arguments) + + assert payload is None + assert media_type is None + + +def test_get_first_response_media_type(): + runner = OpenApiRunner({}) + responses = OrderedDict() + response = MagicMock() + response.media_type = "application/xml" + responses["200"] = response + assert runner._get_first_response_media_type(responses) == "application/xml" + + +def test_get_first_response_media_type_default(): + runner = OpenApiRunner({}) + responses = OrderedDict() + assert runner._get_first_response_media_type(responses) == runner.media_type_application_json + + +@pytest.mark.asyncio +async def test_run_operation(): + runner = OpenApiRunner({}) + operation = MagicMock() + arguments = {} + options = MagicMock() + options.server_url_override = None + options.api_host_url = None + operation.build_headers.return_value = {"header": "value"} + operation.method = "GET" + runner.build_operation_url = MagicMock(return_value="http://example.com") + runner.build_operation_payload = MagicMock(return_value=('{"key": "value"}', "application/json")) + + response = MagicMock() + response.media_type = "application/json" + operation.responses = OrderedDict([("200", response)]) + + async def mock_request(*args, **kwargs): + response = MagicMock() + response.text = "response text" + return response + + runner.http_client = AsyncMock() + runner.http_client.request = mock_request + + runner.auth_callback = AsyncMock(return_value={"Authorization": "Bearer token"}) + + runner.http_client.headers = {"header": "client-value"} + + result = await runner.run_operation(operation, arguments, options) + assert result == "response text" diff --git a/python/tests/unit/connectors/openapi/test_rest_api_operation_run_options.py b/python/tests/unit/connectors/openapi/test_rest_api_operation_run_options.py new file mode 100644 index 000000000000..29df73cc7040 --- /dev/null +++ b/python/tests/unit/connectors/openapi/test_rest_api_operation_run_options.py @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft. All rights reserved. + +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_run_options import RestApiOperationRunOptions + + +def test_initialization(): + server_url_override = "http://example.com" + api_host_url = "http://example.com" + + rest_api_operation_run_options = RestApiOperationRunOptions(server_url_override, api_host_url) + + assert rest_api_operation_run_options.server_url_override == server_url_override + assert rest_api_operation_run_options.api_host_url == api_host_url + + +def test_initialization_no_params(): + rest_api_operation_run_options = RestApiOperationRunOptions() + + assert rest_api_operation_run_options.server_url_override is None + assert rest_api_operation_run_options.api_host_url is None diff --git a/python/tests/unit/connectors/openapi/test_rest_api_uri.py b/python/tests/unit/connectors/openapi/test_rest_api_uri.py new file mode 100644 index 000000000000..6bbb90b96f4b --- /dev/null +++ b/python/tests/unit/connectors/openapi/test_rest_api_uri.py @@ -0,0 +1,30 @@ +# Copyright (c) Microsoft. All rights reserved. + +from semantic_kernel.connectors.openapi_plugin.models.rest_api_uri import Uri + + +def test_uri_initialization(): + test_uri = "https://example.com/path?query=param" + uri_instance = Uri(test_uri) + assert uri_instance.uri == test_uri + + +def test_get_left_part(): + test_uri = "https://example.com/path?query=param" + expected_left_part = "https://example.com" + uri_instance = Uri(test_uri) + assert uri_instance.get_left_part() == expected_left_part + + +def test_get_left_part_no_scheme(): + test_uri = "example.com/path?query=param" + expected_left_part = "://" + uri_instance = Uri(test_uri) + assert uri_instance.get_left_part() == expected_left_part + + +def test_get_left_part_no_netloc(): + test_uri = "https:///path?query=param" + expected_left_part = "https://" + uri_instance = Uri(test_uri) + assert uri_instance.get_left_part() == expected_left_part diff --git a/python/tests/unit/connectors/openapi/test_sk_openapi.py b/python/tests/unit/connectors/openapi/test_sk_openapi.py index f8ed025f58ea..45229b6f1630 100644 --- a/python/tests/unit/connectors/openapi/test_sk_openapi.py +++ b/python/tests/unit/connectors/openapi/test_sk_openapi.py @@ -2,15 +2,31 @@ import os from unittest.mock import patch +from urllib.parse import urlparse import pytest import yaml from openapi_core import Spec +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_expected_response import ( + RestApiOperationExpectedResponse, +) +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_parameter import ( + RestApiOperationParameter, + RestApiOperationParameterLocation, +) +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_parameter_style import ( + RestApiOperationParameterStyle, +) +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_payload import RestApiOperationPayload +from semantic_kernel.connectors.openapi_plugin.models.rest_api_operation_payload_property import ( + RestApiOperationPayloadProperty, +) from semantic_kernel.connectors.openapi_plugin.openapi_function_execution_parameters import ( OpenAPIFunctionExecutionParameters, ) from semantic_kernel.connectors.openapi_plugin.openapi_manager import OpenApiParser, OpenApiRunner, RestApiOperation +from semantic_kernel.exceptions import FunctionExecutionException directory = os.path.dirname(os.path.realpath(__file__)) openapi_document = directory + "/openapi.yaml" @@ -102,6 +118,510 @@ def test_parse_invalid_format(): parser.parse(invalid_openapi_document) +def test_url_join_with_trailing_slash(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/", path="test/path") + base_url = "https://example.com/" + path = "test/path" + expected_url = "https://example.com/test/path" + assert operation.url_join(base_url, path) == expected_url + + +def test_url_join_without_trailing_slash(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com", path="test/path") + base_url = "https://example.com" + path = "test/path" + expected_url = "https://example.com/test/path" + assert operation.url_join(base_url, path) == expected_url + + +def test_url_join_base_path_with_path(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/base/", path="test/path") + base_url = "https://example.com/base/" + path = "test/path" + expected_url = "https://example.com/base/test/path" + assert operation.url_join(base_url, path) == expected_url + + +def test_url_join_with_leading_slash_in_path(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/", path="/test/path") + base_url = "https://example.com/" + path = "/test/path" + expected_url = "https://example.com/test/path" + assert operation.url_join(base_url, path) == expected_url + + +def test_url_join_base_path_without_trailing_slash(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/base", path="test/path") + base_url = "https://example.com/base" + path = "test/path" + expected_url = "https://example.com/base/test/path" + assert operation.url_join(base_url, path) == expected_url + + +def test_build_headers_with_required_parameter(): + parameters = [ + RestApiOperationParameter( + name="Authorization", type="string", location=RestApiOperationParameterLocation.HEADER, is_required=True + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com", path="test/path", params=parameters + ) + arguments = {"Authorization": "Bearer token"} + expected_headers = {"Authorization": "Bearer token"} + assert operation.build_headers(arguments) == expected_headers + + +def test_build_headers_missing_required_parameter(): + parameters = [ + RestApiOperationParameter( + name="Authorization", type="string", location=RestApiOperationParameterLocation.HEADER, is_required=True + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com", path="test/path", params=parameters + ) + arguments = {} + with pytest.raises( + FunctionExecutionException, + match="No argument is provided for the `Authorization` required parameter of the operation - `test`.", + ): + operation.build_headers(arguments) + + +def test_build_headers_with_optional_parameter(): + parameters = [ + RestApiOperationParameter( + name="Authorization", type="string", location=RestApiOperationParameterLocation.HEADER, is_required=False + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com", path="test/path", params=parameters + ) + arguments = {"Authorization": "Bearer token"} + expected_headers = {"Authorization": "Bearer token"} + assert operation.build_headers(arguments) == expected_headers + + +def test_build_headers_missing_optional_parameter(): + parameters = [ + RestApiOperationParameter( + name="Authorization", type="string", location=RestApiOperationParameterLocation.HEADER, is_required=False + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com", path="test/path", params=parameters + ) + arguments = {} + expected_headers = {} + assert operation.build_headers(arguments) == expected_headers + + +def test_build_headers_multiple_parameters(): + parameters = [ + RestApiOperationParameter( + name="Authorization", type="string", location=RestApiOperationParameterLocation.HEADER, is_required=True + ), + RestApiOperationParameter( + name="Content-Type", type="string", location=RestApiOperationParameterLocation.HEADER, is_required=False + ), + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com", path="test/path", params=parameters + ) + arguments = {"Authorization": "Bearer token", "Content-Type": "application/json"} + expected_headers = {"Authorization": "Bearer token", "Content-Type": "application/json"} + assert operation.build_headers(arguments) == expected_headers + + +def test_build_operation_url_with_override(): + parameters = [ + RestApiOperationParameter( + name="id", type="string", location=RestApiOperationParameterLocation.PATH, is_required=True + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com/", path="/resource/{id}", params=parameters + ) + arguments = {"id": "123"} + server_url_override = urlparse("https://override.com") + expected_url = "https://override.com/resource/123" + assert operation.build_operation_url(arguments, server_url_override=server_url_override) == expected_url + + +def test_build_operation_url_without_override(): + parameters = [ + RestApiOperationParameter( + name="id", type="string", location=RestApiOperationParameterLocation.PATH, is_required=True + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com/", path="/resource/{id}", params=parameters + ) + arguments = {"id": "123"} + expected_url = "https://example.com/resource/123" + assert operation.build_operation_url(arguments) == expected_url + + +def test_get_server_url_with_override(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com", path="/resource/{id}") + server_url_override = urlparse("https://override.com") + expected_url = "https://override.com/" + assert operation.get_server_url(server_url_override=server_url_override).geturl() == expected_url + + +def test_get_server_url_without_override(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com", path="/resource/{id}") + expected_url = "https://example.com/" + assert operation.get_server_url().geturl() == expected_url + + +def test_build_path_with_required_parameter(): + parameters = [ + RestApiOperationParameter( + name="id", type="string", location=RestApiOperationParameterLocation.PATH, is_required=True + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com/", path="/resource/{id}", params=parameters + ) + arguments = {"id": "123"} + expected_path = "/resource/123" + assert operation.build_path(operation.path, arguments) == expected_path + + +def test_build_path_missing_required_parameter(): + parameters = [ + RestApiOperationParameter( + name="id", type="string", location=RestApiOperationParameterLocation.PATH, is_required=True + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com/", path="/resource/{id}", params=parameters + ) + arguments = {} + with pytest.raises( + FunctionExecutionException, + match="No argument is provided for the `id` required parameter of the operation - `test`.", + ): + operation.build_path(operation.path, arguments) + + +def test_build_path_with_optional_and_required_parameters(): + parameters = [ + RestApiOperationParameter( + name="id", type="string", location=RestApiOperationParameterLocation.PATH, is_required=True + ), + RestApiOperationParameter( + name="optional", type="string", location=RestApiOperationParameterLocation.PATH, is_required=False + ), + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com/", path="/resource/{id}/{optional}", params=parameters + ) + arguments = {"id": "123"} + expected_path = "/resource/123/{optional}" + assert operation.build_path(operation.path, arguments) == expected_path + + +def test_build_query_string_with_required_parameter(): + parameters = [ + RestApiOperationParameter( + name="query", type="string", location=RestApiOperationParameterLocation.QUERY, is_required=True + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com/", path="/resource", params=parameters + ) + arguments = {"query": "value"} + expected_query_string = "query=value" + assert operation.build_query_string(arguments) == expected_query_string + + +def test_build_query_string_missing_required_parameter(): + parameters = [ + RestApiOperationParameter( + name="query", type="string", location=RestApiOperationParameterLocation.QUERY, is_required=True + ) + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com/", path="/resource", params=parameters + ) + arguments = {} + with pytest.raises( + FunctionExecutionException, + match="No argument or value is provided for the `query` required parameter of the operation - `test`.", + ): + operation.build_query_string(arguments) + + +def test_build_query_string_with_optional_and_required_parameters(): + parameters = [ + RestApiOperationParameter( + name="required_param", type="string", location=RestApiOperationParameterLocation.QUERY, is_required=True + ), + RestApiOperationParameter( + name="optional_param", type="string", location=RestApiOperationParameterLocation.QUERY, is_required=False + ), + ] + operation = RestApiOperation( + id="test", method="GET", server_url="https://example.com/", path="/resource", params=parameters + ) + arguments = {"required_param": "required_value"} + expected_query_string = "required_param=required_value" + assert operation.build_query_string(arguments) == expected_query_string + + +def test_create_payload_artificial_parameter_with_text_plain(): + properties = [ + RestApiOperationPayloadProperty( + name="prop1", + type="string", + properties=[], + description="Property description", + is_required=True, + default_value=None, + schema=None, + ) + ] + request_body = RestApiOperationPayload( + media_type=RestApiOperation.MEDIA_TYPE_TEXT_PLAIN, + properties=properties, + description="Test description", + schema="Test schema", + ) + operation = RestApiOperation( + id="test", method="POST", server_url="https://example.com/", path="/resource", request_body=request_body + ) + expected_parameter = RestApiOperationParameter( + name=operation.PAYLOAD_ARGUMENT_NAME, + type="string", + is_required=True, + location=RestApiOperationParameterLocation.BODY, + style=RestApiOperationParameterStyle.SIMPLE, + description="Test description", + schema="Test schema", + ) + parameter = operation.create_payload_artificial_parameter(operation) + assert parameter.name == expected_parameter.name + assert parameter.type == expected_parameter.type + assert parameter.is_required == expected_parameter.is_required + assert parameter.location == expected_parameter.location + assert parameter.style == expected_parameter.style + assert parameter.description == expected_parameter.description + assert parameter.schema == expected_parameter.schema + + +def test_create_payload_artificial_parameter_with_object(): + properties = [ + RestApiOperationPayloadProperty( + name="prop1", + type="string", + properties=[], + description="Property description", + is_required=True, + default_value=None, + schema=None, + ) + ] + request_body = RestApiOperationPayload( + media_type="application/json", properties=properties, description="Test description", schema="Test schema" + ) + operation = RestApiOperation( + id="test", method="POST", server_url="https://example.com/", path="/resource", request_body=request_body + ) + expected_parameter = RestApiOperationParameter( + name=operation.PAYLOAD_ARGUMENT_NAME, + type="object", + is_required=True, + location=RestApiOperationParameterLocation.BODY, + style=RestApiOperationParameterStyle.SIMPLE, + description="Test description", + schema="Test schema", + ) + parameter = operation.create_payload_artificial_parameter(operation) + assert parameter.name == expected_parameter.name + assert parameter.type == expected_parameter.type + assert parameter.is_required == expected_parameter.is_required + assert parameter.location == expected_parameter.location + assert parameter.style == expected_parameter.style + assert parameter.description == expected_parameter.description + assert parameter.schema == expected_parameter.schema + + +def test_create_payload_artificial_parameter_without_request_body(): + operation = RestApiOperation(id="test", method="POST", server_url="https://example.com/", path="/resource") + expected_parameter = RestApiOperationParameter( + name=operation.PAYLOAD_ARGUMENT_NAME, + type="object", + is_required=True, + location=RestApiOperationParameterLocation.BODY, + style=RestApiOperationParameterStyle.SIMPLE, + description="REST API request body.", + schema=None, + ) + parameter = operation.create_payload_artificial_parameter(operation) + assert parameter.name == expected_parameter.name + assert parameter.type == expected_parameter.type + assert parameter.is_required == expected_parameter.is_required + assert parameter.location == expected_parameter.location + assert parameter.style == expected_parameter.style + assert parameter.description == expected_parameter.description + assert parameter.schema == expected_parameter.schema + + +def test_create_content_type_artificial_parameter(): + operation = RestApiOperation(id="test", method="POST", server_url="https://example.com/", path="/resource") + expected_parameter = RestApiOperationParameter( + name=operation.CONTENT_TYPE_ARGUMENT_NAME, + type="string", + is_required=False, + location=RestApiOperationParameterLocation.BODY, + style=RestApiOperationParameterStyle.SIMPLE, + description="Content type of REST API request body.", + ) + parameter = operation.create_content_type_artificial_parameter() + assert parameter.name == expected_parameter.name + assert parameter.type == expected_parameter.type + assert parameter.is_required == expected_parameter.is_required + assert parameter.location == expected_parameter.location + assert parameter.style == expected_parameter.style + assert parameter.description == expected_parameter.description + + +def test_get_property_name_with_namespacing_and_root_property(): + operation = RestApiOperation(id="test", method="POST", server_url="https://example.com/", path="/resource") + property = RestApiOperationPayloadProperty( + name="child", type="string", properties=[], description="Property description" + ) + result = operation._get_property_name(property, root_property_name="root", enable_namespacing=True) + assert result == "root.child" + + +def test_get_property_name_without_namespacing(): + operation = RestApiOperation(id="test", method="POST", server_url="https://example.com/", path="/resource") + property = RestApiOperationPayloadProperty( + name="child", type="string", properties=[], description="Property description" + ) + result = operation._get_property_name(property, root_property_name="root", enable_namespacing=False) + assert result == "child" + + +def test_get_payload_parameters_with_metadata_and_text_plain(): + properties = [ + RestApiOperationPayloadProperty(name="prop1", type="string", properties=[], description="Property description") + ] + request_body = RestApiOperationPayload( + media_type=RestApiOperation.MEDIA_TYPE_TEXT_PLAIN, properties=properties, description="Test description" + ) + operation = RestApiOperation( + id="test", method="POST", server_url="https://example.com/", path="/resource", request_body=request_body + ) + result = operation.get_payload_parameters(operation, use_parameters_from_metadata=True, enable_namespacing=True) + assert len(result) == 1 + assert result[0].name == operation.PAYLOAD_ARGUMENT_NAME + + +def test_get_payload_parameters_with_metadata_and_json(): + properties = [ + RestApiOperationPayloadProperty(name="prop1", type="string", properties=[], description="Property description") + ] + request_body = RestApiOperationPayload( + media_type="application/json", properties=properties, description="Test description" + ) + operation = RestApiOperation( + id="test", method="POST", server_url="https://example.com/", path="/resource", request_body=request_body + ) + result = operation.get_payload_parameters(operation, use_parameters_from_metadata=True, enable_namespacing=True) + assert len(result) == len(properties) + assert result[0].name == properties[0].name + + +def test_get_payload_parameters_without_metadata(): + operation = RestApiOperation(id="test", method="POST", server_url="https://example.com/", path="/resource") + result = operation.get_payload_parameters(operation, use_parameters_from_metadata=False, enable_namespacing=False) + assert len(result) == 2 + assert result[0].name == operation.PAYLOAD_ARGUMENT_NAME + assert result[1].name == operation.CONTENT_TYPE_ARGUMENT_NAME + + +def test_get_payload_parameters_raises_exception(): + operation = RestApiOperation( + id="test", + method="POST", + server_url="https://example.com/", + path="/resource", + request_body=None, + ) + with pytest.raises( + Exception, + match="Payload parameters cannot be retrieved from the `test` operation payload metadata because it is missing.", # noqa: E501 + ): + operation.get_payload_parameters(operation, use_parameters_from_metadata=True, enable_namespacing=False) + + +def test_get_default_response(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/", path="/resource") + responses = { + "200": RestApiOperationExpectedResponse( + description="Success", media_type="application/json", schema={"type": "object"} + ), + "default": RestApiOperationExpectedResponse( + description="Default response", media_type="application/json", schema={"type": "object"} + ), + } + preferred_responses = ["200", "default"] + result = operation.get_default_response(responses, preferred_responses) + assert result.description == "Success" + + +def test_get_default_response_with_default(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/", path="/resource") + responses = { + "default": RestApiOperationExpectedResponse( + description="Default response", media_type="application/json", schema={"type": "object"} + ) + } + preferred_responses = ["200", "default"] + result = operation.get_default_response(responses, preferred_responses) + assert result.description == "Default response" + + +def test_get_default_response_none(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/", path="/resource") + responses = {} + preferred_responses = ["200", "default"] + result = operation.get_default_response(responses, preferred_responses) + assert result is None + + +def test_get_default_return_parameter_with_response(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/", path="/resource") + responses = { + "200": RestApiOperationExpectedResponse( + description="Success", media_type="application/json", schema={"type": "object"} + ), + "default": RestApiOperationExpectedResponse( + description="Default response", media_type="application/json", schema={"type": "object"} + ), + } + operation.responses = responses + result = operation.get_default_return_parameter(preferred_responses=["200", "default"]) + assert result.name == "return" + assert result.description == "Success" + assert result.type_ == "object" + assert result.schema_data == {"type": "object"} + + +def test_get_default_return_parameter_none(): + operation = RestApiOperation(id="test", method="GET", server_url="https://example.com/", path="/resource") + responses = {} + operation.responses = responses + result = operation.get_default_return_parameter(preferred_responses=["200", "default"]) + assert result is not None + assert result.name == "return" + + @pytest.fixture def openapi_runner(): parser = OpenApiParser() @@ -159,3 +679,9 @@ async def test_run_operation_with_error(mock_request, openapi_runner): mock_request.side_effect = Exception("Error") with pytest.raises(Exception): await runner.run_operation(operation, headers=headers, request_body=request_body) + + +def test_invalid_server_url_override(): + with pytest.raises(ValueError, match="Invalid server_url_override: invalid_url"): + params = OpenAPIFunctionExecutionParameters(server_url_override="invalid_url") + params.model_post_init(None)