From 2242486139dc09cc78d36553d593f84ba97d0ed9 Mon Sep 17 00:00:00 2001 From: mver-al <114071453+mver-al@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:30:51 +0100 Subject: [PATCH 1/3] Fixes for using activate_session to change users. - The updated ServerNonce was not saved after activate_session, which means that subsequent activate_sessions would fail with BadIdentityTokenInvalid. - The _username and _password attributes of Client were never updated but checked in the code (_add_user_auth function). --- asyncua/client/client.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/asyncua/client/client.py b/asyncua/client/client.py index e95799e3f..758b0a10d 100644 --- a/asyncua/client/client.py +++ b/asyncua/client/client.py @@ -658,7 +658,9 @@ async def activate_session( self._add_certificate_auth(params, user_certificate, challenge) else: self._add_user_auth(params, username, password) - return await self.uaclient.activate_session(params) + res = await self.uaclient.activate_session(params) + self._server_nonce = res.ServerNonce + return res def _add_anonymous_auth(self, params): params.UserIdentityToken = ua.AnonymousIdentityToken() @@ -676,8 +678,11 @@ def _add_certificate_auth(self, params, certificate, challenge): params.UserTokenSignature.Signature = sig def _add_user_auth(self, params, username: str, password: str): + self.set_user(username) + self.set_password(password) + params.UserIdentityToken = ua.UserNameIdentityToken() - params.UserIdentityToken.UserName = username + params.UserIdentityToken.UserName = self._username policy = self.server_policy(ua.UserTokenType.UserName) if not policy.SecurityPolicyUri or policy.SecurityPolicyUri == security_policies.SecurityPolicyNone.URI: # see specs part 4, 7.36.3: if the token is NOT encrypted, @@ -686,10 +691,10 @@ def _add_user_auth(self, params, username: str, password: str): if self._password: if self.security_policy.Mode != ua.MessageSecurityMode.SignAndEncrypt: _logger.warning("Sending plain-text password") - params.UserIdentityToken.Password = password.encode("utf8") + params.UserIdentityToken.Password = self._password.encode("utf8") params.UserIdentityToken.EncryptionAlgorithm = None elif self._password: - data, uri = self._encrypt_password(password, policy.SecurityPolicyUri) + data, uri = self._encrypt_password(self._password, policy.SecurityPolicyUri) params.UserIdentityToken.Password = data params.UserIdentityToken.EncryptionAlgorithm = uri params.UserIdentityToken.PolicyId = policy.PolicyId From ac4eb7478109027af4460a8fdf85ab7a2d5fe3c4 Mon Sep 17 00:00:00 2001 From: mver-al <114071453+mver-al@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:03:50 +0100 Subject: [PATCH 2/3] Allow setting password to None. --- asyncua/client/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asyncua/client/client.py b/asyncua/client/client.py index 758b0a10d..755b94329 100644 --- a/asyncua/client/client.py +++ b/asyncua/client/client.py @@ -143,12 +143,12 @@ def set_user(self, username: str) -> None: """ self._username = username - def set_password(self, pwd: str) -> None: + def set_password(self, pwd: str | None) -> None: """ Set user password for the connection. initial password from the URL will be overwritten """ - if not isinstance(pwd, str): + if pwd is not None and not isinstance(pwd, str): raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}") self._password = pwd From d0f4bbad88a2f8bbda6c0d8c029fbf667a61d46b Mon Sep 17 00:00:00 2001 From: mver-al <114071453+mver-al@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:18:13 +0100 Subject: [PATCH 3/3] Don't set _password and _username in activate_session. --- asyncua/client/client.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/asyncua/client/client.py b/asyncua/client/client.py index 755b94329..b9e00ccc4 100644 --- a/asyncua/client/client.py +++ b/asyncua/client/client.py @@ -143,12 +143,12 @@ def set_user(self, username: str) -> None: """ self._username = username - def set_password(self, pwd: str | None) -> None: + def set_password(self, pwd: str) -> None: """ Set user password for the connection. initial password from the URL will be overwritten """ - if pwd is not None and not isinstance(pwd, str): + if not isinstance(pwd, str): raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}") self._password = pwd @@ -678,23 +678,20 @@ def _add_certificate_auth(self, params, certificate, challenge): params.UserTokenSignature.Signature = sig def _add_user_auth(self, params, username: str, password: str): - self.set_user(username) - self.set_password(password) - params.UserIdentityToken = ua.UserNameIdentityToken() - params.UserIdentityToken.UserName = self._username + params.UserIdentityToken.UserName = username policy = self.server_policy(ua.UserTokenType.UserName) if not policy.SecurityPolicyUri or policy.SecurityPolicyUri == security_policies.SecurityPolicyNone.URI: # see specs part 4, 7.36.3: if the token is NOT encrypted, # then the password only contains UTF-8 encoded password # and EncryptionAlgorithm is null - if self._password: + if password: if self.security_policy.Mode != ua.MessageSecurityMode.SignAndEncrypt: _logger.warning("Sending plain-text password") - params.UserIdentityToken.Password = self._password.encode("utf8") + params.UserIdentityToken.Password = password.encode("utf8") params.UserIdentityToken.EncryptionAlgorithm = None - elif self._password: - data, uri = self._encrypt_password(self._password, policy.SecurityPolicyUri) + elif password: + data, uri = self._encrypt_password(password, policy.SecurityPolicyUri) params.UserIdentityToken.Password = data params.UserIdentityToken.EncryptionAlgorithm = uri params.UserIdentityToken.PolicyId = policy.PolicyId