Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use random IV in AtClient put/get methods #64

Draft
wants to merge 10 commits into
base: trunk
Choose a base branch
from
17 changes: 12 additions & 5 deletions at_client/atclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import traceback

from at_client.connections.notification.atevents import AtEvent, AtEventType
from at_client.util.iv_nonce import IVNonce


from .common.atsign import AtSign
Expand Down Expand Up @@ -187,7 +188,10 @@ def _put_self_key(self, key: SelfKey, value: str):
key.metadata.data_signature = EncryptionUtil.sign_sha256_rsa(value, self.keys[KeysUtil.encryption_private_key_name])

try:
cipher_text = EncryptionUtil.aes_encrypt_from_base64(value, self.keys[KeysUtil.self_encryption_key_name])
if key.metadata.iv_nonce is None:
raise AtMissingIVException("Missing IV for SelfKey")

cipher_text = EncryptionUtil.aes_encrypt_from_base64(value, self.keys[KeysUtil.self_encryption_key_name], IVNonce.get_bytes_from_b64(key.metadata.iv_nonce))
except Exception as e:
raise AtEncryptionException(f"Failed to encrypt value with self encryption key - {e}")

Expand All @@ -214,11 +218,14 @@ def _put_shared_key(self, key: SharedKey, value: str):
what = ""
cipher_text = None
try:
if key.metadata.iv_nonce is None:
raise AtMissingIVException("Missing IV for SharedKey")

what = "fetch/create shared encryption key"
share_to_encryption_key = self.get_encryption_key_shared_by_me(key)

what = "encrypt value with shared encryption key"
cipher_text = EncryptionUtil.aes_encrypt_from_base64(value, share_to_encryption_key)
cipher_text = EncryptionUtil.aes_encrypt_from_base64(value, share_to_encryption_key, IVNonce.get_bytes_from_b64(key.metadata.iv_nonce))
except Exception as e:
raise AtEncryptionException(f"Failed to {what} - {e}")

Expand Down Expand Up @@ -264,7 +271,7 @@ def _get_self_key(self, key: SelfKey):
encrypted_value = fetched["data"]
self_encryption_key = self.keys[KeysUtil.self_encryption_key_name]
try:
decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_value, self_encryption_key)
decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_value, self_encryption_key, base64.b64decode(key.metadata.iv_nonce))
except Exception as e:
raise AtDecryptionException(f"Failed to {command} - {e}")

Expand Down Expand Up @@ -305,7 +312,7 @@ def _get_shared_by_me_with_other(self, shared_key: SharedKey):
raise AtSecondaryConnectException(f"Failed to execute {command} - {e}")

try:
return EncryptionUtil.aes_decrypt_from_base64(raw_response.get_raw_data_response(), share_encryption_key)
return EncryptionUtil.aes_decrypt_from_base64(raw_response.get_raw_data_response(), share_encryption_key, IVNonce.get_bytes_from_b64(shared_key.metadata.iv_nonce))
except Exception as e:
raise AtDecryptionException(f"Failed to decrypt value with shared encryption key - {e}")

Expand All @@ -325,7 +332,7 @@ def _get_shared_by_other_with_me(self, shared_key:SharedKey):

what = "decrypt value with shared encryption key"
try:
return EncryptionUtil.aes_decrypt_from_base64(raw_response.get_raw_data_response(), share_encryption_key)
return EncryptionUtil.aes_decrypt_from_base64(raw_response.get_raw_data_response(), share_encryption_key, IVNonce.get_bytes_from_b64(shared_key.metadata.iv_nonce))
except Exception as e:
raise AtDecryptionException(f"Failed to {what} - {e}")

Expand Down
4 changes: 4 additions & 0 deletions at_client/common/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ def set_time_to_live(self, ttl: int):
def set_time_to_birth(self, ttb: int):
self.metadata.ttb = ttb
return self

def set_iv_nonce(self, iv_nonce: str):
self.metadata.iv_nonce = iv_nonce
return self


