diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..ec8536f --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +max-line-length = 120 +exclude = dist, tests \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..d193446 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,42 @@ +name: CI + +on: [push, pull_request] + +jobs: + all: + runs-on: ubuntu-latest + + strategy: + matrix: + python-version: ["3.10", "3.11"] + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 pytest + pip install -r requirements.txt + + - name: Build project + run: make + + - name: Test probe + run: pytest + + - name: Lint with flake8 + run: flake8 . + + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + name: b2access-probe-py${{ matrix.python-version }} + path: '*.tar.gz' + if-no-files-found: 'error' diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4bf1bd5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.tar.gz +.idea +__pycache__/ \ No newline at end of file diff --git a/check_b2access.py b/check_b2access.py index f418c91..3cae32a 100755 --- a/check_b2access.py +++ b/check_b2access.py @@ -5,303 +5,306 @@ import signal import json -from time import strftime,gmtime +from functools import wraps +from time import strftime, gmtime + +import urllib3 from oauthlib.oauth2 import BackendApplicationClient from requests_oauthlib import OAuth2Session -import requests.packages.urllib3 +import requests import subprocess import datetime -from oauthlib.oauth2.rfc6749.errors import OAuth2Error, MissingTokenError +from oauthlib.oauth2.rfc6749.errors import MissingTokenError from requests.exceptions import ConnectionError, HTTPError import os.path import validators -from validators.utils import ValidationFailure -TEST_SUFFIX='NAGIOS-' + strftime("%Y%m%d-%H%M%S",gmtime()) -VALUE_ORIG='http://www.' + TEST_SUFFIX + '.com/1' -VALUE_AFTER='http://www.' + TEST_SUFFIX + '.com/2' -TOKEN_URI='/oauth2/token' +TEST_SUFFIX = f"NAGIOS-{strftime('%Y%m%d-%H%M%S', gmtime())}" +VALUE_ORIG = f"http://www.{TEST_SUFFIX}.com/1" # TODO this is ugly +VALUE_AFTER = f"http://www.{TEST_SUFFIX}.com/2" +TOKEN_URI = '/oauth2/token' + -def handler(signum, stack): - print "UNKNOWN: Timeout reached, exiting." +def handler(*args): + print("UNKNOWN: Timeout reached, exiting.") sys.exit(3) + +def exceptionHandler(message: str): + def handleExceptions(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + ret = func(*args, **kwargs) + except BaseException: + print(message, sys.exc_info()[0]) + sys.exit(2) + return ret + + return wrapper + + return handleExceptions + + +@exceptionHandler("CRITICAL: Error fetching OAuth 2.0 access token:") def getAccessToken(param): """Fetch access token from B2ACCESS""" - if param.verbose == True: - print "\nFetching access token from B2ACCESS" - """ Pre-req: Create a user 'argo' with password 'test' in group 'oauth-clients' and 'eudat:b2share' or any other """ - + if param.verbose: + print("\nFetching access token from B2ACCESS") + """ Pre-req: Create a user 'argo' with password 'test' in group 'oauth-clients' and 'eudat:b2share' or any other """ + try: client = BackendApplicationClient(client_id=username) - client.prepare_request_body(scope=['profile','email','GENERATE_USER_CERTIFICATE']) + client.prepare_request_body(scope=['profile', 'email', 'GENERATE_USER_CERTIFICATE']) oauth = OAuth2Session(client=client) - token = oauth.fetch_token(token_url=str(param.url)+TOKEN_URI, verify=False,client_id=str(param.username),client_secret=str(param.password),scope=['USER_PROFILE','GENERATE_USER_CERTIFICATE']) + token = oauth.fetch_token(token_url=str(param.url) + TOKEN_URI, verify=False, client_id=str(param.username), + client_secret=str(param.password), + scope=['USER_PROFILE', 'GENERATE_USER_CERTIFICATE']) j = json.dumps(token, indent=4) k = json.loads(j) if param.verbose: - print "Access token: "+k['access_token'] - - getTokenInfo(str(param.url)+'/oauth2/tokeninfo', str(k['access_token']), param.verbose) - getUserInfo(str(param.url)+'/oauth2/userinfo', str(k['access_token']), param.verbose) + print("Access token: " + k['access_token']) + + getTokenInfo(f"{str(param.url)}/oauth2/tokeninfo{str(k['access_token'])}", param.verbose) + getUserInfo(f"{str(param.url)}/oauth2/userinfo{str(k['access_token'])}", param.verbose) except ConnectionError as e: - print "CRITICAL: Invalid Unity URL: {0}".format(e) + print("CRITICAL: Invalid Unity URL: {0}".format(e)) sys.exit(2) except MissingTokenError as e: - print "CRITICAL: Invalid client Id and/or secret: {0}".format(e.description) + print("CRITICAL: Invalid client Id and/or secret: {0}".format(e.description)) sys.exit(2) except TypeError as e: - print e - sys.exit(2) - except: - print("CRITICAL: Error fetching OAuth 2.0 access token:", sys.exc_info()[0]) + print(e) sys.exit(2) - raise - - + + +@exceptionHandler("CRITICAL: Error retrieving access token information:") def getTokenInfo(url, token, verbose): """ Fetch access token details """ try: if verbose: - print "Fetching access token information from URL: "+url - - entity = requests.get(url,verify=False, headers = {'Authorization': 'Bearer '+token}) + print(f"\nFetching access token information from URL: {url}") + + entity = requests.get(url, verify=False, headers={'Authorization': 'Bearer ' + token}) + if entity.status_code != 200: + raise ConnectionError(f"Fetching access token from {url} returned {entity.status_code}") j = entity.json() expire = datetime.datetime.fromtimestamp(int(j['exp'])).strftime('%Y-%m-%d %H:%M:%S') if verbose: - print "Expires on: "+expire - print 'Detailed token info: '+entity.text + print(f"Expires on: {expire}\nDetailed token info: {entity.text}") except KeyError as e: - print "WARNING: Invalid key(s): {0}".format(e) + print("WARNING: Invalid key(s): {0}".format(e)) sys.exit(1) except ValueError as e: - print "CRITICAL: Invalid access token: {0}".format(e) + print("CRITICAL: Invalid access token: {0}".format(e)) sys.exit(2) except ConnectionError as e: - print "CRITICAL: Invalid token endpoint URL: {0}".format(e) + print("CRITICAL: Invalid token endpoint URL: {0}".format(e)) sys.exit(2) - except: - print("CRITICAL: Error retrieving access token information:", sys.exc_info()[0]) - sys.exit(2) - raise - + +@exceptionHandler("CRITICAL: Error retrieving user information:") def getUserInfo(url, token, verbose): """ Fetch user information using access token """ try: - if param.verbose: - print "\n" - print "Fetching user information based on access token, endpoint URL: "+url - entity = requests.get(url,verify=False, headers = {'Authorization': 'Bearer '+token}) + if verbose: + print(f"\nFetching user information based on access token, endpoint URL: {url}") + entity = requests.get(url, verify=False, headers={'Authorization': 'Bearer ' + token}) + if entity.status_code != 200: + raise ConnectionError(f"Fetching access token from {url} returned {entity.status_code}") j = entity.json() - if param.verbose: - print "Subject: "+j['sub'] - print "Persistent Id: "+j['unity:persistent'] - print 'Detailed user information: '+entity.text + if verbose: + print( + f"Subject: {j['sub']}\nPersistent Id: {j['unity:persistent']}\n\ + Detailed user information: {entity.text}") except KeyError as e: - print "WARNING: Invalid key(s): {0}".format(e) + print("WARNING: Invalid key(s): {0}".format(e)) sys.exit(1) except ValueError as e: - print "CRITICAL: Invalid access token: {0}".format(e) + print("CRITICAL: Invalid access token: {0}".format(e)) sys.exit(2) except ConnectionError as e: - print "CRITICAL: Invalid UserInfo endpoint URL: {0}".format(e) - sys.exit(2) - except: - print("CRITICAL: Error retrieving user information:", sys.exc_info()[0]) + print("CRITICAL: Invalid UserInfo endpoint URL: {0}".format(e)) sys.exit(2) - raise - + +@exceptionHandler("CRITICAL: Error retrieving user information with the username/password:") def getInfoUsernamePassword(param): """ Query user information with username and password """ - - url = param.url+"/rest-admin/v1/resolve/userName/"+str(param.username) - + + url = param.url + "/rest-admin/v1/resolve/userName/" + str(param.username) + if param.verbose: - print "\nQuery with username and password, endpoint URL: "+url - - try: + print(f"\nQuery with username and password, endpoint URL: {url}") + + try: uname = param.username pwd = param.password - entity = requests.get(str(url),verify=False,auth=(uname, pwd)) + entity = requests.get(str(url), verify=False, auth=(uname, pwd)) if entity.status_code == 403: - raise HTTPError("CRITICAL: Error retrieving the user information with username {0}: invalid username/password".format(uname)) + print("CRITICAL: Error retrieving the user information with username {0}: invalid username/password".format( + uname)) sys.exit(2) j = entity.json() if param.verbose: - print "Credential requirement: "+j['credentialInfo']['credentialRequirementId'] - print "Entity Id: "+str(j['id']) - print "Username: "+j['identities'][0]['value'] - print "Detailed user information: "+entity.text + print(f"\nCredential requirement: {j['credentialInfo']['credentialRequirementId']}\n\ + Entity Id: {str(j['id'])}\n\ + Username: {j['identities'][0]['value']}\n\ + Detailed user information: {entity.text}") + except ConnectionError as e: - print "CRITICAL: Invalid Unity endpoint URL: {0}".format(e) + print("CRITICAL: Invalid Unity endpoint URL: {0}".format(e)) sys.exit(2) except HTTPError as e: - print e + print(e) sys.exit(2) except KeyError as e: - print "CRITICAL: Invalid key(s): {0}".format(e) - sys.exit(2) - except: - print("CRITICAL: Error retrieving user information with the username/password:", sys.exc_info()[0]) + print("CRITICAL: Invalid key(s): {0}".format(e)) sys.exit(2) - raise - + + +@exceptionHandler("CRITICAL: Error retrieving user information by X509 certificate:") def getInfoCert(param): """ Query user information with X509 Certificate Authentication """ try: - cert_txt = subprocess.check_output(["openssl", "x509", "-subject", "-noout","-in", param.certificate]) - + cert_txt = subprocess.check_output(["openssl", "x509", "-subject", "-noout", "-in", param.certificate]) sub = str(cert_txt).replace("subject= ", "") - dn = getLdapName(sub) - - """ url = param.url+"/rest-admin/v1/resolve/x500Name/CN=Ahmed Shiraz Memon,OU=IAS-JSC,OU=Forschungszentrum Juelich GmbH,O=GridGermany,C=DE" """ - - url = param.url+"/rest-admin/v1/resolve/x500Name/"+dn - - print "url: "+url - + """ + url = param.url+"/rest-admin/v1/resolve/x500Name/CN=Ahmed Shiraz Memon,OU=IAS-JSC,\ + OU=Forschungszentrum Juelich GmbH,O=GridGermany,C=DE" + """ + url = f"{param.url}/rest-admin/v1/resolve/x500Name/{dn}" + + print(f"url: {url}") + if param.verbose: - print "\nQuery user information with X509 Certificate Authentication, endpoint URL:" + url - - entity = requests.get(str(url),verify=False,cert=(str(param.certificate), str(param.key))) - + print(f"\nQuery user information with X509 Certificate Authentication, endpoint URL: {url}") + + entity = requests.get(str(url), verify=False, cert=(str(param.certificate), str(param.key))) + if (entity.status_code == 400) or (entity.status_code == 403): - raise HTTPError("CRITICAL: Error retrieving the user information with X500Name {0}: invalid certificate".format(dn)) + print("CRITICAL: Error retrieving the user information with X500Name {0}: invalid certificate".format(dn)) sys.exit(2) - + j = entity.json() - + if param.verbose: - print "Credential requirement: "+j['credentialInfo']['credentialRequirementId'] + print(f"Credential requirement: {j['credentialInfo']['credentialRequirementId']}") """print "Entity Id: "+str(j['entityId'])""" - print "Entity Id: "+str(j['entityInformation']['entityId']) - print "X500Name: "+j['identities'][0]['value'] - - if param.verbose: - print "Detailed user information: \n"+json.dumps(j, indent=4) + print(f"Entity Id: {str(j['entityInformation']['entityId'])}") + print(f"X500Name: {j['identities'][0]['value']}") + print(f"Detailed user information: \n{json.dumps(j, indent=4)}") except HTTPError as e: - print e + print(e) sys.exit(2) except KeyError as e: - print "CRITICAL: Invalid key(s): {0}".format(e) - sys.exit(2) - except: - print("CRITICAL: Error retrieving user information by X509 certificate:", sys.exc_info()) + print("CRITICAL: Invalid key(s): {0}".format(e)) sys.exit(2) - raise + def getLdapName(openssl_name): name = str(openssl_name) strs = name.split("/") - strs.reverse() - + strs[0] = str(strs[0]).rstrip() - strs.pop() - - print strs - + + # print(strs) why? str1 = ','.join(strs) - return str1 - + if __name__ == '__main__': - #disable ssl warnings and trust the unity server - requests.packages.urllib3.disable_warnings() + # disable ssl warnings and trust the unity server + urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning) parser = argparse.ArgumentParser(description='B2ACCESS login, query probe') - + # req = parser.add_argument_group('required arguments') - + subParsers = parser.add_subparsers() - + parser.add_argument('-u', '--url', action='store', dest='url', required=True, - help='baseuri of B2ACCESS-UNITY to test') + help='baseuri of B2ACCESS-UNITY to test') parser.add_argument('-t', '--timeout', action='store', dest='timeout', - help='timeout') + help='timeout') parser.add_argument('-v', '--version', action='store', dest='version', - help='version') + help='version') parser.add_argument('-V', '--verbose', action='store_true', dest='verbose', - help='increase output verbosity', default=False) + help='increase output verbosity', default=False) parser.add_argument('-d', '--debug', action='store_true', dest='debug', - help='debug mode') - - u_parser = subParsers.add_parser('1',help='Username/Password based authentication') + help='debug mode') + u_parser = subParsers.add_parser('1', help='Username/Password based authentication') u_parser.add_argument('-U', '--username', action='store', dest='username', required=True, - help='B2ACCESS user') + help='B2ACCESS user') u_parser.add_argument('-P', '--password', action='store', dest='password', required=True, - help='B2ACCESS password') + help='B2ACCESS password') u_parser.set_defaults(action='1') - - c_parser = subParsers.add_parser('2',help='X.509 Certificate based authentication') + + c_parser = subParsers.add_parser('2', help='X.509 Certificate based authentication') c_parser.add_argument('-C', '--cert', action='store', dest='certificate', - help='Path to public key certificate', required=True) + help='Path to public key certificate', required=True) c_parser.add_argument('-K', '--key', action='store', dest='key', - help='Path to private key', required=True) + help='Path to private key', required=True) c_parser.set_defaults(action='2') - - param = parser.parse_args() - base_url = param.url - timeout = param.timeout - - if param.action == "1": - username = param.username - password = param.password - - - if param.verbose == True: - print "verbosity is turned ON" - - if param.timeout and int(param.timeout) > 0 : - print "Timeout: "+timeout + + parser_args = parser.parse_args() + base_url = parser_args.url + timeout = parser_args.timeout + username = "" + print(parser_args) + + if parser_args.action == "1": + username = parser_args.username + password = parser_args.password + + if parser_args.verbose: + print("verbosity is turned ON") + + if parser_args.timeout and int(parser_args.timeout) > 0: + print(f"Timeout: {timeout}") signal.signal(signal.SIGALRM, handler) - signal.alarm(int(param.timeout)) - - - - if param.verbose: - print "Starting B2ACCESS Probe...\n---------------------------\n" - print "B2ACCESS url: "+str(base_url) - if param.action == "1": - print "B2ACCESS username: "+username - if param.action == "2": - print "Public key: "+param.certificate - - try: - if param.action == "2": - if not os.path.exists(param.certificate): - raise IOError("CRITICAL: Public key certificate file does not exist: {0}".format(param.certificate)) - if not os.path.exists(param.key): - raise IOError("CRITICAL: Private key file does not exist: : {0}".format(param.key)) - if not validators.url(param.url): - raise SyntaxError("CRITICAL: Invalid URL syntax {0}".format(param.url)) + signal.alarm(int(parser_args.timeout)) + + if parser_args.verbose: + print(f"Starting B2ACCESS Probe...\n---------------------------\n\ + B2ACCESS url: {str(base_url)}") + if parser_args.action == "1": + print(f"B2ACCESS username: {username}") + elif parser_args.action == "2": + print(f"Public key: {parser_args.certificate}") + try: + if parser_args.action == "2": + if not os.path.exists(parser_args.certificate): + raise IOError( + "CRITICAL: Public key certificate file does not exist: {0}".format(parser_args.certificate)) + if not os.path.exists(parser_args.key): + raise IOError("CRITICAL: Private key file does not exist: : {0}".format(parser_args.key)) + if not validators.url(parser_args.url): + raise SyntaxError("CRITICAL: Invalid URL syntax {0}".format(parser_args.url)) except IOError as e: - print e + print(e) sys.exit(2) except SyntaxError as e: - print e + print(e) sys.exit(2) - except: + except BaseException: print(sys.exc_info()[0]) sys.exit(2) - raise - - if param.action == "1": - getAccessToken(param) - getInfoUsernamePassword(param) - - if param.action == "2": - getInfoCert(param) - - if param.verbose: - if param.action == "1": - print "\nOK, User access token retrieval and login with username/password was successful" - if param.action == "2": - print "\nOK, User login with X.509 Certificate was successful" + + if parser_args.action == "1": + getAccessToken(parser_args) + getInfoUsernamePassword(parser_args) + + if parser_args.action == "2": + getInfoCert(parser_args) + + if parser_args.verbose: + if parser_args.action == "1": + print("\nOK, User access token retrieval and login with username/password was successful") + elif parser_args.action == "2": + print("\nOK, User login with X.509 Certificate was successful") else: - print "OK" - sys.exit(0) \ No newline at end of file + print("OK") + sys.exit(0) diff --git a/nagios-plugins-eudat-b2access.spec b/nagios-plugins-eudat-b2access.spec index 99f2678..12f610b 100644 --- a/nagios-plugins-eudat-b2access.spec +++ b/nagios-plugins-eudat-b2access.spec @@ -12,26 +12,26 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +%global python3_pkgversion 3.10 Name: nagios-plugins-eudat-b2access -Version: 0.3 +Version: 1.0 Release: 1%{?dist} Summary: Nagios B2ACCESS probes License: Apache License, Version 2.0 Packager: Shiraz Memon +URL: https://github.com/EUDAT-B2ACCESS/b2access-probe Source: %{name}-%{version}.tar.gz BuildArch: noarch BuildRoot: %{_tmppath}/%{name}-%{version} -Requires: python -Requires: python-argparse -Requires: python-lxml -Requires: python-simplejson -Requires: python-defusedxml -Requires: python-httplib2 -Requires: python-requests - +BuildRequires: python%{python3_pkgversion} +BuildRequires: python%{python3_pkgversion}-oauthlib +BuildRequires: python%{python3_pkgversion}-requests-oauthlib +BuildRequires: python%{python3_pkgversion}-urllib3 +BuildRequires: python%{python3_pkgversion}-validators +BuildRequires: python%{python3_pkgversion}-requests %description Nagios probes to check functionality of B2ACCESS Service @@ -55,6 +55,8 @@ install -m 755 check_b2access.py %{buildroot}/%{_libexecdir}/argo-monitoring/pro %attr(0755,root,root) /%{_libexecdir}/argo-monitoring/probes/%{probe_namespace}/check_b2access.py %changelog +* Mon Jul 14 2024 Marvin Winkens - 1.0-1 +- Updated to python 3 * Tue Jun 05 2018 Shiraz Memon - 0.4-1 - Adapted to Unity v2.x.x REST API - More details in verbose mode diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..84ad2d4 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,6 @@ +[pytest] +minversion = 6.0 +addopts = -ra -q +testpaths = + tests +pythonpath = . \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dc94277 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +oauthlib +requests-oauthlib +urllib3 +validators +requests +pytest +requests-mock \ No newline at end of file diff --git a/tests/test_check_b2access.py b/tests/test_check_b2access.py new file mode 100644 index 0000000..1cabfeb --- /dev/null +++ b/tests/test_check_b2access.py @@ -0,0 +1,46 @@ +import json + +import pytest +from requests import Response + +from check_b2access import getAccessToken, getLdapName, getInfoCert, getTokenInfo, getUserInfo, getInfoUsernamePassword +import requests_mock + + +class TestProbe: + def test_get_ldap_name(self): + name = getLdapName("Apple/Pineapple/Banana ") + assert name == "Banana,Pineapple" + name = getLdapName("Apple/Pineapple/Banana/Oranges ") + assert name == "Oranges,Banana,Pineapple" + + def test_get_user_info(self): + url = "https://some_url" + token = "token" + + body = json.loads( + """{ + "sub": "subject", + "unity:persistent": "PersistentID" + } + """ + ) + + with requests_mock.Mocker() as m: + m.get(url, json=body, status_code=200) + getUserInfo(url, token, True) + + @pytest.mark.parametrize("status_code, body", [ + [200, '{"sub": "subject"}'], # missing persistent ID + [200, '{"unity:persistent": "PersistentID"}'], # missing subject + [500, '{"sub": "subject", "unity:persistent": "PersistentID"}'] # Server Error + ]) + def test_get_user_info_exception(self, status_code, body): + body = json.loads(body) + url = "https://some_url" + token = "token" + + with pytest.raises(SystemExit) as e: + with requests_mock.Mocker() as m: + m.get(url, json=body, status_code=status_code) + getUserInfo(url, token, True)