From 24c9b515528da42616db09f6732e0b2636d64a80 Mon Sep 17 00:00:00 2001 From: Steve McGrath Date: Wed, 5 Feb 2025 14:35:42 -0600 Subject: [PATCH] Updated the Repo constructor to support universal #854 --- tenable/sc/repositories.py | 434 ++++++++++++++++++++++--------------- 1 file changed, 254 insertions(+), 180 deletions(-) diff --git a/tenable/sc/repositories.py b/tenable/sc/repositories.py index 68dba534e..60e81aac1 100644 --- a/tenable/sc/repositories.py +++ b/tenable/sc/repositories.py @@ -1,4 +1,4 @@ -''' +""" Repositories ============ @@ -11,133 +11,166 @@ .. rst-class:: hide-signature .. autoclass:: RepositoryAPI :members: -''' +""" + from semver import VersionInfo + from .base import SCEndpoint class RepositoryAPI(SCEndpoint): def _constructor(self, **kwargs): - ''' + """ Repository document constructor - ''' - if 'nessus_sched' in kwargs: - kwargs['nessusSchedule'] = self._schedule_constructor(kwargs['nessus_sched']) - del kwargs['nessus_sched'] - if 'mobile_sched' in kwargs: - kwargs['mobileSchedule'] = self._schedule_constructor(kwargs['mobile_sched']) - del kwargs['mobile_sched'] - if 'remote_sched' in kwargs: - kwargs['remoteSchedule'] = self._schedule_constructor(kwargs['remote_sched']) - del kwargs['remote_sched'] - - if 'name' in kwargs: + """ + if "nessus_sched" in kwargs: + kwargs["nessusSchedule"] = self._schedule_constructor( + kwargs["nessus_sched"] + ) + del kwargs["nessus_sched"] + if "mobile_sched" in kwargs: + kwargs["mobileSchedule"] = self._schedule_constructor( + kwargs["mobile_sched"] + ) + del kwargs["mobile_sched"] + if "remote_sched" in kwargs: + kwargs["remoteSchedule"] = self._schedule_constructor( + kwargs["remote_sched"] + ) + del kwargs["remote_sched"] + + if "name" in kwargs: # Validate the name is a string - self._check('name', kwargs['name'], str) + self._check("name", kwargs["name"], str) - if 'description' in kwargs: + if "description" in kwargs: # Verify that the description is a string - self._check('description', kwargs['description'], str) + self._check("description", kwargs["description"], str) - if 'format' in kwargs: + if "format" in kwargs: # The data format for the repository. - kwargs['dataFormat'] = self._check('format', kwargs['format'], str, - choices=['agent', 'IPv4', 'IPv6', 'mobile']) - del kwargs['format'] + kwargs["dataFormat"] = self._check( + "format", + kwargs["format"], + str, + choices=["agent", "IPv4", "IPv6", "mobile", "universal"], + ) + del kwargs["format"] - if 'repo_type' in kwargs: + if "repo_type" in kwargs: # The type of repository - kwargs['type'] = self._check('repo_type', kwargs['repo_type'], str, - choices=['Local', 'Remote', 'Offline']) - del kwargs['repo_type'] + kwargs["type"] = self._check( + "repo_type", + kwargs["repo_type"], + str, + choices=["Local", "Remote", "Offline"], + ) + del kwargs["repo_type"] - if 'orgs' in kwargs: + if "orgs" in kwargs: # Validate all of the organizational sub-documents. - kwargs['organizations'] = [{'id': self._check('org_id', o, int)} - for o in self._check('orgs', kwargs['orgs'], list)] - del kwargs['orgs'] + kwargs["organizations"] = [ + {"id": self._check("org_id", o, int)} + for o in self._check("orgs", kwargs["orgs"], list) + ] + del kwargs["orgs"] - if 'trending' in kwargs: + if "trending" in kwargs: # Trending should be between 0 and 365. - kwargs['trendingDays'] = self._check('trending', kwargs['trending'], int, - choices=list(range(366))) - del kwargs['trending'] + kwargs["trendingDays"] = self._check( + "trending", kwargs["trending"], int, choices=list(range(366)) + ) + del kwargs["trending"] - if 'fulltext_search' in kwargs: + if "fulltext_search" in kwargs: # trendWithRaw is the backend paramater name for "Full Text Search" # within the UI. We will be calling it fulltest_search to more # closely align with what the frontend calls this feature. - kwargs['trendWithRaw'] = str(self._check('fulltext_search', - kwargs['fulltext_search'], bool)).lower() - del kwargs['fulltext_search'] + kwargs["trendWithRaw"] = str( + self._check("fulltext_search", kwargs["fulltext_search"], bool) + ).lower() + del kwargs["fulltext_search"] - if 'lce_correlation' in kwargs: + if "lce_correlation" in kwargs: # The correlation parameter isn't well named here, we will call it # out as LCE correlation to specifically note what it is for. - kwargs['correlation'] = [{'id': self._check('lce_id', l, int)} - for l in self._check('lce_correlation', kwargs['lce_correlation'], list)] - del kwargs['lce_correlation'] + kwargs["correlation"] = [ + {"id": self._check("lce_id", l, int)} + for l in self._check("lce_correlation", kwargs["lce_correlation"], list) + ] + del kwargs["lce_correlation"] - if 'allowed_ips' in kwargs: + if "allowed_ips" in kwargs: # Using valid IPs here instead of ipRange to again more closely # align to the frontend and to more explicitly call out the # function of this paramater - kwargs['ipRange'] = ','.join([self._check('ip', i, str) - for i in self._check('allowed_ips', kwargs['allowed_ips'], list)]) - del kwargs['allowed_ips'] + kwargs["ipRange"] = ",".join( + [ + self._check("ip", i, str) + for i in self._check("allowed_ips", kwargs["allowed_ips"], list) + ] + ) + del kwargs["allowed_ips"] - if 'remote_ip' in kwargs: - kwargs['remoteIP'] = self._check('remote_ip', kwargs['remote_ip'], str) - del kwargs['remote_ip'] + if "remote_ip" in kwargs: + kwargs["remoteIP"] = self._check("remote_ip", kwargs["remote_ip"], str) + del kwargs["remote_ip"] - if 'remote_repo' in kwargs: - kwargs['remoteID'] = self._check('remote_repo', kwargs['remote_repo'], int) - del kwargs['remote_repo'] + if "remote_repo" in kwargs: + kwargs["remoteID"] = self._check("remote_repo", kwargs["remote_repo"], int) + del kwargs["remote_repo"] - if 'preferences' in kwargs: + if "preferences" in kwargs: # Validate that all of the preferences are K:V pairs of strings. - for key in self._check('preferences', kwargs['preferences'], dict): - self._check('preference:{}'.format(key), key, str) - self._check('preference:{}:value'.format(key), - kwargs['preferences'][key], str) - - if 'mdm_id' in kwargs: - kwargs['mdm'] = {'id': self._check('mdm_id', kwargs['mdm_id'], int)} - del kwargs['mdm_id'] - - if 'scanner_id' in kwargs: - kwargs['scanner'] = {'id': self._check( - 'scanner_id', kwargs['scanner_id'], int)} - del kwargs['scanner_id'] + for key in self._check("preferences", kwargs["preferences"], dict): + self._check("preference:{}".format(key), key, str) + self._check( + "preference:{}:value".format(key), kwargs["preferences"][key], str + ) + + if "mdm_id" in kwargs: + kwargs["mdm"] = {"id": self._check("mdm_id", kwargs["mdm_id"], int)} + del kwargs["mdm_id"] + + if "scanner_id" in kwargs: + kwargs["scanner"] = { + "id": self._check("scanner_id", kwargs["scanner_id"], int) + } + del kwargs["scanner_id"] return kwargs def _rules_constructor(self, **kwargs): - ''' + """ Accept/Recast Rule Query Creator - ''' - if 'plugin_id' in kwargs: + """ + if "plugin_id" in kwargs: # Convert the snake_cased variant to the camelCased variant. - kwargs['pluginID'] = self._check('plugin_id', kwargs['plugin_id'], int) - del kwargs['plugin_id'] - if 'port' in kwargs: + kwargs["pluginID"] = self._check("plugin_id", kwargs["plugin_id"], int) + del kwargs["plugin_id"] + if "port" in kwargs: # validate port is a integer - self._check('port', kwargs['port'], int) - if 'orgs' in kwargs: + self._check("port", kwargs["port"], int) + if "orgs" in kwargs: # convert the list of organization IDs into the comma-separated # string that the API expects. - kwargs['organizationIDs'] = ','.join([str(self._check('org:id', o, int)) - for o in self._check('orgs', kwargs['orgs'], list)]) - del kwargs['orgs'] - if 'fields' in kwargs: + kwargs["organizationIDs"] = ",".join( + [ + str(self._check("org:id", o, int)) + for o in self._check("orgs", kwargs["orgs"], list) + ] + ) + del kwargs["orgs"] + if "fields" in kwargs: # convert the list of field names into the comma-separated string # that the API expects. - kwargs['fields'] = ','.join([self._check('field', f, str) - for f in kwargs['fields']]) + kwargs["fields"] = ",".join( + [self._check("field", f, str) for f in kwargs["fields"]] + ) return kwargs def list(self, fields=None, repo_type=None): - ''' + """ Retrieves a list of repositories. :sc-api:`repository: list ` @@ -164,18 +197,21 @@ def list(self, fields=None, repo_type=None): Retrieve all of the remote repositories: >>> repos = sc.repositories.list(repo_type='Remote') - ''' + """ params = dict() if repo_type: - params['type'] = self._check('repo_type', repo_type, str, choices=[ - 'All', 'Local', 'Remote', 'Offline']) + params["type"] = self._check( + "repo_type", + repo_type, + str, + choices=["All", "Local", "Remote", "Offline"], + ) if fields: - params['fields'] = ','.join([self._check('field', f, str) - for f in fields]) - return self._api.get('repository', params=params).json()['response'] + params["fields"] = ",".join([self._check("field", f, str) for f in fields]) + return self._api.get("repository", params=params).json()["response"] def create(self, **kwargs): - ''' + """ Creates a new repository :sc-api:`repository: create ` @@ -305,34 +341,34 @@ def create(self, **kwargs): ... 'start': 'TZID=America/NewYork:20190212T060000', ... 'repeatRule': 'FREQ=DAILY;INTERVAL=1' ... }) - ''' + """ kwargs = self._constructor(**kwargs) - kwargs['dataFormat'] = kwargs.get('dataFormat', 'IPv4') - kwargs['type'] = kwargs.get('type', 'Local') + kwargs["dataFormat"] = kwargs.get("dataFormat", "IPv4") + kwargs["type"] = kwargs.get("type", "Local") - if kwargs['dataFormat'] in ['IPv4', 'IPv6', 'agent']: - kwargs['trendingDays'] = kwargs.get('trendingDays', 0) - kwargs['trendWithRaw'] = kwargs.get('trendWithRaw', 'false') + if kwargs["dataFormat"] in ["IPv4", "IPv6", "agent"]: + kwargs["trendingDays"] = kwargs.get("trendingDays", 0) + kwargs["trendWithRaw"] = kwargs.get("trendWithRaw", "false") - if kwargs['dataFormat'] in ['IPv4', 'IPv6']: - kwargs['nessusSchedule'] = kwargs.get('nessusSchedule', {'type': 'never'}) + if kwargs["dataFormat"] in ["IPv4", "IPv6"]: + kwargs["nessusSchedule"] = kwargs.get("nessusSchedule", {"type": "never"}) - if kwargs['dataFormat'] == 'IPv4': - kwargs['ipRange'] = kwargs.get('ipRange', '0.0.0.0/0') + if kwargs["dataFormat"] == "IPv4": + kwargs["ipRange"] = kwargs.get("ipRange", "0.0.0.0/0") - if kwargs['dataFormat'] == 'IPv6': - kwargs['ipRange'] = kwargs.get('ipRange', '::/0') + if kwargs["dataFormat"] == "IPv6": + kwargs["ipRange"] = kwargs.get("ipRange", "::/0") - if kwargs['dataFormat'] == 'mobile': - kwargs['mobileSchedule'] = kwargs.get('mobileSchedule', {'type': 'never'}) + if kwargs["dataFormat"] == "mobile": + kwargs["mobileSchedule"] = kwargs.get("mobileSchedule", {"type": "never"}) - if kwargs['type'] == 'remote': - kwargs['remoteSchedule'] = kwargs.get('remoteSchedule', {'type': 'never'}) + if kwargs["type"] == "remote": + kwargs["remoteSchedule"] = kwargs.get("remoteSchedule", {"type": "never"}) - return self._api.post('repository', json=kwargs).json()['response'] + return self._api.post("repository", json=kwargs).json()["response"] def details(self, repository_id, fields=None): - ''' + """ Retrieves the details for the specified repository. :sc-api:`repository: details ` @@ -350,16 +386,18 @@ def details(self, repository_id, fields=None): Examples: >>> repo = sc.repositories.details(1) - ''' + """ params = dict() if fields: - params['fields'] = ','.join([self._check('field', f, str) for f in fields]) + params["fields"] = ",".join([self._check("field", f, str) for f in fields]) - return self._api.get('repository/{}'.format( - self._check('repository_id', repository_id, int)), params=params).json()['response'] + return self._api.get( + "repository/{}".format(self._check("repository_id", repository_id, int)), + params=params, + ).json()["response"] def delete(self, repository_id): - ''' + """ Remove the specified repository from Tenable Security Center :sc-api:`repository: delete ` @@ -373,12 +411,13 @@ def delete(self, repository_id): Examples: >>> sc.repositories.delete(1) - ''' - return self._api.delete('repository/{}'.format( - self._check('repository_id', repository_id, int))).json()['response'] + """ + return self._api.delete( + "repository/{}".format(self._check("repository_id", repository_id, int)) + ).json()["response"] def edit(self, repository_id, **kwargs): - ''' + """ Updates an existing repository :sc-api:`repository: edit ` @@ -437,13 +476,15 @@ def edit(self, repository_id, **kwargs): Examples: >>> repo = sc.repositories.edit(1, name='Example IPv4') - ''' + """ kwargs = self._constructor(**kwargs) - return self._api.patch('repository/{}'.format( - self._check('repository_id', repository_id, int)), json=kwargs).json()['response'] + return self._api.patch( + "repository/{}".format(self._check("repository_id", repository_id, int)), + json=kwargs, + ).json()["response"] def accept_risk_rules(self, repository_id, **kwargs): - ''' + """ Retrieves the accepted risk rules associated with the specified repository. @@ -462,13 +503,17 @@ def accept_risk_rules(self, repository_id, **kwargs): Examples: >>> rules = sc.repositories.accept_risk_rules(1) - ''' + """ params = self._rules_constructor(**kwargs) - return self._api.get('repository/{}/acceptRiskRule'.format( - self._check('repository_id', repository_id, int)), params=params).json()['response'] + return self._api.get( + "repository/{}/acceptRiskRule".format( + self._check("repository_id", repository_id, int) + ), + params=params, + ).json()["response"] def recast_risk_rules(self, repository_id, **kwargs): - ''' + """ Retrieves the recast risk rules associated with the specified repository. @@ -488,13 +533,17 @@ def recast_risk_rules(self, repository_id, **kwargs): Examples: >>> rules = sc.repositories.recast_risk_rules(1) - ''' + """ params = self._rules_constructor(**kwargs) - return self._api.get('repository/{}/recastRiskRule'.format( - self._check('repository_id', repository_id, int)), params=params).json()['response'] + return self._api.get( + "repository/{}/recastRiskRule".format( + self._check("repository_id", repository_id, int) + ), + params=params, + ).json()["response"] def asset_intersections(self, repository_id, uuid=None, ip_address=None, dns=None): - ''' + """ Retrieves the asset lists that a UUID, DNS address, or IP exists in. :sc-api:`repository: asst intersections @@ -513,20 +562,27 @@ def asset_intersections(self, repository_id, uuid=None, ip_address=None, dns=Non Examples: >>> assetlists = sc.repositories.asset_intersection(1, ... ip='192.168.0.1') - ''' + """ params = dict() if dns: - params['dnsName'] = self._check('dns', dns, str) + params["dnsName"] = self._check("dns", dns, str) if ip_address: - params['ip'] = self._check('ip_address', ip_address, str) + params["ip"] = self._check("ip_address", ip_address, str) if uuid: - params['uuid'] = self._check('uuid', uuid, 'uuid') - return self._api.get('repository/{}/assetIntersections'.format( - self._check('repository_id', repository_id, int)), - params=params).json()['response'].get('assets') + params["uuid"] = self._check("uuid", uuid, "uuid") + return ( + self._api.get( + "repository/{}/assetIntersections".format( + self._check("repository_id", repository_id, int) + ), + params=params, + ) + .json()["response"] + .get("assets") + ) def import_repository(self, repository_id, fobj): - ''' + """ Imports the repository archive for an offline repository. :sc-api:`repository: import ` @@ -543,14 +599,16 @@ def import_repository(self, repository_id, fobj): Example: >>> with open('repo.tar.gz', 'rb') as archive: ... sc.repositories.import_repository(1, archive) - ''' - return self._api.post('repository/{}/import'.format( - self._check('repository_id', repository_id, int)), json={ - 'file': self._api.files.upload(fobj) - }).json()['response'] + """ + return self._api.post( + "repository/{}/import".format( + self._check("repository_id", repository_id, int) + ), + json={"file": self._api.files.upload(fobj)}, + ).json()["response"] def export_repository(self, repository_id, fobj): - ''' + """ Exports the repository and writes the archive tarball into the file object passed. @@ -568,9 +626,13 @@ def export_repository(self, repository_id, fobj): Example: >>> with open('repo.tar.gz', 'wb') as archive: ... sc.repositories.export_repository(1, archive) - ''' - resp = self._api.get('repository/{}/export'.format( - self._check('repository_id', repository_id, int)), stream=True) + """ + resp = self._api.get( + "repository/{}/export".format( + self._check("repository_id", repository_id, int) + ), + stream=True, + ) # Lets stream the file into the file-like object... for chunk in resp.iter_content(chunk_size=1024): @@ -581,7 +643,7 @@ def export_repository(self, repository_id, fobj): return fobj def remote_sync(self, repository_id): - ''' + """ Initiates a remote synchronization with a downstream Tenable Security Center instance. This action can only be performed on an offline repository. @@ -596,12 +658,16 @@ def remote_sync(self, repository_id): Examples: >>> sc.repositories.remote_sync(1) - ''' - return self._api.post('repository/{}/sync'.format( - self._check('repository_id', repository_id, int)), json={}).json()['response'] + """ + return self._api.post( + "repository/{}/sync".format( + self._check("repository_id", repository_id, int) + ), + json={}, + ).json()["response"] def mobile_sync(self, repository_id): - ''' + """ Initiates a MDM synchronization with the configured MDM source on the mobile repository specified. @@ -617,12 +683,18 @@ def mobile_sync(self, repository_id): Examples: >>> sc.repositories.mobile_sync(1) - ''' - return self._api.post('repository/{}/updateMobileData'.format( - self._check('repository_id', repository_id, int)), json={}).json()['response'] - - def device_info(self, repository_id, dns=None, ip_address=None, uuid=None, fields=None): - ''' + """ + return self._api.post( + "repository/{}/updateMobileData".format( + self._check("repository_id", repository_id, int) + ), + json={}, + ).json()["response"] + + def device_info( + self, repository_id, dns=None, ip_address=None, uuid=None, fields=None + ): + """ Retrieves the device information for the requested device on the associated repository. @@ -647,33 +719,31 @@ def device_info(self, repository_id, dns=None, ip_address=None, uuid=None, field Examples: >>> host = sc.repositories.device_info(1, ip_address='192.168.0.1') - ''' + """ # We will generally want to query the deviceInfo action, however if we # happen to be on a Tenable Security Center instance version that's less than 5.7, we # have to instead query ipInfo. - method = 'deviceInfo' - if VersionInfo.parse(self._api.version).match('<5.7.0'): - method = 'ipInfo' + method = "deviceInfo" + if VersionInfo.parse(self._api.version).match("<5.7.0"): + method = "ipInfo" params = dict() if fields: - params['fields'] = ','.join( - [self._check('field', f, str) for f in fields] - ) + params["fields"] = ",".join([self._check("field", f, str) for f in fields]) if dns: - params['dnsName'] = self._check('dns', dns, str) + params["dnsName"] = self._check("dns", dns, str) if ip_address: - params['ip'] = self._check('ip_address', ip_address, str) + params["ip"] = self._check("ip_address", ip_address, str) if uuid: - params['uuid'] = self._check('uuid', uuid, 'uuid') + params["uuid"] = self._check("uuid", uuid, "uuid") - self._check('repository_id', repository_id, int) - return self._api.get(f'repository/{repository_id}/{method}', - params=params - ).json()['response'] + self._check("repository_id", repository_id, int) + return self._api.get( + f"repository/{repository_id}/{method}", params=params + ).json()["response"] def remote_authorize(self, host, username, password): - ''' + """ Authorized communication to a downstream Tenable Security Center instance with the provided username and password. @@ -691,15 +761,18 @@ def remote_authorize(self, host, username, password): Examples: >>> sc.repositories.remote_authorize( ... '192.168.0.101', 'admin', 'password') - ''' - return self._api.post('repository/authorize', json={ - 'host': self._check('host', host, str), - 'username': self._check('username', username, str), - 'password': self._check('password', password, str) - }).json()['response'] + """ + return self._api.post( + "repository/authorize", + json={ + "host": self._check("host", host, str), + "username": self._check("username", username, str), + "password": self._check("password", password, str), + }, + ).json()["response"] def remote_fetch(self, host): - ''' + """ Retrieves the list of repositories from the specified downstream Tenable Security Center instance. @@ -711,6 +784,7 @@ def remote_fetch(self, host): Returns: :obj:`list`: The list of repositories on the downstream Tenable Security Center instance. - ''' - return self._api.get('repository/fetchRemote', params={ - 'host': self._check('host', host, str)}).json()['response'] + """ + return self._api.get( + "repository/fetchRemote", params={"host": self._check("host", host, str)} + ).json()["response"]