class PublicKey(AtKey):
Expand Down
2 changes: 1 addition & 1 deletion at_client/common/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __str__(self):
if self.encoding:
s += f":encoding:{self.encoding}"
if self.iv_nonce:
s += f":ivNonce:{binascii.b2a_base64(self.iv_nonce).decode('utf-8')[:-1]}"
s += f":ivNonce:{self.iv_nonce}"
# TO?DO: Add new parameters

return s
Expand Down
4 changes: 4 additions & 0 deletions at_client/exception/atexception.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,9 @@ def __init__(self, message):
super().__init__(message)

class AtRegistrarException(AtException):
def __init__(self, message):
super().__init__(message)

class AtMissingIVException(AtException):
def __init__(self, message):
super().__init__(message)
3 changes: 1 addition & 2 deletions at_client/util/encryptionutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,4 @@ def public_key_from_base64(s):

@staticmethod
def generate_iv_nonce():
return secrets.token_bytes(16)

return secrets.token_bytes(16)
34 changes: 34 additions & 0 deletions at_client/util/iv_nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import base64
from at_client.util.encryptionutil import EncryptionUtil


class IVNonce:
def __init__(self, iv_nonce_bytes=EncryptionUtil.generate_iv_nonce()):
self.iv_nonce_bytes = iv_nonce_bytes

def __repr__(self):
return str(self)

def __str__(self):
return self.as_b64()

def as_b64(self):
b64_str = ""
if self.iv_nonce_bytes:
b64_str = base64.b64encode(self.iv_nonce_bytes).rstrip().decode('utf-8')
return b64_str

def as_bytes(self):
return self.iv_nonce_bytes

@classmethod
def from_b64(cls, b64_str: str):
return cls(base64.b64decode(b64_str))

def set_iv_nonce_bytes(self, iv_nonce_bytes):
self.iv_nonce_bytes = iv_nonce_bytes
return self

@staticmethod
def get_bytes_from_b64(b64_str: str):
return base64.b64decode(b64_str)
2 changes: 1 addition & 1 deletion examples/repl.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def main():
raise Exception("Could not evaluate the key type of: " + fullKeyName)
elif verb == "put":
fullKeyName = parts[1]
value = command[verb.length() + fullKeyName.length() + 2:].strip()
value = command[len(verb) + len(fullKeyName) + 2:].strip()
keyStringUtil = KeyStringUtil(full_key_name=fullKeyName)
keyType = keyStringUtil.get_key_type()
if keyType == KeyType.PUBLIC_KEY:
Expand Down
25 changes: 15 additions & 10 deletions test/atclient_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from at_client.common import AtSign
from at_client.common.keys import PublicKey, SelfKey, SharedKey
from at_client.exception import *
from at_client.util.iv_nonce import IVNonce
from test_wrapper import skip_if_dependabot_pr

class AtClientTest(unittest.TestCase):
Expand Down Expand Up @@ -58,7 +59,8 @@ def test_put_self_key(self):
"""Test Put Function with Self Key"""
atsign = AtSign(self.atsign1)
atclient = AtClient(atsign, verbose=self.verbose)
sk = SelfKey("test_self_key", atsign)
iv = IVNonce().as_b64()
sk = SelfKey("test_self_key", atsign).set_iv_nonce(iv)
response = atclient.put(sk, "test1")
self.assertIsNotNone(response)

Expand All @@ -68,14 +70,14 @@ def test_put_shared_key(self):
shared_by = AtSign(self.atsign1)
shared_with = AtSign(self.atsign2)
atclient = AtClient(shared_by, verbose=self.verbose)
sk = SharedKey("test_shared_key", shared_by, shared_with)
sk = SharedKey("test_shared_key", shared_by, shared_with).set_iv_nonce(IVNonce().as_b64())
response = atclient.put(sk, "test1")
self.assertIsNotNone(response)

shared_with = AtSign(self.atsign1)
shared_by = AtSign(self.atsign2)
atclient = AtClient(shared_by, verbose=self.verbose)
sk = SharedKey("test_shared_key2", shared_by, shared_with)
sk = SharedKey("test_shared_key2", shared_by, shared_with).set_iv_nonce(IVNonce().as_b64())
response = atclient.put(sk, "test2")
self.assertIsNotNone(response)

