diff --git a/test/test_api_challenges.py b/test/test_api_challenges.py index 8db9566..b0ae592 100644 --- a/test/test_api_challenges.py +++ b/test/test_api_challenges.py @@ -5,7 +5,17 @@ import requests import json -from .utils import config, create_challenge, delete_challenge +from .utils import ( + config, + create_challenge, + delete_challenge, + post_instance, + delete_instance, + get_admin_instance, + get_source_id, + reset_all_submissions, + bypass_ratelimit + ) base_challenge = { "name":"test", @@ -77,3 +87,81 @@ def test_cannot_create_challenge_if_no_scenario_id(self): r = requests.post(f"{config.ctfd_url}/api/v1/challenges", headers=config.headers_admin, data=json.dumps(base_challenge)) self.assertEqual(r.status_code, 500) # CTFd return internal server error # https://github.com/CTFd/CTFd/issues/2674 + + def test_attempt_ctfd_flag(self): + bypass_ratelimit() + + chall_id = create_challenge() + + ctfd_flag = "fallback" + payload = { + "challenge": chall_id, + "content": ctfd_flag, + "data": "", + "type": "static" + + } + r = requests.post(f"{config.ctfd_url}/api/v1/flags", headers=config.headers_admin, data=json.dumps(payload)) + a = json.loads(r.text) + self.assertEqual(a["success"], True) + + post_instance(chall_id) + + payload = { + "challenge_id": chall_id, + "submission": ctfd_flag + } + + r = requests.post(f"{config.ctfd_url}/api/v1/challenges/attempt", headers=config.headers_user, data=json.dumps(payload)) + a = json.loads(r.text) + self.assertEqual(a["success"], True) + self.assertEqual(a["data"]["status"], "correct") + + reset_all_submissions() + delete_instance(chall_id) + delete_challenge(chall_id) + + + def test_attempt_variate_flag(self): + bypass_ratelimit() + + chall_id = create_challenge() + post_instance(chall_id) + + # i can flag with variate flag + r = get_admin_instance(chall_id, get_source_id()) + a = json.loads(r.text) + + payload = { + "challenge_id": chall_id, + "submission": a["data"]["message"]["flag"] + } + + r = requests.post(f"{config.ctfd_url}/api/v1/challenges/attempt", headers=config.headers_user, data=json.dumps(payload)) + a = json.loads(r.text) + self.assertEqual(a["success"], True) + self.assertEqual(a["data"]["status"], "correct") + + # clear + reset_all_submissions() + delete_instance(chall_id) + delete_challenge(chall_id) + + + def test_attempt_expired(self): + bypass_ratelimit() + chall_id = create_challenge() + + payload = { + "challenge_id": chall_id, + "submission": "xx" + } + + r = requests.post(f"{config.ctfd_url}/api/v1/challenges/attempt", headers=config.headers_user, data=json.dumps(payload)) + a = json.loads(r.text) + self.assertEqual(a["success"], True) + self.assertEqual(a["data"]["status"], "incorrect") + self.assertIn("Expired", a["data"]["message"]) + + reset_all_submissions() + delete_challenge(chall_id) \ No newline at end of file diff --git a/test/utils.py b/test/utils.py index 916a731..b273527 100644 --- a/test/utils.py +++ b/test/utils.py @@ -103,6 +103,10 @@ def get_instance(challengeId: int): r = requests.get(f"{config.plugin_url}/instance?challengeId={challengeId}", headers=config.headers_user) return r +def get_admin_instance(challengeId: int, sourceId: int): + r = requests.get(f"{config.plugin_url}/admin/instance?challengeId={challengeId}&sourceId={sourceId}", headers=config.headers_admin) + return r + def delete_instance(challengeId: int): r = requests.delete(f"{config.plugin_url}/instance?challengeId={challengeId}", headers=config.headers_user) return r @@ -118,5 +122,40 @@ def run_post_instance(challengeId: int, results: dict, lock: threading.Lock): # Store the result in a shared dictionary with the challengeId as the key results[challengeId] = json.loads(r.text) +def get_source_id(): + r = requests.get(f"{config.ctfd_url}/api/v1/configs/user_mode", headers=config.headers_admin) + a = json.loads(r.text) + + user_mode = a["data"]["value"] + + r = requests.get(f"{config.ctfd_url}/api/v1/users/me", headers=config.headers_user) + a = json.loads(r.text) + + sourceId = a["data"]["id"] + if user_mode == "teams": + sourceId = a["data"]["team_id"] + + return sourceId + +def bypass_ratelimit(): + payload = { + "key": "incorrect_submissions_per_min", + "value": "99999999999" + } + + r = requests.patch(f"{config.ctfd_url}/api/v1/configs", headers=config.headers_admin, data=json.dumps(payload)) + return r + +def reset_all_submissions(): + r = requests.get(f"{config.ctfd_url}/api/v1/submissions", headers=config.headers_admin) + a = json.loads(r.text) + + submissions = [] + for i in a["data"]: + submissions.append(i["id"]) + + for id in submissions: + r = requests.delete(f"{config.ctfd_url}/api/v1/submissions/{id}", headers=config.headers_admin) + config.scenario_id = push_scenario_as_file(config.scenario_path)