diff --git a/.flake8 b/.flake8 index f837320..ec8536f 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,3 @@ [flake8] max-line-length = 120 -exclude = dist \ No newline at end of file +exclude = dist, tests \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0dafe46..d193446 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ name: CI on: [push, pull_request] jobs: - build-and-lint: + all: runs-on: ubuntu-latest strategy: @@ -22,12 +22,15 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 + pip install flake8 pytest pip install -r requirements.txt - name: Build project run: make + - name: Test probe + run: pytest + - name: Lint with flake8 run: flake8 . diff --git a/.gitignore b/.gitignore index bdaa5ea..4bf1bd5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.tar.gz -.idea \ No newline at end of file +.idea +__pycache__/ \ No newline at end of file diff --git a/check_b2access.py b/check_b2access.py index cbe355e..3cae32a 100755 --- a/check_b2access.py +++ b/check_b2access.py @@ -40,7 +40,9 @@ def wrapper(*args, **kwargs): print(message, sys.exc_info()[0]) sys.exit(2) return ret + return wrapper + return handleExceptions @@ -63,8 +65,8 @@ def getAccessToken(param): if param.verbose: print("Access token: " + k['access_token']) - getTokenInfo(str(param.url) + '/oauth2/tokeninfo', str(k['access_token']), param.verbose) - getUserInfo(str(param.url) + '/oauth2/userinfo', str(k['access_token']), param.verbose) + getTokenInfo(f"{str(param.url)}/oauth2/tokeninfo{str(k['access_token'])}", param.verbose) + getUserInfo(f"{str(param.url)}/oauth2/userinfo{str(k['access_token'])}", param.verbose) except ConnectionError as e: print("CRITICAL: Invalid Unity URL: {0}".format(e)) sys.exit(2) @@ -84,6 +86,8 @@ def getTokenInfo(url, token, verbose): print(f"\nFetching access token information from URL: {url}") entity = requests.get(url, verify=False, headers={'Authorization': 'Bearer ' + token}) + if entity.status_code != 200: + raise ConnectionError(f"Fetching access token from {url} returned {entity.status_code}") j = entity.json() expire = datetime.datetime.fromtimestamp(int(j['exp'])).strftime('%Y-%m-%d %H:%M:%S') if verbose: @@ -103,11 +107,13 @@ def getTokenInfo(url, token, verbose): def getUserInfo(url, token, verbose): """ Fetch user information using access token """ try: - if parser_args.verbose: + if verbose: print(f"\nFetching user information based on access token, endpoint URL: {url}") entity = requests.get(url, verify=False, headers={'Authorization': 'Bearer ' + token}) + if entity.status_code != 200: + raise ConnectionError(f"Fetching access token from {url} returned {entity.status_code}") j = entity.json() - if parser_args.verbose: + if verbose: print( f"Subject: {j['sub']}\nPersistent Id: {j['unity:persistent']}\n\ Detailed user information: {entity.text}") diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..84ad2d4 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,6 @@ +[pytest] +minversion = 6.0 +addopts = -ra -q +testpaths = + tests +pythonpath = . \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 65d2f78..dc94277 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,6 @@ oauthlib requests-oauthlib urllib3 validators -requests \ No newline at end of file +requests +pytest +requests-mock \ No newline at end of file diff --git a/tests/test_check_b2access.py b/tests/test_check_b2access.py new file mode 100644 index 0000000..1cabfeb --- /dev/null +++ b/tests/test_check_b2access.py @@ -0,0 +1,46 @@ +import json + +import pytest +from requests import Response + +from check_b2access import getAccessToken, getLdapName, getInfoCert, getTokenInfo, getUserInfo, getInfoUsernamePassword +import requests_mock + + +class TestProbe: + def test_get_ldap_name(self): + name = getLdapName("Apple/Pineapple/Banana ") + assert name == "Banana,Pineapple" + name = getLdapName("Apple/Pineapple/Banana/Oranges ") + assert name == "Oranges,Banana,Pineapple" + + def test_get_user_info(self): + url = "https://some_url" + token = "token" + + body = json.loads( + """{ + "sub": "subject", + "unity:persistent": "PersistentID" + } + """ + ) + + with requests_mock.Mocker() as m: + m.get(url, json=body, status_code=200) + getUserInfo(url, token, True) + + @pytest.mark.parametrize("status_code, body", [ + [200, '{"sub": "subject"}'], # missing persistent ID + [200, '{"unity:persistent": "PersistentID"}'], # missing subject + [500, '{"sub": "subject", "unity:persistent": "PersistentID"}'] # Server Error + ]) + def test_get_user_info_exception(self, status_code, body): + body = json.loads(body) + url = "https://some_url" + token = "token" + + with pytest.raises(SystemExit) as e: + with requests_mock.Mocker() as m: + m.get(url, json=body, status_code=status_code) + getUserInfo(url, token, True)