Expand Down Expand Up @@ -169,7 +171,8 @@ def test_get_self_key(self):
"""Test Get Function with Self Key"""
atsign = AtSign(self.atsign1)
atclient = AtClient(atsign, verbose=self.verbose)
sk = SelfKey("test_self_key", atsign)
iv = IVNonce().as_b64()
sk = SelfKey("test_self_key", atsign).set_iv_nonce(iv)
response = atclient.put(sk, "test1")
response = atclient.get(sk)
self.assertEqual("test1", response)
Expand Down Expand Up @@ -206,15 +209,17 @@ def test_get_shared_key(self):
shared_by = AtSign(self.atsign1)
shared_with = AtSign(self.atsign2)
atclient = AtClient(shared_by, verbose=self.verbose)
sk = SharedKey("test_shared_key1445", shared_by, shared_with)
sk = SharedKey("test_shared_key1445", shared_by, shared_with).set_iv_nonce(IVNonce().as_b64())
atclient.put(sk, "test")
response = atclient.get(sk)
self.assertEqual("test", response)

# Shared by other with me
sk = SharedKey("test_shared_key2", shared_with, shared_by)
atclient.put(SharedKey("test_shared_key2", shared_by, shared_with), "test2")
response = atclient.get(sk)
iv = IVNonce().as_b64()
sk1 = SharedKey("test_shared_key2", shared_with, shared_by).set_iv_nonce(iv)
sk2 = SharedKey("test_shared_key2", shared_by, shared_with).set_iv_nonce(iv)
atclient.put(sk2, "test2")
response = atclient.get(sk1)
self.assertEqual("test2", response)

# Shared Key not found test
Expand All @@ -240,7 +245,7 @@ def test_delete_self_key(self):
atsign = AtSign(self.atsign1)
atclient = AtClient(atsign, verbose=self.verbose)
random_key_name = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
sk = SelfKey(random_key_name, atsign)
sk = SelfKey(random_key_name, atsign).set_iv_nonce(IVNonce().as_b64())
response = atclient.put(sk, "test1")

response = atclient.delete(sk)
Expand All @@ -253,7 +258,7 @@ def test_delete_shared_key(self):
shared_with = AtSign(self.atsign2)
atclient = AtClient(shared_by, verbose=self.verbose)
random_key_name = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
sk = SharedKey(random_key_name, shared_by, shared_with)
sk = SharedKey(random_key_name, shared_by, shared_with).set_iv_nonce(IVNonce().as_b64())
response = atclient.put(sk, "test1")

response = atclient.delete(sk)
Expand Down
5 changes: 3 additions & 2 deletions test/encryptionutil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ def test_aes_encryption(self):
"""Test generating an AES key and encryption/decryption."""
secret_key = EncryptionUtil.generate_aes_key_base64()
plain_text = "AES"
encrypted_text = EncryptionUtil.aes_encrypt_from_base64(plain_text, secret_key)
decrypted_text = EncryptionUtil.aes_decrypt_from_base64(encrypted_text, secret_key)
iv_nonce = EncryptionUtil.generate_iv_nonce()
encrypted_text = EncryptionUtil.aes_encrypt_from_base64(plain_text, secret_key, iv_nonce)
decrypted_text = EncryptionUtil.aes_decrypt_from_base64(encrypted_text, secret_key, iv_nonce)
self.assertEqual(plain_text, decrypted_text)

def test_rsa_encryption(self):
Expand Down
3 changes: 2 additions & 1 deletion test/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

def skip_if_dependabot_pr(func):
"""Decorator for skipping a test method if it's a Dependabot PR."""
if int(os.getenv('DEPENDABOT_PR')):
dependabot_pr = os.getenv('DEPENDABOT_PR')
if dependabot_pr is not None and int(dependabot_pr):
return unittest.skip("Dependabot PR")(func)
else:
return func
Expand Down