Skip to content

Commit

Permalink
HDXDSYS-761 Check API token validity (#69)
Browse files Browse the repository at this point in the history
* Api token methods

* Increase coverage
  • Loading branch information
mcarans authored Jun 6, 2024
1 parent bad6900 commit 482b7e5
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 10 deletions.
132 changes: 122 additions & 10 deletions src/hdx/data/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def actions() -> Dict[str, str]:
"delete": "user_delete",
"list": "user_list",
"listorgs": "organization_list_for_user",
"token_list": "api_token_list",
"autocomplete": "user_autocomplete",
}

Expand Down Expand Up @@ -248,6 +249,26 @@ def email_users(
**kwargs,
)

def get_organization_dicts(self, permission: str = "read") -> List[Dict]: # noqa: F821
"""Get organization dictionaries (not organization objects) in HDX that this user is a member of.
Args:
permission (str): Permission to check for. Defaults to 'read'.
Returns:
List[Dict]: List of organization dicts in HDX that this user is a member of
"""
success, result = self._read_from_hdx(
"user",
self.data["name"],
"id",
self.actions()["listorgs"],
permission=permission,
)
if success:
return result
return []

def get_organizations(
self, permission: str = "read"
) -> List["Organization"]: # noqa: F821
Expand All @@ -259,21 +280,112 @@ def get_organizations(
Returns:
List[Organization]: List of organizations in HDX that this user is a member of
"""
result = self.get_organization_dicts(permission)
organizations = []
for organizationdict in result:
org = hdx.data.organization.Organization.read_from_hdx(
organizationdict["id"]
)
organizations.append(org)
return organizations

def check_organization_access(self, organization: str) -> bool:
"""Check user is a member of a given organization.
Args:
organization (str): Organization id or name.
Returns:
bool: True if the logged in user is a member of the organization.
"""
for organization_dict in self.get_organization_dicts():
if organization_dict["id"] == organization:
return True
if organization_dict["name"] == organization:
return True
return False

@classmethod
def get_current_user_organization_dicts(
cls,
permission: str = "read",
configuration: Optional[Configuration] = None,
) -> List["Organization"]: # noqa: F821
"""Get organization dictionaries (not Organization objects) in HDX that the logged in user is a member of.
Args:
permission (str): Permission to check for. Defaults to 'read'.
configuration (Optional[Configuration]): HDX configuration. Defaults to global configuration.
Returns:
List[Dict]: List of organization dicts in HDX that logged in user is a member of
"""
user = User(configuration=configuration)
try:
return user.configuration.call_remoteckan(
cls.actions()["listorgs"]
)
except Exception:
return []

@classmethod
def get_current_user_organizations(
cls,
permission: str = "read",
configuration: Optional[Configuration] = None,
) -> List["Organization"]: # noqa: F821
"""Get organizations in HDX that the logged in user is a member of.
Args:
permission (str): Permission to check for. Defaults to 'read'.
configuration (Optional[Configuration]): HDX configuration. Defaults to global configuration.
Returns:
List[Organization]: List of organizations in HDX that logged in user is a member of
"""
result = cls.get_current_user_organization_dicts(
permission, configuration
)
organizations = []
for organizationdict in result:
org = hdx.data.organization.Organization.read_from_hdx(
organizationdict["id"]
)
organizations.append(org)
return organizations

@classmethod
def check_current_user_organization_access(cls, organization: str) -> bool:
"""Check logged in user is a member of a given organization.
Args:
organization (str): Organization id or name.
Returns:
bool: True if the logged in user is a member of the organization.
"""
for organization_dict in cls.get_current_user_organization_dicts():
if organization_dict["id"] == organization:
return True
if organization_dict["name"] == organization:
return True
return False

def get_token_list(self):
"""Get API tokens for user.
Returns:
List[Dict]: List of API token details
"""
success, result = self._read_from_hdx(
"user",
self.data["name"],
"id",
self.actions()["listorgs"],
permission=permission,
"user_id",
self.actions()["token_list"],
)
organizations = []
if success:
for organizationdict in result:
org = hdx.data.organization.Organization.read_from_hdx(
organizationdict["id"]
)
organizations.append(org)
return organizations
return result
return []

@classmethod
def autocomplete(
Expand Down
99 changes: 99 additions & 0 deletions tests/hdx/data/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@
]


user_tokenlist = [
{
"id": "123",
"name": "my_token",
"user_id": "9f3e9973-7dbe-4c65-8820-f48578e3ffea",
}
]


def user_mockshow(url, datadict):
if "show" not in url:
return MockResponse(
Expand Down Expand Up @@ -298,6 +307,48 @@ def post(url, data, headers, files, allow_redirects, auth=None):

Configuration.read().remoteckan().session = MockSession()

@pytest.fixture(scope="function")
def post_listorgs_invalid(self):
class MockSession:
@staticmethod
def post(url, data, headers, files, allow_redirects, auth=None):
decodedata = data.decode("utf-8")
datadict = json.loads(decodedata)
if "user" in url:
if "show" in url:
return user_mockshow(url, datadict)
elif "list" in url:
return MockResponse(
404,
'{"success": false, "error": {"message": "Not found", "__type": "Not Found Error"}, "help": "http://test-data.humdata.org/api/3/action/help_show?name=organization_list"}',
)

Configuration.read().remoteckan().session = MockSession()

@pytest.fixture(scope="function")
def post_tokenlist(self):
class MockSession:
@staticmethod
def post(url, data, headers, files, allow_redirects, auth=None):
decodedata = data.decode("utf-8")
datadict = json.loads(decodedata)
if "user" in url:
return user_mockshow(url, datadict)
elif "token" in url:
if datadict["user_id"] == "MyUser1":
result = json.dumps(user_tokenlist)
return MockResponse(
200,
'{"success": true, "result": %s, "help": "http://test-data.humdata.org/api/3/action/help_show?name=user_tokenlist"}'
% result,
)
return MockResponse(
404,
'{"success": false, "error": {"message": "Not found", "__type": "Not Found Error"}, "help": "http://test-data.humdata.org/api/3/action/help_show?name=user_tokenlist"}',
)

Configuration.read().remoteckan().session = MockSession()

@pytest.fixture(scope="function")
def post_autocomplete(self):
class MockSession:
Expand Down Expand Up @@ -554,8 +605,56 @@ def test_get_all_users(self, configuration, post_list, mocksmtp):

def test_get_organizations(self, configuration, post_listorgs):
user = User.read_from_hdx("9f3e9973-7dbe-4c65-8820-f48578e3ffea")
organizations = user.get_organization_dicts()
assert organizations[0]["id"] == "b67e6c74-c185-4f43-b561-0e114a736f19"
assert len(organizations) == 1
organizations = user.get_organizations()
assert organizations[0]["id"] == "b67e6c74-c185-4f43-b561-0e114a736f19"
assert len(organizations) == 1
result = user.check_organization_access(
"b67e6c74-c185-4f43-b561-0e114a736f19"
)
assert result is True
result = user.check_organization_access("acled")
assert result is True
result = user.check_organization_access("lala")
assert result is False

organizations = User.get_current_user_organization_dicts()
assert organizations[0]["id"] == "b67e6c74-c185-4f43-b561-0e114a736f19"
assert len(organizations) == 1
organizations = User.get_current_user_organizations()
assert organizations[0]["id"] == "b67e6c74-c185-4f43-b561-0e114a736f19"
assert len(organizations) == 1
result = User.check_current_user_organization_access(
"b67e6c74-c185-4f43-b561-0e114a736f19"
)
assert result is True
result = User.check_current_user_organization_access("acled")
assert result is True
result = User.check_current_user_organization_access("lala")
assert result is False

def test_get_organizations_invalid_user(
self, configuration, post_listorgs_invalid
):
user = User.read_from_hdx("9f3e9973-7dbe-4c65-8820-f48578e3ffea")
user["name"] = "lala"
assert user.get_organization_dicts() == []
assert User.get_current_user_organization_dicts() == []

def test_get_token_list(self, configuration, post_tokenlist):
user = User.read_from_hdx("9f3e9973-7dbe-4c65-8820-f48578e3ffea")
tokens = user.get_token_list()
assert tokens == [
{
"id": "123",
"name": "my_token",
"user_id": "9f3e9973-7dbe-4c65-8820-f48578e3ffea",
}
]
user["name"] = "lala"
assert user.get_token_list() == []

def test_autocomplete(self, configuration, post_autocomplete):
assert User.autocomplete("fake%20test") == user_autocomplete

0 comments on commit 482b7e5

Please sign in to comment.