Merge "Handle authentication failure caused by invalid trust"

This commit is contained in:
Zuul 2026-01-09 14:13:20 +00:00 committed by Gerrit Code Review
commit 8d97b51e1f
4 changed files with 78 additions and 6 deletions

View file

@ -14,6 +14,7 @@
import functools
from keystoneauth1 import access
from keystoneauth1 import exceptions as ks_exceptions
from keystoneauth1.identity import access as access_plugin
from keystoneauth1.identity import generic
from keystoneauth1 import loading as ks_loading
@ -261,7 +262,10 @@ class RequestContext(context.RequestContext):
class StoredContext(RequestContext):
def _load_keystone_data(self):
self._keystone_loaded = True
auth_ref = self.auth_plugin.get_access(self.keystone_session)
try:
auth_ref = self.auth_plugin.get_access(self.keystone_session)
except ks_exceptions.Unauthorized:
raise exception.AuthorizationFailure()
self.roles = auth_ref.role_names
self.user_domain_id = auth_ref.user_domain_id

View file

@ -567,7 +567,14 @@ class Stack(collections.abc.Mapping):
creds_obj = ucreds_object.UserCreds.get_by_id(
context, stack.user_creds_id)
creds = creds_obj.obj_to_primitive()["versioned_object.data"]
stored_context = common_context.StoredContext.from_dict(creds)
try:
stored_context = common_context.StoredContext.from_dict(creds)
except exception.AuthorizationFailure:
LOG.debug(
'Failed to load context from stored credential '
'due to authorization failure. Regenerating the trust.')
return True
if cfg.CONF.deferred_auth_method == 'trusts':
old_trustor_proj_id = stored_context.project_id
@ -1928,8 +1935,14 @@ class Stack(collections.abc.Mapping):
LOG.debug("Context user_id doesn't match "
"trustor, using stored context")
sc = self.stored_context()
sc.clients.client('keystone').delete_trust(
trust_id)
try:
sc.clients.client('keystone').delete_trust(
trust_id)
except exception.AuthorizationFailure:
LOG.warning(
"The stored context is no longer valid. "
"Skip deleting the trust %s.", trust_id)
else:
self.clients.client('keystone').delete_trust(
trust_id)

View file

@ -1631,7 +1631,7 @@ class StackTest(common.HeatTestCase):
saved_stack = stack.Stack.load(self.ctx, stack_id=stack_ownee.id)
self.assertEqual(self.stack.id, saved_stack.owner_id)
def _test_load_with_refresh_cred(self, refresh=True):
def _test_load_with_refresh_cred(self, refresh=True, auth_fail=False):
cfg.CONF.set_override('deferred_auth_method', 'trusts')
self.patchobject(self.ctx.auth_plugin, 'get_user_id',
return_value='old_trustor_user_id')
@ -1642,17 +1642,32 @@ class StackTest(common.HeatTestCase):
old_context.trust_id = 'atrust123'
old_context.trustor_user_id = (
'trustor_user_id' if refresh else 'old_trustor_user_id')
m_sc = self.patchobject(context, 'StoredContext')
m_sc.from_dict.return_value = old_context
if auth_fail:
m_sc.from_dict.side_effect = exception.AuthorizationFailure()
else:
m_sc.from_dict.return_value = old_context
self.stack = stack.Stack(self.ctx, 'test_regenerate_trust', self.tmpl)
self.stack.store()
load_stack = stack.Stack.load(self.ctx, stack_id=self.stack.id,
check_refresh_cred=True)
self.assertEqual(refresh, load_stack.refresh_cred)
if auth_fail:
self.ctx.auth_plugin.get_user_id.assert_not_called()
self.ctx.auth_plugin.get_project_id.assert_not_called()
else:
self.ctx.auth_plugin.get_user_id.assert_called_once()
self.ctx.auth_plugin.get_project_id.assert_called_once()
def test_load_with_refresh_cred(self):
self._test_load_with_refresh_cred()
def test_load_with_refresh_auth_failure(self):
self._test_load_with_refresh_cred(refresh=True, auth_fail=True)
def test_load_with_no_refresh_cred(self):
self._test_load_with_refresh_cred(refresh=False)

View file

@ -283,6 +283,46 @@ class StackTest(common.HeatTestCase):
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
loaded_stack.state)
def test_delete_trust_not_trustor_auth_fail(self):
# Stack gets created with trustor_ctx, deleted with other_ctx
# then the trust delete should be with stored_ctx (and fails)
trustor_ctx = utils.dummy_context(user_id='thetrustor')
other_ctx = utils.dummy_context(user_id='nottrustor')
stored_ctx = utils.dummy_context(trust_id='thetrust')
mock_kc = self.patchobject(hkc, 'KeystoneClient')
self.stub_keystoneclient(user_id='thetrustor')
mock_sc = self.patchobject(stack.Stack, 'stored_context')
mock_sc.return_value = stored_ctx
self.stack = stack.Stack(trustor_ctx, 'delete_trust_nt', self.tmpl)
stack_id = self.stack.store()
db_s = stack_object.Stack.get_by_id(self.ctx, stack_id)
self.assertIsNotNone(db_s)
user_creds_id = db_s.user_creds_id
self.assertIsNotNone(user_creds_id)
user_creds = ucreds_object.UserCreds.get_by_id(
self.ctx, user_creds_id)
self.assertEqual('thetrustor', user_creds.get('trustor_user_id'))
fkc = mock.Mock()
fkc.client = mock.PropertyMock(
side_effect=exception.AuthorizationFailure())
mock_kc.return_value = fkc
loaded_stack = stack.Stack.load(other_ctx, self.stack.id)
loaded_stack.delete()
mock_sc.assert_called_with()
fkc.delete_trust.assert_not_called()
db_s = stack_object.Stack.get_by_id(other_ctx, stack_id)
self.assertIsNone(db_s)
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
loaded_stack.state)
def test_delete_trust_backup(self):
class FakeKeystoneClientFail(fake_ks.FakeKeystoneClient):
def delete_trust(self, trust_id):