diff --git a/examples/getST.py b/examples/getST.py index 1c5930632..bd97790fe 100755 --- a/examples/getST.py +++ b/examples/getST.py @@ -8,7 +8,7 @@ # for more information. # # Description: -# Given a password, hash, aesKey or TGT in ccache, it will request a Service Ticket and save it as ccache +# Given a password, hash, aesKey, aesSha2Key or TGT in ccache, it will request a Service Ticket and save it as ccache # If the account has constrained delegation (with protocol transition) privileges you will be able to use # the -impersonate switch to request the ticket on behalf other user (it will use S4U2Self/S4U2Proxy to # request the ticket.) @@ -54,16 +54,32 @@ from impacket.examples import logger from impacket.examples.utils import parse_credentials from impacket.krb5 import constants -from impacket.krb5.asn1 import AP_REQ, AS_REP, TGS_REQ, Authenticator, TGS_REP, seq_set, seq_set_iter, PA_FOR_USER_ENC, \ +from impacket.krb5.asn1 import AP_REQ, AS_REP, TGS_REQ, Authenticator, TGS_REP, seq_set, seq_set_iter, PA_S4U_X509_USER, S4UUserID, \ Ticket as TicketAsn1, EncTGSRepPart, PA_PAC_OPTIONS, EncTicketPart from impacket.krb5.ccache import CCache -from impacket.krb5.crypto import Key, _enctype_table, _HMACMD5, _AES256CTS, Enctype +from impacket.krb5.crypto import Key, _enctype_table, make_checksum, string_to_key, Enctype from impacket.krb5.constants import TicketFlags, encodeFlags from impacket.krb5.kerberosv5 import getKerberosTGS, getKerberosTGT, sendReceive from impacket.krb5.types import Principal, KerberosTime, Ticket from impacket.ntlm import compute_nthash from impacket.winregistry import hexdump +def cksumtype_for_enctype(enctype): + options = { + Enctype.DES_MD4: constants.ChecksumTypes.rsa_md4_des.value, + Enctype.DES_MD5: constants.ChecksumTypes.rsa_md5_des.value, + Enctype.DES3: constants.ChecksumTypes.hmac_sha1_des3_kd.value, + Enctype.RC4: constants.ChecksumTypes.hmac_md5.value, + Enctype.AES128_SHA1: constants.ChecksumTypes.hmac_sha1_96_aes128.value, + Enctype.AES256_SHA1: constants.ChecksumTypes.hmac_sha1_96_aes256.value, + Enctype.AES128_SHA256: constants.ChecksumTypes.hmac_sha256_128_aes128.value, + Enctype.AES256_SHA384: constants.ChecksumTypes.hmac_sha384_192_aes256.value + } + + if enctype not in options: + raise ValueError('No checksum type for enctype %d' % enctype) + return options[enctype] + class GETST: def __init__(self, target, password, domain, options): @@ -73,6 +89,7 @@ def __init__(self, target, password, domain, options): self.__lmhash = '' self.__nthash = '' self.__aesKey = options.aesKey + self.__aesSha2Key = options.aesSha2Key self.__options = options self.__kdcHost = options.dc_ip self.__force_forwardable = options.force_forwardable @@ -88,7 +105,7 @@ def saveTicket(self, ticket, sessionKey): ccache.fromTGS(ticket, sessionKey, sessionKey) ccache.saveFile(self.__saveFileName + '.ccache') - def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost, additional_ticket_path): + def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, aesSha2Key, kdcHost, additional_ticket_path): if not os.path.isfile(additional_ticket_path): logging.error("Ticket %s doesn't exist" % additional_ticket_path) exit(0) @@ -118,6 +135,11 @@ def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey aesKey = unhexlify(aesKey) except TypeError: pass + if isinstance(aesSha2Key, str): + try: + aesSha2Key = unhexlify(aesSha2Key) + except TypeError: + pass # Compute NTHash and AESKey if they're not provided in arguments if self.__password != '' and self.__domain != '' and self.__user != '': @@ -128,10 +150,16 @@ def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey print(hexlify(nthash).decode()) if not aesKey: salt = self.__domain.upper() + self.__user - aesKey = _AES256CTS.string_to_key(self.__password, salt, params=None).contents + aesSha2Key = string_to_key(Enctype.AES256_SHA1, self.__password, salt, params=None).contents if logging.getLogger().level == logging.DEBUG: logging.debug('AESKey') print(hexlify(aesKey).decode()) + if not aesSha2Key: + salt = self.__domain.upper() + self.__user + aesSha2Key = string_to_key(Enctype.AES256_SHA384, self.__password, salt, params=None).contents + if logging.getLogger().level == logging.DEBUG: + logging.debug('AESSHA2Key') + print(hexlify(aesSha2Key).decode()) # Get the encrypted ticket returned in the TGS. It's encrypted with one of our keys cipherText = tgs['ticket']['enc-part']['cipher'] @@ -141,8 +169,10 @@ def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey newCipher = _enctype_table[int(tgs['ticket']['enc-part']['etype'])] if newCipher.enctype == Enctype.RC4: key = Key(newCipher.enctype, nthash) - else: + elif newCipher.enctype in (Enctype.AES128_SHA1, Enctype.AES256_SHA1): key = Key(newCipher.enctype, aesKey) + else: + key = Key(newCipher.enctype, aesSha2Key) # Decrypt and decode the ticket # Key Usage 2 @@ -266,10 +296,7 @@ def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey reqBody['nonce'] = random.getrandbits(31) seq_set_iter(reqBody, 'etype', ( - int(constants.EncryptionTypes.rc4_hmac.value), - int(constants.EncryptionTypes.des3_cbc_sha1_kd.value), - int(constants.EncryptionTypes.des_cbc_md5.value), - int(cipher.enctype) + int(cipher.enctype), ) ) message = encoder.encode(tgsReq) @@ -295,12 +322,21 @@ def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey return r, cipher, sessionKey, newSessionKey - def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost): + def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, aesSha2Key, kdcHost): decodedTGT = decoder.decode(tgt, asn1Spec=AS_REP())[0] # Extract the ticket from the TGT ticket = Ticket() ticket.from_asn1(decodedTGT['ticket']) + nonce = random.getrandbits(31) + + tgsReq = TGS_REQ() + + tgsReq['pvno'] = 5 + tgsReq['msg-type'] = int(constants.ApplicationTagNumbers.TGS_REQ.value) + + tgsReq['padata'] = noValue + apReq = AP_REQ() apReq['pvno'] = 5 apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value) @@ -309,6 +345,27 @@ def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost) apReq['ap-options'] = constants.encodeFlags(opts) seq_set(apReq, 'ticket', ticket.to_asn1) + reqBody = seq_set(tgsReq, 'req-body') + + opts = list() + opts.append(constants.KDCOptions.forwardable.value) + opts.append(constants.KDCOptions.renewable.value) + opts.append(constants.KDCOptions.canonicalize.value) + + reqBody['kdc-options'] = constants.encodeFlags(opts) + + serverName = Principal(self.__user, type=constants.PrincipalNameType.NT_UNKNOWN.value) + + seq_set(reqBody, 'sname', serverName.components_to_asn1) + reqBody['realm'] = str(decodedTGT['crealm']) + + now = datetime.datetime.utcnow() + datetime.timedelta(days=1) + + reqBody['till'] = KerberosTime.to_asn1(now) + reqBody['nonce'] = nonce + seq_set_iter(reqBody, 'etype', + (int(cipher.enctype),)) + authenticator = Authenticator() authenticator['authenticator-vno'] = 5 authenticator['crealm'] = str(decodedTGT['crealm']) @@ -318,6 +375,19 @@ def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost) seq_set(authenticator, 'cname', clientName.components_to_asn1) + reqBodyEncCksumType = cksumtype_for_enctype(cipher.enctype) + + reqBodyEnc = encoder.encode(reqBody)[2:] + reqBodyEncCksum = make_checksum(reqBodyEncCksumType, sessionKey, 6, reqBodyEnc) + + if logging.getLogger().level == logging.DEBUG: + logging.debug('KDC-REQ-BODY checksum') + hexdump(reqBodyEncCksum) + + authenticator['cksum'] = noValue + authenticator['cksum']['cksumtype'] = int(reqBodyEncCksumType) + authenticator['cksum']['checksum'] = reqBodyEncCksum + now = datetime.datetime.utcnow() authenticator['cusec'] = now.microsecond authenticator['ctime'] = KerberosTime.to_asn1(now) @@ -341,12 +411,6 @@ def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost) encodedApReq = encoder.encode(apReq) - tgsReq = TGS_REQ() - - tgsReq['pvno'] = 5 - tgsReq['msg-type'] = int(constants.ApplicationTagNumbers.TGS_REQ.value) - - tgsReq['padata'] = noValue tgsReq['padata'][0] = noValue tgsReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_TGS_REQ.value) tgsReq['padata'][0]['padata-value'] = encodedApReq @@ -356,61 +420,34 @@ def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost) # identified to the KDC by the user's name and realm. clientName = Principal(self.__options.impersonate, type=constants.PrincipalNameType.NT_PRINCIPAL.value) - S4UByteArray = struct.pack('<:PASSWORD> + OR <:PASSWORD> :param credentials: credentials to parse @@ -55,6 +59,10 @@ def parse_credentials(credentials): :return: tuple of domain, username and password :rtype: (string, string, string) """ - domain, username, password = credential_regex.match(credentials).groups('') + res = krb5_credential_regex.match(credentials) + if res: + username, domain, password = res.groups('') + else: + domain, username, password = ad_credential_regex.match(credentials).groups('') return domain, username, password diff --git a/impacket/krb5/asn1.py b/impacket/krb5/asn1.py index 89a103356..0414c1a4f 100644 --- a/impacket/krb5/asn1.py +++ b/impacket/krb5/asn1.py @@ -502,6 +502,19 @@ class PA_FOR_USER_ENC(univ.Sequence): _sequence_optional_component('cksum', 2, Checksum()), _sequence_optional_component('auth-package', 3, KerberosString())) +class S4UUserID(univ.Sequence): + componentType = namedtype.NamedTypes( + _sequence_component('nonce', 0, UInt32()), + _sequence_optional_component("cname", 1, PrincipalName()), + _sequence_component("crealm", 2, Realm()), + _sequence_optional_component("subject-certificate", 3, univ.OctetString()), + _sequence_optional_component('options', 4, univ.BitString())) + +class PA_S4U_X509_USER(univ.Sequence): + componentType = namedtype.NamedTypes( + _sequence_component('user-id', 0, S4UUserID()), + _sequence_component('checksum', 1, Checksum())) + class KERB_ERROR_DATA(univ.Sequence): componentType = namedtype.NamedTypes( _sequence_component('data-type', 1, Int32()), diff --git a/impacket/krb5/constants.py b/impacket/krb5/constants.py index 24cf1b123..82d62c2ed 100644 --- a/impacket/krb5/constants.py +++ b/impacket/krb5/constants.py @@ -102,6 +102,7 @@ class PreAuthenticationDataTypes(Enum): TD_REQ_SEQ = 108 PA_PAC_REQUEST = 128 PA_FOR_USER = 129 + PA_S4U_X509_USER = 130 PA_FX_COOKIE = 133 PA_FX_FAST = 136 PA_FX_ERROR = 137 @@ -460,6 +461,8 @@ class EncryptionTypes(Enum): des3_cbc_sha1_kd = 16 aes128_cts_hmac_sha1_96 = 17 aes256_cts_hmac_sha1_96 = 18 + aes128_cts_hmac_sha256_128 = 19 + aes256_cts_hmac_sha384_192 = 20 rc4_hmac = 23 rc4_hmac_exp = 24 subkey_keymaterial = 65 @@ -472,3 +475,5 @@ class ChecksumTypes(Enum): hmac_sha1_des3_kd = 12 hmac_sha1_96_aes128 = 15 hmac_sha1_96_aes256 = 16 + hmac_sha256_128_aes128 = 19 + hmac_sha384_192_aes256 = 20 diff --git a/impacket/krb5/crypto.py b/impacket/krb5/crypto.py index 673554bf9..006679967 100644 --- a/impacket/krb5/crypto.py +++ b/impacket/krb5/crypto.py @@ -55,24 +55,28 @@ from struct import pack, unpack from Cryptodome.Cipher import AES, DES3, ARC4, DES -from Cryptodome.Hash import HMAC, MD4, MD5, SHA -from Cryptodome.Protocol.KDF import PBKDF2 +from Cryptodome.Hash import HMAC, MD4, MD5, SHA1, SHA256, SHA384 +from Cryptodome.Protocol.KDF import PBKDF2, SP800_108_Counter from Cryptodome.Util.number import GCD as gcd from six import b, PY3, indexbytes, binary_type +from impacket.krb5 import constants + def get_random_bytes(lenBytes): # We don't really need super strong randomness here to use PyCrypto.Random return urandom(lenBytes) class Enctype(object): - DES_CRC = 1 - DES_MD4 = 2 - DES_MD5 = 3 - DES3 = 16 - AES128 = 17 - AES256 = 18 - RC4 = 23 + DES_CRC = constants.EncryptionTypes.des_cbc_crc.value + DES_MD4 = constants.EncryptionTypes.des_cbc_md4.value + DES_MD5 = constants.EncryptionTypes.des_cbc_md5.value + DES3 = constants.EncryptionTypes.des3_cbc_sha1_kd.value + AES128_SHA1 = constants.EncryptionTypes.aes128_cts_hmac_sha1_96.value + AES256_SHA1 = constants.EncryptionTypes.aes256_cts_hmac_sha1_96.value + AES128_SHA256 = constants.EncryptionTypes.aes128_cts_hmac_sha256_128.value + AES256_SHA384 = constants.EncryptionTypes.aes256_cts_hmac_sha384_192.value + RC4 = constants.EncryptionTypes.rc4_hmac.value class Cksumtype(object): @@ -80,12 +84,14 @@ class Cksumtype(object): MD4 = 2 MD4_DES = 3 MD5 = 7 - MD5_DES = 8 + MD5_DES = constants.ChecksumTypes.rsa_md5_des.value SHA1 = 9 - SHA1_DES3 = 12 - SHA1_AES128 = 15 - SHA1_AES256 = 16 - HMAC_MD5 = -138 + SHA1_DES3 = constants.ChecksumTypes.hmac_sha1_des3_kd.value + SHA1_AES128 = constants.ChecksumTypes.hmac_sha1_96_aes128.value + SHA1_AES256 = constants.ChecksumTypes.hmac_sha1_96_aes256.value + SHA256_AES128 = constants.ChecksumTypes.hmac_sha256_128_aes128.value + SHA384_AES256 = constants.ChecksumTypes.hmac_sha384_192_aes256.value + HMAC_MD5 = constants.ChecksumTypes.hmac_md5.value class InvalidChecksum(ValueError): @@ -383,7 +389,7 @@ class _DES3CBC(_SimplifiedEnctype): blocksize = 8 padsize = 8 macsize = 20 - hashmod = SHA + hashmod = SHA1 @classmethod def random_to_key(cls, seed): @@ -434,12 +440,53 @@ def basic_decrypt(cls, key, ciphertext): return des3.decrypt(bytes(ciphertext)) -class _AESEnctype(_SimplifiedEnctype): +def basic_decrypt_all_aes(cls, key, ciphertext, iv): + bs = cls.blocksize + assert len(ciphertext) >= bs + aes = AES.new(key.contents, AES.MODE_ECB) + if len(ciphertext) == bs: + return aes.decrypt(ciphertext) + # Split the ciphertext into blocks. The last block may be partial. + cblocks = [bytearray(ciphertext[p:p+bs]) for p in range(0, len(ciphertext), bs)] + lastlen = len(cblocks[-1]) + # CBC-decrypt all but the last two blocks. + prev_cblock = bytearray(iv) + plaintext = b'' + for bb in cblocks[:-2]: + plaintext += _xorbytes(bytearray(aes.decrypt(bytes(bb))), prev_cblock) + prev_cblock = bb + # Decrypt the second-to-last cipher block. The left side of + # the decrypted block will be the final block of plaintext + # xor'd with the final partial cipher block; the right side + # will be the omitted bytes of ciphertext from the final + # block. + bb = bytearray(aes.decrypt(bytes(cblocks[-2]))) + lastplaintext =_xorbytes(bb[:lastlen], cblocks[-1]) + omitted = bb[lastlen:] + # Decrypt the final cipher block plus the omitted bytes to get + # the second-to-last plaintext block. + plaintext += _xorbytes(bytearray(aes.decrypt(bytes(cblocks[-1]) + bytes(omitted))), prev_cblock) + return plaintext + lastplaintext + +def basic_encrypt_all_aes(cls, key, plaintext, iv): + bs = cls.blocksize + assert len(plaintext) >= bs + aes = AES.new(key.contents, AES.MODE_CBC, iv) + ctext = aes.encrypt(_zeropad(bytes(plaintext), bs)) + if len(plaintext) > bs: + # Swap the last two ciphertext blocks and truncate the + # final block to match the plaintext length. + lastlen = len(plaintext) % bs or bs + ctext = ctext[:-(bs*2)] + ctext[-bs:] + ctext[-(bs*2):-bs][:lastlen] + return ctext + + +class _AES_SHA1_Enctype(_SimplifiedEnctype): # Base class for aes128-cts and aes256-cts. blocksize = 16 padsize = 1 macsize = 12 - hashmod = SHA + hashmod = SHA1 @classmethod def string_to_key(cls, string, salt, params): @@ -449,64 +496,130 @@ def string_to_key(cls, string, salt, params): salt = salt.encode("utf-8") (iterations,) = unpack('>L', params or b'\x00\x00\x10\x00') - prf = lambda p, s: HMAC.new(p, s, SHA).digest() + prf = lambda p, s: HMAC.new(p, s, SHA1).digest() seed = PBKDF2(string, salt, cls.seedsize, iterations, prf) tkey = cls.random_to_key(seed) return cls.derive(tkey, b'kerberos') @classmethod def basic_encrypt(cls, key, plaintext): - assert len(plaintext) >= 16 - aes = AES.new(key.contents, AES.MODE_CBC, b'\0' * 16) - ctext = aes.encrypt(_zeropad(bytes(plaintext), 16)) - if len(plaintext) > 16: - # Swap the last two ciphertext blocks and truncate the - # final block to match the plaintext length. - lastlen = len(plaintext) % 16 or 16 - ctext = ctext[:-32] + ctext[-16:] + ctext[-32:-16][:lastlen] - return ctext + iv = bytes(cls.blocksize) + return basic_encrypt_all_aes(cls, key, plaintext, iv) @classmethod def basic_decrypt(cls, key, ciphertext): - assert len(ciphertext) >= 16 - aes = AES.new(key.contents, AES.MODE_ECB) - if len(ciphertext) == 16: - return aes.decrypt(ciphertext) - # Split the ciphertext into blocks. The last block may be partial. - cblocks = [bytearray(ciphertext[p:p+16]) for p in range(0, len(ciphertext), 16)] - lastlen = len(cblocks[-1]) - # CBC-decrypt all but the last two blocks. - prev_cblock = bytearray(16) - plaintext = b'' - for bb in cblocks[:-2]: - plaintext += _xorbytes(bytearray(aes.decrypt(bytes(bb))), prev_cblock) - prev_cblock = bb - # Decrypt the second-to-last cipher block. The left side of - # the decrypted block will be the final block of plaintext - # xor'd with the final partial cipher block; the right side - # will be the omitted bytes of ciphertext from the final - # block. - bb = bytearray(aes.decrypt(bytes(cblocks[-2]))) - lastplaintext =_xorbytes(bb[:lastlen], cblocks[-1]) - omitted = bb[lastlen:] - # Decrypt the final cipher block plus the omitted bytes to get - # the second-to-last plaintext block. - plaintext += _xorbytes(bytearray(aes.decrypt(bytes(cblocks[-1]) + bytes(omitted))), prev_cblock) - return plaintext + lastplaintext - - -class _AES128CTS(_AESEnctype): - enctype = Enctype.AES128 + iv = bytes(cls.blocksize) + return basic_decrypt_all_aes(cls, key, ciphertext, iv) + + +class _AES128_SHA1_CTS(_AES_SHA1_Enctype): + enctype = Enctype.AES128_SHA1 keysize = 16 seedsize = 16 -class _AES256CTS(_AESEnctype): - enctype = Enctype.AES256 +class _AES256_SHA1_CTS(_AES_SHA1_Enctype): + enctype = Enctype.AES256_SHA1 keysize = 32 seedsize = 32 +class _RFC8009_Enctype(_EnctypeProfile): + # Base class for aes128-cts-hmac-sha256-128 and aes256-cts-hmac-sha384-192. + blocksize = 128 // 8 # Cipher block size + seedsize = None # PRF output size + keysize = None # Encryption key size + macsize = None # Integrity key size + hashmod = None # Hash function module + enctype_name = None # Encryption type name as byte string + + @classmethod + def random_to_key(cls, seed): + return Key(cls.enctype, seed) + + @classmethod + def basic_encrypt(cls, key, plaintext, iv): + return basic_encrypt_all_aes(cls, key, plaintext, iv) + + @classmethod + def basic_decrypt(cls, key, ciphertext, iv): + return basic_decrypt_all_aes(cls, key, ciphertext, iv) + + @classmethod + def kdf_hmac_sha2(cls, key, label, k, context=b''): + hmac_sha2 = lambda p, s: HMAC.new(p, s, cls.hashmod).digest() + return SP800_108_Counter(master=key, key_len=k, prf=hmac_sha2, label=label, context=context) + + @classmethod + def derive(cls, key, constant): + return cls.random_to_key(cls.kdf_hmac_sha2(key=key.contents, label=constant, k=cls.macsize)) + + @classmethod + def prf(cls, input_key, string): + return cls.kdf_hmac_sha2(key=input_key.contents, label=b'prf', + k=cls.seedsize, context=string) + + @classmethod + def string_to_key(cls, string, salt, params): + if not isinstance(string, binary_type): + string = string.encode("utf-8") + if not isinstance(salt, binary_type): + salt = salt.encode("utf-8") + + saltp = cls.enctype_name + b'\0' + salt + + iter_count = unpack('>L', params)[0] if params else 32768 + tkey = PBKDF2(password=string, salt=saltp, count=iter_count, dkLen=cls.keysize, hmac_hash_module=cls.hashmod) + return cls.random_to_key(cls.kdf_hmac_sha2(key=tkey, label=b'kerberos', k=cls.keysize)) + + + @classmethod + def encrypt(cls, key, keyusage, plaintext, confounder): + ke = cls.random_to_key(cls.kdf_hmac_sha2(key.contents, pack('>IB', keyusage, 0xAA), cls.keysize)) + ki = cls.random_to_key(cls.kdf_hmac_sha2(key.contents, pack('>IB', keyusage, 0x55), cls.macsize)) + n = get_random_bytes(cls.blocksize) + # Initial cipher state is a zeroed buffer + iv = bytes(cls.blocksize) + c = cls.basic_encrypt(ke, n + plaintext, iv) + h = HMAC.new(ki.contents, iv + c, cls.hashmod).digest() + ciphertext = c + h[:cls.macsize] + assert(plaintext == cls.decrypt(key, keyusage, ciphertext)) + return ciphertext + + @classmethod + def decrypt(cls, key, keyusage, ciphertext): + if not isinstance(ciphertext, binary_type): + ciphertext = bytes(ciphertext) + ke = cls.random_to_key(cls.kdf_hmac_sha2(key.contents, pack('>IB', keyusage, 0xAA), cls.keysize)) + ki = cls.random_to_key(cls.kdf_hmac_sha2(key.contents, pack('>IB', keyusage, 0x55), cls.macsize)) + c = ciphertext[:-cls.macsize] + h = ciphertext[-cls.macsize:] + # Initial cipher state is a zeroed buffer + iv = bytes(cls.blocksize) + if h != HMAC.new(ki.contents, iv + c, cls.hashmod).digest()[:cls.macsize]: + raise InvalidChecksum('ciphertext integrity failure') + plaintext = cls.basic_decrypt(ke, c, iv)[cls.blocksize:] + return plaintext + + +class _AES128_SHA256_CTS(_RFC8009_Enctype): + enctype = Enctype.AES128_SHA256 + seedsize = 256 // 8 + macsize = 128 // 8 + keysize = 128 // 8 + hashmod = SHA256 + enctype_name = b'aes128-cts-hmac-sha256-128' + + +class _AES256_SHA384_CTS(_RFC8009_Enctype): + enctype = Enctype.AES256_SHA384 + seedsize = 384 // 8 + macsize = 192 // 8 + keysize = 256 // 8 + hashmod = SHA384 + enctype_name = b'aes256-cts-hmac-sha384-192' + + class _RC4(_EnctypeProfile): enctype = Enctype.RC4 keysize = 16 @@ -556,7 +669,7 @@ def decrypt(cls, key, keyusage, ciphertext): @classmethod def prf(cls, key, string): - return HMAC.new(key.contents, bytes(string), SHA).digest() + return HMAC.new(key.contents, bytes(string), SHA1).digest() class _ChecksumProfile(object): @@ -591,14 +704,24 @@ def verify(cls, key, keyusage, text, cksum): super(_SimplifiedChecksum, cls).verify(key, keyusage, text, cksum) +class _SHA256AES128(_SimplifiedChecksum): + macsize = _AES128_SHA256_CTS.macsize + enc = _AES128_SHA256_CTS + + +class _SHA384AES256(_SimplifiedChecksum): + macsize = _AES256_SHA384_CTS.macsize + enc = _AES256_SHA384_CTS + + class _SHA1AES128(_SimplifiedChecksum): macsize = 12 - enc = _AES128CTS + enc = _AES128_SHA1_CTS class _SHA1AES256(_SimplifiedChecksum): macsize = 12 - enc = _AES256CTS + enc = _AES256_SHA1_CTS class _SHA1DES3(_SimplifiedChecksum): @@ -623,9 +746,11 @@ def verify(cls, key, keyusage, text, cksum): _enctype_table = { Enctype.DES_MD5: _DESCBC, Enctype.DES3: _DES3CBC, - Enctype.AES128: _AES128CTS, - Enctype.AES256: _AES256CTS, - Enctype.RC4: _RC4 + Enctype.AES128_SHA1: _AES128_SHA1_CTS, + Enctype.AES256_SHA1: _AES256_SHA1_CTS, + Enctype.RC4: _RC4, + Enctype.AES128_SHA256: _AES128_SHA256_CTS, + Enctype.AES256_SHA384: _AES256_SHA384_CTS } @@ -634,7 +759,9 @@ def verify(cls, key, keyusage, text, cksum): Cksumtype.SHA1_AES128: _SHA1AES128, Cksumtype.SHA1_AES256: _SHA1AES256, Cksumtype.HMAC_MD5: _HMACMD5, - 0xffffff76: _HMACMD5 + 0xffffff76: _HMACMD5, + Cksumtype.SHA256_AES128: _SHA256AES128, + Cksumtype.SHA384_AES256: _SHA384AES256 } @@ -653,7 +780,7 @@ def _get_checksum_profile(cksumtype): class Key(object): def __init__(self, enctype, contents): e = _get_enctype_profile(enctype) - if len(contents) != e.keysize: + if len(contents) != e.keysize and len(contents) != e.macsize: raise ValueError('Wrong key length') self.enctype = enctype self.contents = contents diff --git a/impacket/krb5/gssapi.py b/impacket/krb5/gssapi.py index 948e723b4..1431cc6dc 100644 --- a/impacket/krb5/gssapi.py +++ b/impacket/krb5/gssapi.py @@ -65,9 +65,13 @@ class CheckSumField(Structure): def GSSAPI(cipher): if cipher.enctype == constants.EncryptionTypes.aes256_cts_hmac_sha1_96.value: - return GSSAPI_AES256() + return GSSAPI_AES256_SHA1() if cipher.enctype == constants.EncryptionTypes.aes128_cts_hmac_sha1_96.value: - return GSSAPI_AES128() + return GSSAPI_AES128_SHA1() + if cipher.enctype == constants.EncryptionTypes.aes256_cts_hmac_sha384_192.value: + return GSSAPI_AES256_SHA2() + if cipher.enctype == constants.EncryptionTypes.aes128_cts_hmac_sha256_128.value: + return GSSAPI_AES128_SHA2() elif cipher.enctype == constants.EncryptionTypes.rc4_hmac.value: return GSSAPI_RC4() else: @@ -194,7 +198,7 @@ def GSS_Wrap(self, sessionKey, data, sequenceNumber, direction = 'init', encrypt def GSS_Unwrap(self, sessionKey, data, sequenceNumber, direction = 'init', encrypt=True, authData=None): return self.GSS_Wrap(sessionKey, data, sequenceNumber, direction, encrypt, authData) -class GSSAPI_AES(): +class GSSAPI_AES_SHA1(): checkSumProfile = None cipherType = None @@ -289,10 +293,22 @@ def GSS_Unwrap(self, sessionKey, data, sequenceNumber, direction = 'init', encry return plainText[:-(token['EC']+len(self.WRAP()))], None -class GSSAPI_AES256(GSSAPI_AES): +class GSSAPI_AES256_SHA1(GSSAPI_AES_SHA1): checkSumProfile = crypto._SHA1AES256 - cipherType = crypto._AES256CTS + cipherType = crypto._AES256_SHA1_CTS -class GSSAPI_AES128(GSSAPI_AES): +class GSSAPI_AES128_SHA1(GSSAPI_AES_SHA1): checkSumProfile = crypto._SHA1AES128 - cipherType = crypto._AES128CTS + cipherType = crypto._AES128_SHA1_CTS + +class GSSAPI_AES_SHA2(): + checkSumProfile = None + cipherType = None + +class GSSAPI_AES128_SHA256(GSSAPI_AES_SHA2): + checkSumProfile = crypto._SHA256AES128 + cipherType = crypto._AES128_SHA256_CTS + +class GSSAPI_AES256_SHA384(GSSAPI_AES_SHA2): + checkSumProfile = crypto._SHA384AES256 + cipherType = crypto._AES256_SHA384_CTS diff --git a/impacket/krb5/kerberosv5.py b/impacket/krb5/kerberosv5.py index d81af8590..7ef9ebbb8 100644 --- a/impacket/krb5/kerberosv5.py +++ b/impacket/krb5/kerberosv5.py @@ -92,7 +92,7 @@ def sendReceive(data, host, kdcHost, port=88): return r -def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcHost=None, requestPAC=True, serverName=None, kerberoast_no_preauth=False): +def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', aesSha2Key='', kdcHost=None, requestPAC=True, serverName=None, kerberoast_no_preauth=False): # Convert to binary form, just in case we're receiving strings if isinstance(lmhash, str): @@ -110,6 +110,11 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH aesKey = unhexlify(aesKey) except TypeError: pass + if isinstance(aesSha2Key, str): + try: + aesSha2Key = unhexlify(aesSha2Key) + except TypeError: + pass if serverName is not None and not isinstance(serverName, Principal): try: serverName = Principal(serverName, type=constants.PrincipalNameType.NT_PRINCIPAL.value) @@ -162,7 +167,13 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH if aesKey is None: aesKey = b'' - if nthash == b'': + if aesSha2Key is None: + aesSha2Key = b'' + + if nthash != b'': + # We have hashes to try, only way is to request RC4 only + supportedCiphers = (int(constants.EncryptionTypes.rc4_hmac.value),) + else: # This is still confusing. I thought KDC_ERR_ETYPE_NOSUPP was enough, # but I found some systems that accepts all ciphers, and trigger an error # when requesting subsequent TGS :(. More research needed. @@ -174,11 +185,14 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH supportedCiphers = (int(constants.EncryptionTypes.aes256_cts_hmac_sha1_96.value),) else: supportedCiphers = (int(constants.EncryptionTypes.aes128_cts_hmac_sha1_96.value),) + elif aesSha2Key != b'': + if len(aesSha2Key) == 32: + supportedCiphers = (int(constants.EncryptionTypes.aes128_cts_hmac_sha256_128.value),) + else: + supportedCiphers = (int(constants.EncryptionTypes.aes256_cts_hmac_sha384_192.value),) else: - supportedCiphers = (int(constants.EncryptionTypes.aes256_cts_hmac_sha1_96.value),) - else: - # We have hashes to try, only way is to request RC4 only - supportedCiphers = (int(constants.EncryptionTypes.rc4_hmac.value),) + supportedCiphers = (int(constants.EncryptionTypes.aes256_cts_hmac_sha384_192.value), + int(constants.EncryptionTypes.aes256_cts_hmac_sha1_96.value),) seq_set_iter(reqBody, 'etype', supportedCiphers) @@ -188,7 +202,12 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH r = sendReceive(message, domain, kdcHost) except KerberosError as e: if e.getErrorCode() == constants.ErrorCodes.KDC_ERR_ETYPE_NOSUPP.value: - if supportedCiphers[0] in (constants.EncryptionTypes.aes128_cts_hmac_sha1_96.value, constants.EncryptionTypes.aes256_cts_hmac_sha1_96.value) and aesKey == b'': + if ( (supportedCiphers[0] in (constants.EncryptionTypes.aes128_cts_hmac_sha1_96.value, + constants.EncryptionTypes.aes256_cts_hmac_sha1_96.value) + and aesKey == b'') + or (supportedCiphers[0] in (constants.EncryptionTypes.aes128_cts_hmac_sha256_128_.value, + constants.EncryptionTypes.aes256_cts_hmac_sha384_192.value) + and aesSha2Key == b'')): supportedCiphers = (int(constants.EncryptionTypes.rc4_hmac.value),) seq_set_iter(reqBody, 'etype', supportedCiphers) message = encoder.encode(asReq) @@ -254,6 +273,8 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH key = Key(cipher.enctype, nthash) elif aesKey != b'': key = Key(cipher.enctype, aesKey) + elif aesSha2Key != b'': + key = Key(cipher.enctype, aesSha2Key) else: key = cipher.string_to_key(password, encryptionTypesData[enctype], None) @@ -321,11 +342,11 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH tgt = sendReceive(encoder.encode(asReq), domain, kdcHost) except Exception as e: if str(e).find('KDC_ERR_ETYPE_NOSUPP') >= 0: - if lmhash == b'' and nthash == b'' and (aesKey == b'' or aesKey is None): + if lmhash == b'' and nthash == b'' and (aesKey == b'' or aesKey is None) and not aesSha2Key: from impacket.ntlm import compute_lmhash, compute_nthash lmhash = compute_lmhash(password) nthash = compute_nthash(password) - return getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey, kdcHost, requestPAC) + return getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey, aesSha2Key, kdcHost, requestPAC) raise @@ -532,7 +553,7 @@ def getKerberosType3(cipher, sessionKey, auth_data): return cipher, sessionKey2, resp.getData() -def getKerberosType1(username, password, domain, lmhash, nthash, aesKey='', TGT = None, TGS = None, targetName='', +def getKerberosType1(username, password, domain, lmhash, nthash, aesKey='', aesSha2Key='', TGT = None, TGS = None, targetName='', kdcHost = None, useCache = True): # Convert to binary form, just in case we're receiving strings @@ -551,6 +572,11 @@ def getKerberosType1(username, password, domain, lmhash, nthash, aesKey='', TGT aesKey = unhexlify(aesKey) except TypeError: pass + if isinstance(aesSha2Key, str): + try: + aesSha2Key = unhexlify(aesSha2Key) + except TypeError: + pass targetName = 'host/%s' % targetName if TGT is None and TGS is None: @@ -563,14 +589,14 @@ def getKerberosType1(username, password, domain, lmhash, nthash, aesKey='', TGT if TGT is None: if TGS is None: try: - tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash, aesKey, kdcHost) + tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash, aesKey, aesSha2Key, kdcHost) except KerberosError as e: if e.getErrorCode() == constants.ErrorCodes.KDC_ERR_ETYPE_NOSUPP.value: # We might face this if the target does not support AES # So, if that's the case we'll force using RC4 by converting # the password to lm/nt hashes and hope for the best. If that's already # done, byebye. - if lmhash == b'' and nthash == b'' and (aesKey == b'' or aesKey is None) and TGT is None and TGS is None: + if lmhash == b'' and nthash == b'' and (aesKey == b'' or aesKey is None) and not aesSha2Key and TGT is None and TGS is None: from impacket.ntlm import compute_lmhash, compute_nthash LOG.debug('Got KDC_ERR_ETYPE_NOSUPP, fallback to RC4') lmhash = compute_lmhash(password) @@ -597,7 +623,7 @@ def getKerberosType1(username, password, domain, lmhash, nthash, aesKey='', TGT # So, if that's the case we'll force using RC4 by converting # the password to lm/nt hashes and hope for the best. If that's already # done, byebye. - if lmhash == b'' and nthash == b'' and (aesKey == b'' or aesKey is None) and TGT is None and TGS is None: + if lmhash == b'' and nthash == b'' and (aesKey == b'' or aesKey is None) and not aesSha2Key and TGT is None and TGS is None: from impacket.ntlm import compute_lmhash, compute_nthash LOG.debug('Got KDC_ERR_ETYPE_NOSUPP, fallback to RC4') lmhash = compute_lmhash(password) @@ -647,10 +673,14 @@ def getKerberosType1(username, password, domain, lmhash, nthash, aesKey='', TGT authenticator['cksum'] = noValue - authenticator['cksum']['cksumtype'] = 0x8003 - chkField = CheckSumField() - chkField['Lgth'] = 16 + if aesSha2Key: + authenticator['cksum']['cksumtype'] = constants.ChecksumTypes.hmac_sha384_192_aes256 + chkField['Lgth'] = 192 // 8 + else: + authenticator['cksum']['cksumtype'] = 0x8003 + chkField['Lgth'] = 16 + chkField['Flags'] = GSS_C_CONF_FLAG | GSS_C_INTEG_FLAG | GSS_C_SEQUENCE_FLAG | GSS_C_REPLAY_FLAG | GSS_C_MUTUAL_FLAG | GSS_C_DCE_STYLE #chkField['Flags'] = GSS_C_INTEG_FLAG | GSS_C_SEQUENCE_FLAG | GSS_C_REPLAY_FLAG | GSS_C_MUTUAL_FLAG | GSS_C_DCE_STYLE diff --git a/tests/misc/test_krb5_crypto.py b/tests/misc/test_krb5_crypto.py index 6d43f3956..16b8536ac 100644 --- a/tests/misc/test_krb5_crypto.py +++ b/tests/misc/test_krb5_crypto.py @@ -21,92 +21,92 @@ def h(hexstr): class AESTests(unittest.TestCase): - def test_AES128(self): - # AES128 encrypt and decrypt + def test_AES128_SHA1(self): + # AES128 HMAC-SHA1 encrypt and decrypt kb = h('9062430C8CDA3388922E6D6A509F5B7A') conf = h('94B491F481485B9A0678CD3C4EA386AD') keyusage = 2 plain = b'9 bytesss' ctxt = h('68FB9679601F45C78857B2BF820FD6E53ECA8D42FD4B1D7024A09205ABB7CD2E' 'C26C355D2F') - k = Key(Enctype.AES128, kb) + k = Key(Enctype.AES128_SHA1, kb) self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt) self.assertEqual(decrypt(k, keyusage, ctxt), plain) - def test_AES256(self): - # AES256 encrypt and decrypt + def test_AES256_SHA1(self): + # AES256 HMAC-SHA1 encrypt and decrypt kb = h('F1C795E9248A09338D82C3F8D5B567040B0110736845041347235B1404231398') conf = h('E45CA518B42E266AD98E165E706FFB60') keyusage = 4 plain = b'30 bytes bytes bytes bytes byt' ctxt = h('D1137A4D634CFECE924DBC3BF6790648BD5CFF7DE0E7B99460211D0DAEF3D79A' '295C688858F3B34B9CBD6EEBAE81DAF6B734D4D498B6714F1C1D') - k = Key(Enctype.AES256, kb) + k = Key(Enctype.AES256_SHA1, kb) self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt) self.assertEqual(decrypt(k, keyusage, ctxt), plain) - def test_AES128_checksum(self): - # AES128 checksum + def test_AES128_SHA1_checksum(self): + # AES128 HMAC-SHA1 checksum kb = h('9062430C8CDA3388922E6D6A509F5B7A') keyusage = 3 plain = b'eight nine ten eleven twelve thirteen' cksum = h('01A4B088D45628F6946614E3') - k = Key(Enctype.AES128, kb) + k = Key(Enctype.AES128_SHA1, kb) verify_checksum(Cksumtype.SHA1_AES128, k, keyusage, plain, cksum) - def test_AES256_checksum(self): - # AES256 checksum + def test_AES256_SHA1_checksum(self): + # AES256 HMAC-SHA1 checksum kb = h('B1AE4CD8462AFF1677053CC9279AAC30B796FB81CE21474DD3DDBCFEA4EC76D7') keyusage = 4 plain = b'fourteen' cksum = h('E08739E3279E2903EC8E3836') - k = Key(Enctype.AES256, kb) + k = Key(Enctype.AES256_SHA1, kb) verify_checksum(Cksumtype.SHA1_AES256, k, keyusage, plain, cksum) - def test_AES128_string_to_key(self): - # AES128 string-to-key + def test_AES128_SHA1_string_to_key(self): + # AES128 HMAC-SHA1 string-to-key string = 'password' salt = b'ATHENA.MIT.EDUraeburn' params = h('00000002') kb = h('C651BF29E2300AC27FA469D693BDDA13') - k = string_to_key(Enctype.AES128, string, salt, params) + k = string_to_key(Enctype.AES128_SHA1, string, salt, params) self.assertEqual(k.contents, kb) - def test_AES256_string_to_key(self): - # AES256 string-to-key + def test_AES256_SHA1_string_to_key(self): + # AES256 HMAC-SHA1 string-to-key string = 'X' * 64 salt = b'pass phrase equals block size' params = h('000004B0') kb = h('89ADEE3608DB8BC71F1BFBFE459486B05618B70CBAE22092534E56C553BA4B34') - k = string_to_key(Enctype.AES256, string, salt, params) + k = string_to_key(Enctype.AES256_SHA1, string, salt, params) self.assertEqual(k.contents, kb) - def test_AES128_prf(self): - # AES128 prf + def test_AES128_SHA1_prf(self): + # AES128 HMAC-SHA1 prf kb = h('77B39A37A868920F2A51F9DD150C5717') - k = string_to_key(Enctype.AES128, b'key1', b'key1') + k = string_to_key(Enctype.AES128_SHA1, b'key1', b'key1') self.assertEqual(prf(k, b'\x01\x61'), kb) - def test_AES256_prf(self): - # AES256 prf + def test_AES256_SHA1_prf(self): + # AES256 HMAC-SHA1 prf kb = h('0D674DD0F9A6806525A4D92E828BD15A') - k = string_to_key(Enctype.AES256, b'key2', b'key2') + k = string_to_key(Enctype.AES256_SHA1, b'key2', b'key2') self.assertEqual(prf(k, b'\x02\x62'), kb) - def test_AES128_cf2(self): - # AES128 cf2 + def test_AES128_SHA1_cf2(self): + # AES128 HMAC-SHA1 cf2 kb = h('97DF97E4B798B29EB31ED7280287A92A') - k1 = string_to_key(Enctype.AES128, b'key1', b'key1') - k2 = string_to_key(Enctype.AES128, b'key2', b'key2') - k = cf2(Enctype.AES128, k1, k2, b'a', b'b') + k1 = string_to_key(Enctype.AES128_SHA1, b'key1', b'key1') + k2 = string_to_key(Enctype.AES128_SHA1, b'key2', b'key2') + k = cf2(Enctype.AES128_SHA1, k1, k2, b'a', b'b') self.assertEqual(k.contents, kb) - def test_AES256_cf2(self): - # AES256 cf2 + def test_AES256_SHA1_cf2(self): + # AES256 HMAC-SHA1 cf2 kb = h('4D6CA4E629785C1F01BAF55E2E548566B9617AE3A96868C337CB93B5E72B1C7B') - k1 = string_to_key(Enctype.AES256, b'key1', b'key1') - k2 = string_to_key(Enctype.AES256, b'key2', b'key2') - k = cf2(Enctype.AES256, k1, k2, b'a', b'b') + k1 = string_to_key(Enctype.AES256_SHA1, b'key1', b'key1') + k2 = string_to_key(Enctype.AES256_SHA1, b'key2', b'key2') + k = cf2(Enctype.AES256_SHA1, k1, k2, b'a', b'b') self.assertEqual(k.contents, kb) def test_DES3(self): diff --git a/tests/misc/test_krb5_rfc8009.py b/tests/misc/test_krb5_rfc8009.py new file mode 100644 index 000000000..49b76eed9 --- /dev/null +++ b/tests/misc/test_krb5_rfc8009.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python +# Impacket - Collection of Python classes for working with network protocols. +# +# Copyright (C) 2023 Red Hat, Inc. All rights reserved. +# +# This software is provided under a slightly modified version +# of the Apache Software License. See the accompanying LICENSE file +# for more information. +# +# +# This test file implements test vectors from the appendix A of RFC8009 +# https://datatracker.ietf.org/doc/html/rfc8009#appendix-A +# +from impacket.krb5.crypto import _AES128_SHA256_CTS, _AES256_SHA384_CTS, _SHA256AES128, _SHA384AES256 +from Cryptodome.Hash import HMAC +from struct import pack +import unittest + + +def encrypt_empty_plaintext(tcase, confounder, ke, ki, exp_ciphertext): + plaintext = b'' + key = tcase.etype.random_to_key(ke) + c = tcase.etype.basic_encrypt(key, confounder + plaintext, bytes(tcase.etype.blocksize)) + h = HMAC.new(ki, bytes(tcase.etype.blocksize) + c, tcase.etype.hashmod).digest()[:tcase.etype.macsize] + ciphertext = c + h + tcase.assertEqual(ciphertext, exp_ciphertext) + c = ciphertext[:-tcase.etype.macsize] + h = ciphertext[-tcase.etype.macsize:] + dec_plaintext = tcase.etype.basic_decrypt(key, c, bytes(tcase.etype.blocksize))[len(confounder):] + tcase.assertEqual(plaintext, dec_plaintext) + tcase.assertEqual(h, HMAC.new(ki, bytes(tcase.etype.blocksize) + c, tcase.etype.hashmod).digest()[:tcase.etype.macsize]) + + ciphertext = tcase.etype.encrypt(key, 2, plaintext, confounder) + dec_plaintext = tcase.etype.decrypt(key, 2, ciphertext) + tcase.assertEqual(plaintext, dec_plaintext) + +def encrypt_less_than_block_size(tcase, plaintext, confounder, ke, ki, exp_ciphertext): + key = tcase.etype.random_to_key(ke) + c = tcase.etype.basic_encrypt(key, confounder + plaintext, bytes(tcase.etype.blocksize)) + h = HMAC.new(ki, bytes(tcase.etype.blocksize) + c, tcase.etype.hashmod).digest()[:tcase.etype.macsize] + ciphertext = c + h + tcase.assertEqual(ciphertext, exp_ciphertext) + c = ciphertext[:-tcase.etype.macsize] + h = ciphertext[-tcase.etype.macsize:] + dec_plaintext = tcase.etype.basic_decrypt(key, c, bytes(tcase.etype.blocksize))[len(confounder):] + tcase.assertEqual(plaintext, dec_plaintext) + tcase.assertEqual(h, HMAC.new(ki, bytes(tcase.etype.blocksize) + c, tcase.etype.hashmod).digest()[:tcase.etype.macsize]) + + ciphertext = tcase.etype.encrypt(key, 2, plaintext, confounder) + dec_plaintext = tcase.etype.decrypt(key, 2, ciphertext) + tcase.assertEqual(plaintext, dec_plaintext) + +def encrypt_equals_block_size(tcase, plaintext, confounder, ke, ki, exp_ciphertext): + key = tcase.etype.random_to_key(ke) + c = tcase.etype.basic_encrypt(key, confounder + plaintext, bytes(tcase.etype.blocksize)) + h = HMAC.new(ki, bytes(tcase.etype.blocksize) + c, tcase.etype.hashmod).digest()[:tcase.etype.macsize] + ciphertext = c + h + tcase.assertEqual(ciphertext, exp_ciphertext) + c = ciphertext[:-tcase.etype.macsize] + h = ciphertext[-tcase.etype.macsize:] + dec_plaintext = tcase.etype.basic_decrypt(key, c, bytes(tcase.etype.blocksize))[len(confounder):] + tcase.assertEqual(plaintext, dec_plaintext) + tcase.assertEqual(h, HMAC.new(ki, bytes(tcase.etype.blocksize) + c, tcase.etype.hashmod).digest()[:tcase.etype.macsize]) + + ciphertext = tcase.etype.encrypt(key, 2, plaintext, confounder) + dec_plaintext = tcase.etype.decrypt(key, 2, ciphertext) + tcase.assertEqual(plaintext, dec_plaintext) + +def encrypt_greater_than_block_size(tcase, plaintext, confounder, ke, ki, exp_ciphertext): + key = tcase.etype.random_to_key(ke) + c = tcase.etype.basic_encrypt(key, confounder + plaintext, bytes(tcase.etype.blocksize)) + h = HMAC.new(ki, bytes(tcase.etype.blocksize) + c, tcase.etype.hashmod).digest()[:tcase.etype.macsize] + ciphertext = c + h + tcase.assertEqual(ciphertext, exp_ciphertext) + c = ciphertext[:-tcase.etype.macsize] + h = ciphertext[-tcase.etype.macsize:] + dec_plaintext = tcase.etype.basic_decrypt(key, c, bytes(tcase.etype.blocksize))[len(confounder):] + tcase.assertEqual(plaintext, dec_plaintext) + tcase.assertEqual(h, HMAC.new(ki, bytes(tcase.etype.blocksize) + c, tcase.etype.hashmod).digest()[:tcase.etype.macsize]) + + ciphertext = tcase.etype.encrypt(key, 2, plaintext, None) + dec_plaintext = tcase.etype.decrypt(key, 2, ciphertext) + tcase.assertEqual(plaintext, dec_plaintext) + +def prf(tcase, inpt, exp_output, key, message): + output = tcase.etype.prf(tcase.etype.random_to_key(key), inpt) + tcase.assertEqual(output, exp_output) + +def string_to_key(tcase, iter_count, pw, salt, exp_key): + key = tcase.etype.string_to_key(pw, salt, None) + tcase.assertEqual(key.contents, exp_key) + +def key_derivation(tcase, keyusage, base_key, exp_ke, exp_ki, exp_kc): + key = tcase.etype.random_to_key(base_key) + ke = tcase.etype.random_to_key(tcase.etype.kdf_hmac_sha2(key.contents, pack('>IB', keyusage, 0xAA), tcase.etype.keysize)) + ki = tcase.etype.random_to_key(tcase.etype.kdf_hmac_sha2(key.contents, pack('>IB', keyusage, 0x55), tcase.etype.macsize)) + kc = tcase.etype.derive(key, pack('>IB', keyusage, 0x99)) + tcase.assertEqual(exp_ke, ke.contents) + tcase.assertEqual(exp_ki, ki.contents) + tcase.assertEqual(exp_kc, kc.contents) + +def checksum(tcase, kc, plaintext, exp_checksum): + checksum = HMAC.new(kc, plaintext, tcase.digest.enc.hashmod).digest()[:tcase.digest.enc.macsize] + tcase.assertEqual(checksum, exp_checksum) + + +class Aes128HmacSha256Tests(unittest.TestCase): + etype = _AES128_SHA256_CTS + digest = _SHA256AES128 + + def test_encrypt_empty_plaintext(self): + confounder = bytes.fromhex('7E 58 95 EA F2 67 24 35 BA D8 17 F5 45 A3 71 48') + ke = bytes.fromhex('9B 19 7D D1 E8 C5 60 9D 6E 67 C3 E3 7C 62 C7 2E') + ki = bytes.fromhex('9F DA 0E 56 AB 2D 85 E1 56 9A 68 86 96 C2 6A 6C') + exp_ciphertext = bytes.fromhex('EF 85 FB 89 0B B8 47 2F 4D AB 20 39 4D CA 78 1D AD 87 7E DA 39 D5 0C 87 0C 0D 5A 0A 8E 48 C7 18') + encrypt_empty_plaintext(self, confounder, ke, ki, exp_ciphertext) + + def test_encrypt_less_than_block_size(self): + plaintext = bytes.fromhex('00 01 02 03 04 05') + confounder = bytes.fromhex('7B CA 28 5E 2F D4 13 0F B5 5B 1A 5C 83 BC 5B 24') + ke = bytes.fromhex('9B 19 7D D1 E8 C5 60 9D 6E 67 C3 E3 7C 62 C7 2E') + ki = bytes.fromhex('9F DA 0E 56 AB 2D 85 E1 56 9A 68 86 96 C2 6A 6C') + exp_ciphertext = bytes.fromhex('84 D7 F3 07 54 ED 98 7B AB 0B F3 50 6B EB 09 CF B5 54 02 CE F7 E6 87 7C E9 9E 24 7E 52 D1 6E D4 42 1D FD F8 97 6C') + encrypt_less_than_block_size(self, plaintext, confounder, ke, ki, exp_ciphertext) + + def test_encrypt_equals_block_size(self): + plaintext = bytes.fromhex('00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F') + confounder = bytes.fromhex('56 AB 21 71 3F F6 2C 0A 14 57 20 0F 6F A9 94 8F') + ke = bytes.fromhex('9B 19 7D D1 E8 C5 60 9D 6E 67 C3 E3 7C 62 C7 2E') + ki = bytes.fromhex('9F DA 0E 56 AB 2D 85 E1 56 9A 68 86 96 C2 6A 6C') + exp_ciphertext = bytes.fromhex('35 17 D6 40 F5 0D DC 8A D3 62 87 22 B3 56 9D 2A E0 74 93 FA 82 63 25 40 80 EA 65 C1 00 8E 8F C2 95 FB 48 52 E7 D8 3E 1E 7C 48 C3 7E EB E6 B0 D3') + encrypt_equals_block_size(self, plaintext, confounder, ke, ki, exp_ciphertext) + + def test_encrypt_greater_than_block_size(self): + plaintext = bytes.fromhex('00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14') + confounder = bytes.fromhex('A7 A4 E2 9A 47 28 CE 10 66 4F B6 4E 49 AD 3F AC') + ke = bytes.fromhex('9B 19 7D D1 E8 C5 60 9D 6E 67 C3 E3 7C 62 C7 2E') + ki = bytes.fromhex('9F DA 0E 56 AB 2D 85 E1 56 9A 68 86 96 C2 6A 6C') + exp_ciphertext = bytes.fromhex('72 0F 73 B1 8D 98 59 CD 6C CB 43 46 11 5C D3 36 C7 0F 58 ED C0 C4 43 7C 55 73 54 4C 31 C8 13 BC E1 E6 D0 72 C1 86 B3 9A 41 3C 2F 92 CA 9B 83 34 A2 87 FF CB FC') + encrypt_greater_than_block_size(self, plaintext, confounder, ke, ki, exp_ciphertext) + + def test_prf(self): + inpt = b'test' + exp_output = bytes.fromhex('9D 18 86 16 F6 38 52 FE 86 91 5B B8 40 B4 A8 86 FF 3E 6B B0 F8 19 B4 9B 89 33 93 D3 93 85 42 95') + key = bytes.fromhex('37 05 D9 60 80 C1 77 28 A0 E8 00 EA B6 E0 D2 3C') + message = bytes.fromhex('00 00 00 01 70 72 66 00 74 65 73 74 00 00 01 00') + prf(self, inpt, exp_output, key, message) + + def test_string_to_key(self): + iter_count = 32768 + pw = b'password' + salt = bytes.fromhex('10 DF 9D D7 83 E5 BC 8A CE A1 73 0E 74 35 5F 61') + b'ATHENA.MIT.EDUraeburn' + exp_key = bytes.fromhex('08 9B CA 48 B1 05 EA 6E A7 7C A5 D2 F3 9D C5 E7') + string_to_key(self, iter_count, pw, salt, exp_key) + + def test_key_derivation(self): + keyusage = 2 + base_key = bytes.fromhex('37 05 D9 60 80 C1 77 28 A0 E8 00 EA B6 E0 D2 3C') + exp_ke = bytes.fromhex('9B 19 7D D1 E8 C5 60 9D 6E 67 C3 E3 7C 62 C7 2E') + exp_ki = bytes.fromhex('9F DA 0E 56 AB 2D 85 E1 56 9A 68 86 96 C2 6A 6C') + exp_kc = bytes.fromhex('B3 1A 01 8A 48 F5 47 76 F4 03 E9 A3 96 32 5D C3') + key_derivation(self, keyusage, base_key, exp_ke, exp_ki, exp_kc) + + def test_checksum(self): + kc = bytes.fromhex('B3 1A 01 8A 48 F5 47 76 F4 03 E9 A3 96 32 5D C3') + plaintext = bytes.fromhex('00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14') + exp_checksum = bytes.fromhex('D7 83 67 18 66 43 D6 7B 41 1C BA 91 39 FC 1D EE') + checksum(self, kc, plaintext, exp_checksum) + + +class Aes256HmacSha384Tests(unittest.TestCase): + etype = _AES256_SHA384_CTS + digest = _SHA384AES256 + + def test_encrypt_empty_plaintext(self): + confounder = bytes.fromhex('F7 64 E9 FA 15 C2 76 47 8B 2C 7D 0C 4E 5F 58 E4') + ke = bytes.fromhex('56 AB 22 BE E6 3D 82 D7 BC 52 27 F6 77 3F 8E A7 A5 EB 1C 82 51 60 C3 83 12 98 0C 44 2E 5C 7E 49') + ki = bytes.fromhex('69 B1 65 14 E3 CD 8E 56 B8 20 10 D5 C7 30 12 B6 22 C4 D0 0F FC 23 ED 1F') + exp_ciphertext = bytes.fromhex('41 F5 3F A5 BF E7 02 6D 91 FA F9 BE 95 91 95 A0 58 70 72 73 A9 6A 40 F0 A0 19 60 62 1A C6 12 74 8B 9B BF BE 7E B4 CE 3C') + encrypt_empty_plaintext(self, confounder, ke, ki, exp_ciphertext) + + def test_encrypt_less_than_block_size(self): + plaintext = bytes.fromhex('00 01 02 03 04 05') + confounder = bytes.fromhex('B8 0D 32 51 C1 F6 47 14 94 25 6F FE 71 2D 0B 9A') + ke = bytes.fromhex('56 AB 22 BE E6 3D 82 D7 BC 52 27 F6 77 3F 8E A7 A5 EB 1C 82 51 60 C3 83 12 98 0C 44 2E 5C 7E 49') + ki = bytes.fromhex('69 B1 65 14 E3 CD 8E 56 B8 20 10 D5 C7 30 12 B6 22 C4 D0 0F FC 23 ED 1F') + exp_ciphertext = bytes.fromhex('4E D7 B3 7C 2B CA C8 F7 4F 23 C1 CF 07 E6 2B C7 B7 5F B3 F6 37 B9 F5 59 C7 F6 64 F6 9E AB 7B 60 92 23 75 26 EA 0D 1F 61 CB 20 D6 9D 10 F2') + encrypt_less_than_block_size(self, plaintext, confounder, ke, ki, exp_ciphertext) + + def test_encrypt_equals_block_size(self): + plaintext = bytes.fromhex('00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F') + confounder = bytes.fromhex('53 BF 8A 0D 10 52 65 D4 E2 76 42 86 24 CE 5E 63') + ke = bytes.fromhex('56 AB 22 BE E6 3D 82 D7 BC 52 27 F6 77 3F 8E A7 A5 EB 1C 82 51 60 C3 83 12 98 0C 44 2E 5C 7E 49') + ki = bytes.fromhex('69 B1 65 14 E3 CD 8E 56 B8 20 10 D5 C7 30 12 B6 22 C4 D0 0F FC 23 ED 1F') + exp_ciphertext = bytes.fromhex('BC 47 FF EC 79 98 EB 91 E8 11 5C F8 D1 9D AC 4B BB E2 E1 63 E8 7D D3 7F 49 BE CA 92 02 77 64 F6 8C F5 1F 14 D7 98 C2 27 3F 35 DF 57 4D 1F 93 2E 40 C4 FF 25 5B 36 A2 66') + encrypt_equals_block_size(self, plaintext, confounder, ke, ki, exp_ciphertext) + + def test_encrypt_greater_than_block_size(self): + plaintext = bytes.fromhex('00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14') + confounder = bytes.fromhex('76 3E 65 36 7E 86 4F 02 F5 51 53 C7 E3 B5 8A F1') + ke = bytes.fromhex('56 AB 22 BE E6 3D 82 D7 BC 52 27 F6 77 3F 8E A7 A5 EB 1C 82 51 60 C3 83 12 98 0C 44 2E 5C 7E 49') + ki = bytes.fromhex('69 B1 65 14 E3 CD 8E 56 B8 20 10 D5 C7 30 12 B6 22 C4 D0 0F FC 23 ED 1F') + exp_ciphertext = bytes.fromhex('40 01 3E 2D F5 8E 87 51 95 7D 28 78 BC D2 D6 FE 10 1C CF D5 56 CB 1E AE 79 DB 3C 3E E8 64 29 F2 B2 A6 02 AC 86 FE F6 EC B6 47 D6 29 5F AE 07 7A 1F EB 51 75 08 D2 C1 6B 41 92 E0 1F 62') + encrypt_greater_than_block_size(self, plaintext, confounder, ke, ki, exp_ciphertext) + + def test_prf(self): + inpt = b'test' + exp_output = bytes.fromhex('98 01 F6 9A 36 8C 2B F6 75 E5 95 21 E1 77 D9 A0 7F 67 EF E1 CF DE 8D 3C 8D 6F 6A 02 56 E3 B1 7D B3 C1 B6 2A D1 B8 55 33 60 D1 73 67 EB 15 14 D2') + key = bytes.fromhex('6D 40 4D 37 FA F7 9F 9D F0 D3 35 68 D3 20 66 98 00 EB 48 36 47 2E A8 A0 26 D1 6B 71 82 46 0C 52') + message = bytes.fromhex('00 00 00 01 70 72 66 00 74 65 73 74 00 00 01 80') + prf(self, inpt, exp_output, key, message) + + def test_string_to_key(self): + iter_count = 32768 + pw = b'password' + salt = bytes.fromhex('10 DF 9D D7 83 E5 BC 8A CE A1 73 0E 74 35 5F 61') + b'ATHENA.MIT.EDUraeburn' + exp_key = bytes.fromhex('45 BD 80 6D BF 6A 83 3A 9C FF C1 C9 45 89 A2 22 36 7A 79 BC 21 C4 13 71 89 06 E9 F5 78 A7 84 67') + string_to_key(self, iter_count, pw, salt, exp_key) + + def test_key_derivation(self): + keyusage = 2 + base_key = bytes.fromhex('6D 40 4D 37 FA F7 9F 9D F0 D3 35 68 D3 20 66 98 00 EB 48 36 47 2E A8 A0 26 D1 6B 71 82 46 0C 52') + exp_ke = bytes.fromhex('56 AB 22 BE E6 3D 82 D7 BC 52 27 F6 77 3F 8E A7 A5 EB 1C 82 51 60 C3 83 12 98 0C 44 2E 5C 7E 49') + exp_ki = bytes.fromhex('69 B1 65 14 E3 CD 8E 56 B8 20 10 D5 C7 30 12 B6 22 C4 D0 0F FC 23 ED 1F') + exp_kc = bytes.fromhex('EF 57 18 BE 86 CC 84 96 3D 8B BB 50 31 E9 F5 C4 BA 41 F2 8F AF 69 E7 3D') + key_derivation(self, keyusage, base_key, exp_ke, exp_ki, exp_kc) + + def test_checksum(self): + kc = bytes.fromhex('EF 57 18 BE 86 CC 84 96 3D 8B BB 50 31 E9 F5 C4 BA 41 F2 8F AF 69 E7 3D') + plaintext = bytes.fromhex('00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 10 11 12 13 14') + exp_checksum = bytes.fromhex('45 EE 79 15 67 EE FC A3 7F 4A C1 E0 22 2D E8 0D 43 C3 BF A0 66 99 67 2A') + checksum(self, kc, plaintext, exp_checksum) + + +if __name__ == '__main__': + unittest.main(verbosity=1)