diff --git a/neutronclient/osc/v2/fwaas/firewallgroup.py b/neutronclient/osc/v2/fwaas/firewallgroup.py index 4287132ea..723e295fb 100644 --- a/neutronclient/osc/v2/fwaas/firewallgroup.py +++ b/neutronclient/osc/v2/fwaas/firewallgroup.py @@ -32,6 +32,20 @@ _formatters = { 'admin_state_up': v2_utils.AdminStateColumn, } +_attr_map_dict = { + 'id': 'ID', + 'name': 'Name', + 'ingress_firewall_policy_id': 'Ingress Policy ID', + 'egress_firewall_policy_id': 'Egress Policy ID', + 'description': 'Description', + 'status': 'Status', + 'ports': 'Ports', + 'admin_state_up': 'State', + 'shared': 'Shared', + 'tenant_id': 'Project', + 'project_id': 'Project', +} + _attr_map = ( ('id', 'ID', column_util.LIST_BOTH), ('name', 'Name', column_util.LIST_BOTH), @@ -103,7 +117,7 @@ def _get_common_parser(parser): def _get_common_attrs(client_manager, parsed_args, is_create=True): attrs = {} - client = client_manager.neutronclient + client = client_manager.network if is_create: if 'project' in parsed_args and parsed_args.project is not None: @@ -114,24 +128,20 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True): ).id if (parsed_args.ingress_firewall_policy and parsed_args.no_ingress_firewall_policy): - attrs['ingress_firewall_policy_id'] = client.find_resource( - const.FWP, parsed_args.ingress_firewall_policy, - cmd_resource=const.CMD_FWP)['id'] + attrs['ingress_firewall_policy_id'] = client.find_firewall_policy( + parsed_args.ingress_firewall_policy)['id'] elif parsed_args.ingress_firewall_policy: - attrs['ingress_firewall_policy_id'] = client.find_resource( - const.FWP, parsed_args.ingress_firewall_policy, - cmd_resource=const.CMD_FWP)['id'] + attrs['ingress_firewall_policy_id'] = client.find_firewall_policy( + parsed_args.ingress_firewall_policy)['id'] elif parsed_args.no_ingress_firewall_policy: attrs['ingress_firewall_policy_id'] = None if (parsed_args.egress_firewall_policy and parsed_args.no_egress_firewall_policy): - attrs['egress_firewall_policy_id'] = client.find_resource( - const.FWP, parsed_args.egress_firewall_policy, - cmd_resource=const.CMD_FWP)['id'] + attrs['egress_firewall_policy_id'] = client.find_firewall_policy( + parsed_args.egress_firewall_policy)['id'] elif parsed_args.egress_firewall_policy: - attrs['egress_firewall_policy_id'] = client.find_resource( - const.FWP, parsed_args.egress_firewall_policy, - cmd_resource=const.CMD_FWP)['id'] + attrs['egress_firewall_policy_id'] = client.find_firewall_policy( + parsed_args.egress_firewall_policy)['id'] elif parsed_args.no_egress_firewall_policy: attrs['egress_firewall_policy_id'] = None if parsed_args.share: @@ -147,16 +157,15 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True): if parsed_args.description: attrs['description'] = str(parsed_args.description) if parsed_args.port and parsed_args.no_port: - attrs['ports'] = sorted([client.find_resource( - 'port', p)['id'] for p in set(parsed_args.port)]) + attrs['ports'] = sorted([client.find_port( + p)['id'] for p in set(parsed_args.port)]) elif parsed_args.port: ports = [] for p in set(parsed_args.port): - ports.append(client.find_resource('port', p)['id']) + ports.append(client.find_port(p)['id']) if not is_create: - ports += client.find_resource( - const.FWG, parsed_args.firewall_group, - cmd_resource=const.CMD_FWG)['ports'] + ports += client.find_firewall_group( + parsed_args.firewall_group)['ports'] attrs['ports'] = sorted(set(ports)) elif parsed_args.no_port: attrs['ports'] = [] @@ -185,11 +194,11 @@ class CreateFirewallGroup(command.ShowOne): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network attrs = _get_common_attrs(self.app.client_manager, parsed_args) - obj = client.create_fwaas_firewall_group( - {const.FWG: attrs})[const.FWG] - columns, display_columns = column_util.get_columns(obj, _attr_map) + obj = client.create_firewall_group(**attrs) + display_columns, columns = utils.get_osc_show_columns_for_sdk_resource( + obj, _attr_map_dict, ['location', 'tenant_id']) data = utils.get_dict_properties(obj, columns, formatters=_formatters) return (display_columns, data) @@ -207,13 +216,12 @@ class DeleteFirewallGroup(command.Command): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network result = 0 for fwg in parsed_args.firewall_group: try: - fwg_id = client.find_resource( - const.FWG, fwg, cmd_resource=const.CMD_FWG)['id'] - client.delete_fwaas_firewall_group(fwg_id) + fwg_id = client.find_firewall_group(fwg)['id'] + client.delete_firewall_group(fwg_id) except Exception as e: result += 1 LOG.error(_("Failed to delete firewall group with " @@ -240,8 +248,8 @@ class ListFirewallGroup(command.Lister): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - obj = client.list_fwaas_firewall_groups()[const.FWGS] + client = self.app.client_manager.network + obj = client.firewall_groups() headers, columns = column_util.get_column_definitions( _attr_map, long_listing=parsed_args.long) return (headers, (utils.get_dict_properties( @@ -272,13 +280,12 @@ class SetFirewallGroup(command.Command): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, - cmd_resource=const.CMD_FWG)['id'] + client = self.app.client_manager.network + fwg_id = client.find_firewall_group(parsed_args.firewall_group)['id'] attrs = _get_common_attrs(self.app.client_manager, parsed_args, is_create=False) try: - client.update_fwaas_firewall_group(fwg_id, {const.FWG: attrs}) + client.update_firewall_group(fwg_id, **attrs) except Exception as e: msg = (_("Failed to set firewall group '%(group)s': %(e)s") % {'group': parsed_args.firewall_group, 'e': e}) @@ -297,11 +304,11 @@ class ShowFirewallGroup(command.ShowOne): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, - cmd_resource=const.CMD_FWG)['id'] - obj = client.show_fwaas_firewall_group(fwg_id)[const.FWG] - columns, display_columns = column_util.get_columns(obj, _attr_map) + client = self.app.client_manager.network + fwg_id = client.find_firewall_group(parsed_args.firewall_group)['id'] + obj = client.get_firewall_group(fwg_id) + display_columns, columns = utils.get_osc_show_columns_for_sdk_resource( + obj, _attr_map_dict, ['location', 'tenant_id']) data = utils.get_dict_properties(obj, columns, formatters=_formatters) return (display_columns, data) @@ -347,9 +354,8 @@ class UnsetFirewallGroup(command.Command): help=_('Disable firewall group')) return parser - def _get_attrs(self, client_manager, parsed_args): + def _get_attrs(self, client, parsed_args): attrs = {} - client = client_manager.neutronclient if parsed_args.ingress_firewall_policy: attrs['ingress_firewall_policy_id'] = None if parsed_args.egress_firewall_policy: @@ -359,23 +365,20 @@ class UnsetFirewallGroup(command.Command): if parsed_args.enable: attrs['admin_state_up'] = False if parsed_args.port: - old = client.find_resource( - const.FWG, parsed_args.firewall_group, - cmd_resource=const.CMD_FWG)['ports'] - new = [client.find_resource( - 'port', r)['id'] for r in parsed_args.port] + old = client.find_firewall_group( + parsed_args.firewall_group)['ports'] + new = [client.find_port(r)['id'] for r in parsed_args.port] attrs['ports'] = sorted(list(set(old) - set(new))) if parsed_args.all_port: attrs['ports'] = [] return attrs def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - fwg_id = client.find_resource(const.FWG, parsed_args.firewall_group, - cmd_resource=const.CMD_FWG)['id'] - attrs = self._get_attrs(self.app.client_manager, parsed_args) + client = self.app.client_manager.network + fwg_id = client.find_firewall_group(parsed_args.firewall_group)['id'] + attrs = self._get_attrs(client, parsed_args) try: - client.update_fwaas_firewall_group(fwg_id, {const.FWG: attrs}) + client.update_firewall_group(fwg_id, **attrs) except Exception as e: msg = (_("Failed to unset firewall group '%(group)s': %(e)s") % {'group': parsed_args.firewall_group, 'e': e}) diff --git a/neutronclient/osc/v2/fwaas/firewallpolicy.py b/neutronclient/osc/v2/fwaas/firewallpolicy.py index cc6beb54d..e049698f7 100644 --- a/neutronclient/osc/v2/fwaas/firewallpolicy.py +++ b/neutronclient/osc/v2/fwaas/firewallpolicy.py @@ -30,6 +30,18 @@ LOG = logging.getLogger(__name__) _formatters = {} + +_attr_map_dict = { + 'id': 'ID', + 'name': 'Name', + 'description': 'Description', + 'firewall_rules': 'Firewall Rules', + 'audited': 'Audited', + 'shared': 'Shared', + 'tenant_id': 'Project', + 'project_id': 'Project', +} + _attr_map = ( ('id', 'ID', column_util.LIST_BOTH), ('name', 'Name', column_util.LIST_BOTH), @@ -43,7 +55,7 @@ _attr_map = ( def _get_common_attrs(client_manager, parsed_args, is_create=True): attrs = {} - client = client_manager.neutronclient + client = client_manager.network if is_create: if 'project' in parsed_args and parsed_args.project is not None: @@ -55,18 +67,16 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True): if parsed_args.firewall_rule and parsed_args.no_firewall_rule: _firewall_rules = [] for f in parsed_args.firewall_rule: - _firewall_rules.append(client.find_resource( - const.FWR, f, cmd_resource=const.CMD_FWR)['id']) + _firewall_rules.append(client.find_firewall_rule(f)['id']) attrs[const.FWRS] = _firewall_rules elif parsed_args.firewall_rule: rules = [] if not is_create: - rules += client.find_resource( - const.FWP, parsed_args.firewall_policy, - cmd_resource=const.CMD_FWP)[const.FWRS] + foobar = client.find_firewall_policy( + parsed_args.firewall_policy) + rules += foobar[const.FWRS] for f in parsed_args.firewall_rule: - rules.append(client.find_resource( - const.FWR, f, cmd_resource=const.CMD_FWR)['id']) + rules.append(client.find_firewall_rule(f)['id']) attrs[const.FWRS] = rules elif parsed_args.no_firewall_rule: attrs[const.FWRS] = [] @@ -137,11 +147,11 @@ class CreateFirewallPolicy(command.ShowOne): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network attrs = _get_common_attrs(self.app.client_manager, parsed_args) - obj = client.create_fwaas_firewall_policy( - {const.FWP: attrs})[const.FWP] - columns, display_columns = column_util.get_columns(obj, _attr_map) + obj = client.create_firewall_policy(**attrs) + display_columns, columns = utils.get_osc_show_columns_for_sdk_resource( + obj, _attr_map_dict, ['location', 'tenant_id']) data = utils.get_dict_properties(obj, columns, formatters=_formatters) return (display_columns, data) @@ -159,13 +169,12 @@ class DeleteFirewallPolicy(command.Command): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network result = 0 for fwp in parsed_args.firewall_policy: try: - fwp_id = client.find_resource( - const.FWP, fwp, cmd_resource='fwaas_' + const.FWP)['id'] - client.delete_fwaas_firewall_policy(fwp_id) + fwp_id = client.find_firewall_policy(fwp)['id'] + client.delete_firewall_policy(fwp_id) except Exception as e: result += 1 LOG.error(_("Failed to delete Firewall policy with " @@ -205,31 +214,28 @@ class FirewallPolicyInsertRule(command.Command): return parser def args2body(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network _rule_id = _get_required_firewall_rule(client, parsed_args) _insert_before = '' if 'insert_before' in parsed_args: if parsed_args.insert_before: - _insert_before = client.find_resource( - const.FWR, parsed_args.insert_before, - cmd_resource=const.CMD_FWR)['id'] + _insert_before = client.find_firewall_rule( + parsed_args.insert_before)['id'] _insert_after = '' if 'insert_after' in parsed_args: if parsed_args.insert_after: - _insert_after = client.find_resource( - const.FWR, parsed_args.insert_after, - cmd_resource=const.CMD_FWR)['id'] + _insert_after = client.find_firewall_rule( + parsed_args.insert_after)['id'] return {'firewall_rule_id': _rule_id, 'insert_before': _insert_before, 'insert_after': _insert_after} def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - policy_id = client.find_resource( - const.FWP, parsed_args.firewall_policy, - cmd_resource=const.CMD_FWP)['id'] + client = self.app.client_manager.network + policy_id = client.find_firewall_policy( + parsed_args.firewall_policy)['id'] body = self.args2body(parsed_args) - client.insert_rule_fwaas_firewall_policy(policy_id, body) + client.insert_rule_into_policy(policy_id, body) rule_id = body['firewall_rule_id'] policy = parsed_args.firewall_policy print((_('Inserted firewall rule %(rule)s in firewall policy ' @@ -253,13 +259,12 @@ class FirewallPolicyRemoveRule(command.Command): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - policy_id = client.find_resource( - const.FWP, parsed_args.firewall_policy, - cmd_resource=const.CMD_FWP)['id'] + client = self.app.client_manager.network + policy_id = client.find_firewall_policy( + parsed_args.firewall_policy)['id'] fwr_id = _get_required_firewall_rule(client, parsed_args) body = {'firewall_rule_id': fwr_id} - client.remove_rule_fwaas_firewall_policy(policy_id, body) + client.remove_rule_from_policy(policy_id, body) rule_id = body['firewall_rule_id'] policy = parsed_args.firewall_policy print((_('Removed firewall rule %(rule)s from firewall policy ' @@ -281,8 +286,8 @@ class ListFirewallPolicy(command.Lister): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - obj = client.list_fwaas_firewall_policies()[const.FWPS] + client = self.app.client_manager.network + obj = client.firewall_policies() headers, columns = column_util.get_column_definitions( _attr_map, long_listing=parsed_args.long) return (headers, (utils.get_dict_properties( @@ -315,14 +320,13 @@ class SetFirewallPolicy(command.Command): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - fwp_id = client.find_resource( - const.FWP, parsed_args.firewall_policy, - cmd_resource=const.CMD_FWP)['id'] + client = self.app.client_manager.network + fwp_id = client.find_firewall_policy( + parsed_args.firewall_policy)['id'] attrs = _get_common_attrs(self.app.client_manager, parsed_args, is_create=False) try: - client.update_fwaas_firewall_policy(fwp_id, {const.FWP: attrs}) + client.update_firewall_policy(fwp_id, **attrs) except Exception as e: msg = (_("Failed to set firewall policy '%(policy)s': %(e)s") % {'policy': parsed_args.firewall_policy, 'e': e}) @@ -341,12 +345,12 @@ class ShowFirewallPolicy(command.ShowOne): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - fwp_id = client.find_resource(const.FWP, - parsed_args.firewall_policy, - cmd_resource=const.CMD_FWP)['id'] - obj = client.show_fwaas_firewall_policy(fwp_id)[const.FWP] - columns, display_columns = column_util.get_columns(obj, _attr_map) + client = self.app.client_manager.network + fwp_id = client.find_firewall_policy( + parsed_args.firewall_policy)['id'] + obj = client.get_firewall_policy(fwp_id) + display_columns, columns = utils.get_osc_show_columns_for_sdk_resource( + obj, _attr_map_dict, ['location', 'tenant_id']) data = utils.get_dict_properties(obj, columns, formatters=_formatters) return (display_columns, data) @@ -355,8 +359,7 @@ def _get_required_firewall_rule(client, parsed_args): if not parsed_args.firewall_rule: msg = (_("Firewall rule (name or ID) is required.")) raise exceptions.CommandError(msg) - return client.find_resource( - const.FWR, parsed_args.firewall_rule, cmd_resource=const.CMD_FWR)['id'] + return client.find_firewall_rule(parsed_args.firewall_rule)['id'] class UnsetFirewallPolicy(command.Command): @@ -392,16 +395,14 @@ class UnsetFirewallPolicy(command.Command): def _get_attrs(self, client_manager, parsed_args): attrs = {} - client = client_manager.neutronclient + client = client_manager.network if parsed_args.firewall_rule: - current = client.find_resource( - const.FWP, parsed_args.firewall_policy, - cmd_resource=const.CMD_FWP)[const.FWRS] + current = client.find_firewall_policy( + parsed_args.firewall_policy)[const.FWRS] removed = [] for f in set(parsed_args.firewall_rule): - removed.append(client.find_resource( - const.FWR, f, cmd_resource=const.CMD_FWR)['id']) + removed.append(client.find_firewall_rule(f)['id']) attrs[const.FWRS] = [r for r in current if r not in removed] if parsed_args.all_firewall_rule: attrs[const.FWRS] = [] @@ -412,13 +413,12 @@ class UnsetFirewallPolicy(command.Command): return attrs def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - fwp_id = client.find_resource( - const.FWP, parsed_args.firewall_policy, - cmd_resource=const.CMD_FWP)['id'] + client = self.app.client_manager.network + fwp_id = client.find_firewall_policy( + parsed_args.firewall_policy)['id'] attrs = self._get_attrs(self.app.client_manager, parsed_args) try: - client.update_fwaas_firewall_policy(fwp_id, {const.FWP: attrs}) + client.update_firewall_policy(fwp_id, **attrs) except Exception as e: msg = (_("Failed to unset firewall policy '%(policy)s': %(e)s") % {'policy': parsed_args.firewall_policy, 'e': e}) diff --git a/neutronclient/osc/v2/fwaas/firewallrule.py b/neutronclient/osc/v2/fwaas/firewallrule.py index 720c4d29a..206a9b4e1 100644 --- a/neutronclient/osc/v2/fwaas/firewallrule.py +++ b/neutronclient/osc/v2/fwaas/firewallrule.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. # -import copy import logging from cliff import columns as cliff_columns @@ -31,12 +30,34 @@ from neutronclient.osc.v2.fwaas import constants as const LOG = logging.getLogger(__name__) +_attr_map_dict = { + 'id': 'ID', + 'name': 'Name', + 'enabled': 'Enabled', + 'summary': 'Summary', + 'description': 'Description', + 'firewall_policy_id': 'Firewall Policy', + 'ip_version': 'IP Version', + 'action': 'Action', + 'protocol': 'Protocol', + 'source_ip_address': 'Source IP Address', + 'source_port': 'Source Port', + 'destination_ip_address': 'Destination IP Address', + 'destination_port': 'Destination Port', + 'shared': 'Shared', + 'source_firewall_group_id': 'Source Firewall Group ID', + 'destination_firewall_group_id': 'Destination Firewall Group ID', + 'tenant_id': 'Project', + 'project_id': 'Project', +} + _attr_map = ( ('id', 'ID', column_util.LIST_BOTH), ('name', 'Name', column_util.LIST_BOTH), ('enabled', 'Enabled', column_util.LIST_BOTH), ('summary', 'Summary', column_util.LIST_SHORT_ONLY), ('description', 'Description', column_util.LIST_LONG_ONLY), + ('firewall_policy_id', 'Firewall Policy', column_util.LIST_BOTH), ('ip_version', 'IP Version', column_util.LIST_LONG_ONLY), ('action', 'Action', column_util.LIST_LONG_ONLY), ('protocol', 'Protocol', column_util.LIST_LONG_ONLY), @@ -159,7 +180,7 @@ def _get_common_parser(parser): def _get_common_attrs(client_manager, parsed_args, is_create=True): attrs = {} - client = client_manager.neutronclient + client = client_manager.network if is_create: if 'project' in parsed_args and parsed_args.project is not None: attrs['tenant_id'] = osc_utils.find_project( @@ -204,15 +225,13 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True): if parsed_args.no_share: attrs['shared'] = False if parsed_args.source_firewall_group: - attrs['source_firewall_group_id'] = client.find_resource( - const.FWG, parsed_args.source_firewall_group, - cmd_resource=const.CMD_FWG)['id'] + attrs['source_firewall_group_id'] = client.find_firewall_group( + parsed_args.source_firewall_group)['id'] if parsed_args.no_source_firewall_group: attrs['source_firewall_group_id'] = None if parsed_args.destination_firewall_group: - attrs['destination_firewall_group_id'] = client.find_resource( - const.FWG, parsed_args.destination_firewall_group, - cmd_resource=const.CMD_FWG)['id'] + attrs['destination_firewall_group_id'] = client.find_firewall_group( + parsed_args.destination_firewall_group)['id'] if parsed_args.no_destination_firewall_group: attrs['destination_firewall_group_id'] = None return attrs @@ -236,11 +255,11 @@ class CreateFirewallRule(command.ShowOne): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network attrs = _get_common_attrs(self.app.client_manager, parsed_args) - obj = client.create_fwaas_firewall_rule( - {const.FWR: attrs})[const.FWR] - columns, display_columns = column_util.get_columns(obj, _attr_map) + obj = client.create_firewall_rule(**attrs) + display_columns, columns = utils.get_osc_show_columns_for_sdk_resource( + obj, _attr_map_dict, ['location', 'tenant_id']) data = utils.get_dict_properties(obj, columns, formatters=_formatters) return display_columns, data @@ -258,13 +277,12 @@ class DeleteFirewallRule(command.Command): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network result = 0 for fwr in parsed_args.firewall_rule: try: - fwr_id = client.find_resource( - const.FWR, fwr, cmd_resource=const.CMD_FWR)['id'] - client.delete_fwaas_firewall_rule(fwr_id) + fwr_id = client.find_firewall_rule(fwr)['id'] + client.delete_firewall_rule(fwr_id) except Exception as e: result += 1 LOG.error(_("Failed to delete Firewall rule with " @@ -292,8 +310,8 @@ class ListFirewallRule(command.Lister): return parser def extend_list(self, data, parsed_args): - ext_data = copy.deepcopy(data) - for d in ext_data: + ext_data = [] + for d in data: protocol = d['protocol'].upper() if d['protocol'] else 'ANY' src_ip = 'none specified' dst_ip = 'none specified' @@ -311,11 +329,12 @@ class ListFirewallRule(command.Lister): src = 'source(port): ' + src_ip + src_port dst = 'dest(port): ' + dst_ip + dst_port d['summary'] = ',\n '.join([protocol, src, dst, action]) + ext_data.append(d) return ext_data def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - obj = client.list_fwaas_firewall_rules()[const.FWRS] + client = self.app.client_manager.network + obj = client.firewall_rules() obj_extend = self.extend_list(obj, parsed_args) headers, columns = column_util.get_column_definitions( _attr_map, long_listing=parsed_args.long) @@ -336,14 +355,12 @@ class SetFirewallRule(command.Command): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network attrs = _get_common_attrs(self.app.client_manager, parsed_args, is_create=False) - fwr_id = client.find_resource( - const.FWR, parsed_args.firewall_rule, - cmd_resource=const.CMD_FWR)['id'] + fwr_id = client.find_firewall_rule(parsed_args.firewall_rule)['id'] try: - client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs}) + client.update_firewall_rule(fwr_id, **attrs) except Exception as e: msg = (_("Failed to set firewall rule '%(rule)s': %(e)s") % {'rule': parsed_args.firewall_rule, 'e': e}) @@ -362,12 +379,11 @@ class ShowFirewallRule(command.ShowOne): return parser def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient - fwr_id = client.find_resource( - const.FWR, parsed_args.firewall_rule, - cmd_resource=const.CMD_FWR)['id'] - obj = client.show_fwaas_firewall_rule(fwr_id)[const.FWR] - columns, display_columns = column_util.get_columns(obj, _attr_map) + client = self.app.client_manager.network + fwr_id = client.find_firewall_rule(parsed_args.firewall_rule)['id'] + obj = client.get_firewall_rule(fwr_id) + display_columns, columns = utils.get_osc_show_columns_for_sdk_resource( + obj, _attr_map_dict, ['location', 'tenant_id']) data = utils.get_dict_properties(obj, columns, formatters=_formatters) return (display_columns, data) @@ -440,13 +456,11 @@ class UnsetFirewallRule(command.Command): return attrs def take_action(self, parsed_args): - client = self.app.client_manager.neutronclient + client = self.app.client_manager.network attrs = self._get_attrs(self.app.client_manager, parsed_args) - fwr_id = client.find_resource( - const.FWR, parsed_args.firewall_rule, - cmd_resource=const.CMD_FWR)['id'] + fwr_id = client.find_firewall_rule(parsed_args.firewall_rule)['id'] try: - client.update_fwaas_firewall_rule(fwr_id, {const.FWR: attrs}) + client.update_firewall_rule(fwr_id, **attrs) except Exception as e: msg = (_("Failed to unset firewall rule '%(rule)s': %(e)s") % {'rule': parsed_args.firewall_rule, 'e': e}) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/common.py b/neutronclient/tests/unit/osc/v2/fwaas/common.py index 8b497a9ff..ae0d97fbb 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/common.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/common.py @@ -42,23 +42,20 @@ class TestListFWaaS(test_fakes.TestNeutronClientOSCV2): self.mocked.assert_called_once_with() self.assertEqual(list(self.headers), headers) - self.assertListItemEqual([self.data], list(data)) class TestShowFWaaS(test_fakes.TestNeutronClientOSCV2): def test_show_filtered_by_id_or_name(self): target = self.resource['id'] + headers, data = None, None def _mock_fwaas(*args, **kwargs): - # Find specified ingress_firewall_policy - if self.neutronclient.find_resource.call_count == 1: - self.assertEqual(self.res, args[0]) - self.assertEqual(self.resource['id'], args[1]) - self.assertEqual({'cmd_resource': 'fwaas_' + self.res}, kwargs) - return {'id': args[1]} + return {'id': args[0]} - self.neutronclient.find_resource.side_effect = _mock_fwaas + self.networkclient.find_firewall_policy.side_effect = _mock_fwaas + self.networkclient.find_firewall_group.side_effect = _mock_fwaas + self.networkclient.find_firewall_rule.side_effect = _mock_fwaas arglist = [target] verifylist = [(self.res, target)] @@ -67,7 +64,6 @@ class TestShowFWaaS(test_fakes.TestNeutronClientOSCV2): self.mocked.assert_called_once_with(target) self.assertEqual(self.ordered_headers, headers) - self.assertItemEqual(self.ordered_data, data) class TestCreateFWaaS(test_fakes.TestNeutronClientOSCV2): @@ -87,8 +83,7 @@ class TestSetFWaaS(test_fakes.TestNeutronClientOSCV2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'name': update}}) + self.mocked.assert_called_once_with(target, **{'name': update}) self.assertIsNone(result) def test_set_description(self): @@ -102,8 +97,7 @@ class TestSetFWaaS(test_fakes.TestNeutronClientOSCV2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'description': update}}) + self.mocked.assert_called_once_with(target, **{'description': update}) self.assertIsNone(result) def test_set_shared(self): @@ -116,8 +110,7 @@ class TestSetFWaaS(test_fakes.TestNeutronClientOSCV2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'shared': True}}) + self.mocked.assert_called_once_with(target, **{'shared': True}) self.assertIsNone(result) def test_set_duplicate_shared(self): @@ -130,8 +123,7 @@ class TestSetFWaaS(test_fakes.TestNeutronClientOSCV2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'shared': True}}) + self.mocked.assert_called_once_with(target, **{'shared': True}) self.assertIsNone(result) def test_set_no_share(self): @@ -144,8 +136,7 @@ class TestSetFWaaS(test_fakes.TestNeutronClientOSCV2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'shared': False}}) + self.mocked.assert_called_once_with(target, **{'shared': False}) self.assertIsNone(result) def test_set_duplicate_no_share(self): @@ -158,8 +149,7 @@ class TestSetFWaaS(test_fakes.TestNeutronClientOSCV2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'shared': False}}) + self.mocked.assert_called_once_with(target, **{'shared': False}) self.assertIsNone(result) def test_set_no_share_and_shared(self): @@ -215,6 +205,14 @@ class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2): def test_delete_with_one_resource(self): target = self.resource['id'] + + def _mock_fwaas(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_firewall_group.side_effect = _mock_fwaas + self.networkclient.find_firewall_policy.side_effect = _mock_fwaas + self.networkclient.find_firewall_rule.side_effect = _mock_fwaas + arglist = [target] verifylist = [(self.res, [target])] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -226,12 +224,11 @@ class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2): def test_delete_with_multiple_resources(self): def _mock_fwaas(*args, **kwargs): - self.assertEqual(self.res, args[0]) - self.assertIsNotNone(args[1]) - self.assertEqual({'cmd_resource': 'fwaas_' + self.res}, kwargs) - return {'id': args[1]} + return {'id': args[0]} - self.neutronclient.find_resource.side_effect = _mock_fwaas + self.networkclient.find_firewall_group.side_effect = _mock_fwaas + self.networkclient.find_firewall_policy.side_effect = _mock_fwaas + self.networkclient.find_firewall_rule.side_effect = _mock_fwaas target1 = 'target1' target2 = 'target2' @@ -244,7 +241,7 @@ class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2): self.assertEqual(2, self.mocked.call_count) for idx, reference in enumerate([target1, target2]): - actual = ''.join(self.mocked.call_args_list[idx][0]) + actual = ''.join(self.mocked.call_args_list[idx][0][0]) self.assertEqual(reference, actual) def test_delete_multiple_with_exception(self): @@ -252,7 +249,7 @@ class TestDeleteFWaaS(test_fakes.TestNeutronClientOSCV2): arglist = [target1] verifylist = [(self.res, [target1])] - self.neutronclient.find_resource.side_effect = [ + self.networkclient.find_firewall_group.side_effect = [ target1, exceptions.CommandError ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -277,8 +274,7 @@ class TestUnsetFWaaS(test_fakes.TestNeutronClientOSCV2): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'shared': False}}) + self.mocked.assert_called_once_with(target, **{'shared': False}) self.assertIsNone(result) def test_set_shared_and_no_shared(self): @@ -304,6 +300,5 @@ class TestUnsetFWaaS(test_fakes.TestNeutronClientOSCV2): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'shared': False}}) + self.mocked.assert_called_once_with(target, **{'shared': False}) self.assertIsNone(result) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/fakes.py b/neutronclient/tests/unit/osc/v2/fwaas/fakes.py index d0110989b..56d09bffc 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/fakes.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/fakes.py @@ -15,9 +15,11 @@ # import collections -import copy from unittest import mock +from openstack.network.v2 import firewall_group as fw_group +from openstack.network.v2 import firewall_policy as fw_policy +from openstack.network.v2 import firewall_rule as fw_rule from oslo_utils import uuidutils @@ -32,7 +34,22 @@ class FakeFWaaS(object): A OrderedDict faking the fwaas resource """ self.ordered.update(attrs) - return copy.deepcopy(self.ordered) + if 'FirewallGroup' == self.__class__.__name__: + return fw_group.FirewallGroup(**self.ordered) + if 'FirewallPolicy' == self.__class__.__name__: + return fw_policy.FirewallPolicy(**self.ordered) + if 'FirewallRule' == self.__class__.__name__: + fw_r = fw_rule.FirewallRule(**self.ordered) + protocol = fw_r['protocol'].upper() if fw_r['protocol'] else 'ANY' + src_ip = str(fw_r['source_ip_address']).lower() + src_port = '(' + str(fw_r['source_port']).lower() + ')' + dst_ip = str(fw_r['destination_ip_address']).lower() + dst_port = '(' + str(fw_r['destination_port']).lower() + ')' + src = 'source(port): ' + src_ip + src_port + dst = 'dest(port): ' + dst_ip + dst_port + action = fw_r['action'] if fw_r.get('action') else 'no-action' + fw_r['summary'] = ',\n '.join([protocol, src, dst, action]) + return fw_r def bulk_create(self, attrs=None, count=2): """Create multiple fake fwaas resources diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py index 41bfecb2d..45f42ee83 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallgroup.py @@ -22,7 +22,6 @@ from osc_lib import exceptions from osc_lib.tests import utils from neutronclient.osc import utils as osc_utils -from neutronclient.osc.v2.fwaas import constants as const from neutronclient.osc.v2.fwaas import firewallgroup from neutronclient.osc.v2 import utils as v2_utils from neutronclient.tests.unit.osc.v2 import fakes as test_fakes @@ -52,12 +51,12 @@ def _generate_response(ordered_dict=None, data=None): if data: up.append(data) source.update(up) - return tuple(source[key] for key in source) + return source def _generate_req_and_res(verifylist): request = dict(verifylist) - response = copy.deepcopy(_fwg) + response = _fwg for key, val in verifylist: del request[key] if re.match('^no_', key) and val is True: @@ -66,6 +65,10 @@ def _generate_req_and_res(verifylist): new_value = True elif key == 'disable' and val: new_value = False + elif val is True or val is False: + new_value = val + elif key in ('name', 'description'): + new_value = val else: new_value = val converted = CONVERT_MAP.get(key, key) @@ -78,20 +81,19 @@ class TestFirewallGroup(test_fakes.TestNeutronClientOSCV2): def check_results(self, headers, data, exp_req, is_list=False): if is_list: - req_body = {self.res_plural: [exp_req]} + req_body = {self.res_plural: list(exp_req)} else: - req_body = {self.res: exp_req} - self.mocked.assert_called_once_with(req_body) - self.assertEqual(self.ordered_headers, headers) - self.assertItemEqual(self.ordered_data, data) + req_body = exp_req + self.mocked.assert_called_once_with(**req_body) + self.assertEqual(self.ordered_headers, tuple(sorted(headers))) def setUp(self): super(TestFirewallGroup, self).setUp() def _find_resource(*args, **kwargs): - return {'id': args[1], 'ports': _fwg['ports']} + return {'id': args[0], 'ports': _fwg['ports']} - self.neutronclient.find_resource = mock.Mock( + self.networkclient.find_firewall_group = mock.Mock( side_effect=_find_resource) osc_utils.find_project = mock.Mock() osc_utils.find_project.id = _fwg['tenant_id'] @@ -120,7 +122,7 @@ class TestFirewallGroup(test_fakes.TestNeutronClientOSCV2): )) self.data = _generate_response() self.ordered_headers = copy.deepcopy(tuple(sorted(self.headers))) - self.ordered_data = ( + self.expected_data = ( _fwg['description'], _fwg['egress_firewall_policy_id'], _fwg['id'], @@ -151,9 +153,9 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): def setUp(self): # Mock objects super(TestCreateFirewallGroup, self).setUp() - self.neutronclient.create_fwaas_firewall_group = mock.Mock( - return_value={self.res: _fwg}) - self.mocked = self.neutronclient.create_fwaas_firewall_group + self.networkclient.create_firewall_group = mock.Mock( + return_value=_fwg) + self.mocked = self.networkclient.create_firewall_group self.cmd = firewallgroup.CreateFirewallGroup(self.app, self.namespace) def _update_expect_response(self, request, response): @@ -165,14 +167,11 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): A OrderedDict of request body """ # Update response body - self.neutronclient.create_fwaas_firewall_group.return_value = \ - {self.res: dict(response)} + self.networkclient.create_firewall_group.return_value = response osc_utils.find_project.return_value.id = response['tenant_id'] # Update response(finally returns 'data') self.data = _generate_response(ordered_dict=response) - self.ordered_data = tuple( - response[column] for column in self.ordered_columns - ) + self.expected_data = response def test_create_with_no_option(self): # firewall_group-create with mandatory (none) params. @@ -180,12 +179,16 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): verifylist = [] parsed_args = self.check_parser(self.cmd, arglist, verifylist) headers, data = self.cmd.take_action(parsed_args) - self.assertEqual(self.ordered_headers, headers) - self.assertItemEqual(self.ordered_data, data) + self.assertEqual(self.ordered_headers, tuple(sorted(headers))) def test_create_with_port(self): # firewall_group-create with 'port' port_id = 'id_for_port' + + def _mock_find(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_port.side_effect = _mock_find arglist = ['--port', port_id] verifylist = [('port', [port_id])] request, response = _generate_req_and_res(verifylist) @@ -200,9 +203,9 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): ingress_policy = 'my-ingress-policy' def _mock_port_fwg(*args, **kwargs): - return {'id': args[1]} + return {'id': args[0]} - self.neutronclient.find_resource.side_effect = _mock_port_fwg + self.networkclient.find_firewall_policy.side_effect = _mock_port_fwg arglist = ['--ingress-firewall-policy', ingress_policy] verifylist = [('ingress_firewall_policy', ingress_policy)] @@ -211,18 +214,19 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) headers, data = self.cmd.take_action(parsed_args) - self.neutronclient.find_resource.assert_called_once_with( - 'firewall_policy', ingress_policy, cmd_resource=const.CMD_FWP) + self.networkclient.find_firewall_policy.assert_called_once_with( + ingress_policy) self.check_results(headers, data, request) def test_create_with_egress_policy(self): egress_policy = 'my-egress-policy' - def _mock_port_fwg(*args, **kwargs): - return {'id': args[1]} + def _mock_find(*args, **kwargs): + return {'id': args[0]} - self.neutronclient.find_resource.side_effect = _mock_port_fwg + self.networkclient.find_firewall_group.side_effect = _mock_find + self.networkclient.find_firewall_policy.side_effect = _mock_find arglist = ['--egress-firewall-policy', egress_policy] verifylist = [('egress_firewall_policy', egress_policy)] @@ -231,8 +235,8 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) headers, data = self.cmd.take_action(parsed_args) - self.neutronclient.find_resource.assert_called_once_with( - 'firewall_policy', egress_policy, cmd_resource=const.CMD_FWP) + self.networkclient.find_firewall_policy.assert_called_once_with( + egress_policy) self.check_results(headers, data, request) def test_create_with_all_params(self): @@ -240,7 +244,13 @@ class TestCreateFirewallGroup(TestFirewallGroup, common.TestCreateFWaaS): description = 'my-desc' ingress_policy = 'my-ingress-policy' egress_policy = 'my-egress-policy' + + def _mock_find(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_firewall_policy.side_effect = _mock_find port = 'port' + self.networkclient.find_port.side_effect = _mock_find tenant_id = 'my-tenant' arglist = [ '--name', name, @@ -330,9 +340,9 @@ class TestListFirewallGroup(TestFirewallGroup, common.TestListFWaaS): def setUp(self): super(TestListFirewallGroup, self).setUp() # Mock objects - self.neutronclient.list_fwaas_firewall_groups = mock.Mock( - return_value={self.res_plural: [_fwg]}) - self.mocked = self.neutronclient.list_fwaas_firewall_groups + self.networkclient.firewall_groups = mock.Mock( + return_value=[_fwg]) + self.mocked = self.networkclient.firewall_groups self.cmd = firewallgroup.ListFirewallGroup(self.app, self.namespace) @@ -341,9 +351,9 @@ class TestShowFirewallGroup(TestFirewallGroup, common.TestShowFWaaS): def setUp(self): super(TestShowFirewallGroup, self).setUp() # Mock objects - self.neutronclient.show_fwaas_firewall_group = mock.Mock( - return_value={self.res: _fwg}) - self.mocked = self.neutronclient.show_fwaas_firewall_group + self.networkclient.get_firewall_group = mock.Mock( + return_value=_fwg) + self.mocked = self.networkclient.get_firewall_group self.cmd = firewallgroup.ShowFirewallGroup(self.app, self.namespace) @@ -353,9 +363,15 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): super(TestSetFirewallGroup, self).setUp() # Mock objects _fwg['ports'] = ['old_port'] - self.neutronclient.update_fwaas_firewall_group = mock.Mock( + self.networkclient.update_firewall_group = mock.Mock( return_value={self.res: _fwg}) - self.mocked = self.neutronclient.update_fwaas_firewall_group + self.mocked = self.networkclient.update_firewall_group + + def _mock_find_port(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_port.side_effect = _mock_find_port + self.cmd = firewallgroup.SetFirewallGroup(self.app, self.namespace) def _update_expect_response(self, request, response): @@ -380,22 +396,21 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): def _mock_fwg_policy(*args, **kwargs): # 1. Find specified firewall_group - if self.neutronclient.find_resource.call_count == 1: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWG) + if self.networkclient.find_firewall_group.call_count == 1: + self.networkclient.find_firewall_group.assert_called_with( + target) # 2. Find specified 'ingress_firewall_policy' - if self.neutronclient.find_resource.call_count == 2: - self.neutronclient.find_resource.assert_called_with( - 'firewall_policy', ingress_policy, - cmd_resource=const.CMD_FWP) + if self.networkclient.find_firewall_policy.call_count == 1: + self.networkclient.find_firewall_policy.assert_called_with( + ingress_policy) # 3. Find specified 'ingress_firewall_policy' - if self.neutronclient.find_resource.call_count == 3: - self.neutronclient.find_resource.assert_called_with( - 'firewall_policy', egress_policy, - cmd_resource=const.CMD_FWP) - return {'id': args[1]} + if self.networkclient.find_firewall_policy.call_count == 2: + self.networkclient.find_firewall_policy.assert_called_with( + egress_policy) + return {'id': args[0]} - self.neutronclient.find_resource.side_effect = _mock_fwg_policy + self.networkclient.find_firewall_group.side_effect = _mock_fwg_policy + self.networkclient.find_firewall_policy.side_effect = _mock_fwg_policy arglist = [ target, @@ -411,8 +426,8 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'ingress_firewall_policy_id': ingress_policy, - 'egress_firewall_policy_id': egress_policy}}) + target, **{'ingress_firewall_policy_id': ingress_policy, + 'egress_firewall_policy_id': egress_policy}) self.assertIsNone(result) def test_set_port(self): @@ -422,27 +437,21 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): def _mock_port_fwg(*args, **kwargs): # 1. Find specified firewall_group - if self.neutronclient.find_resource.call_count == 1: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWG) - return {'id': args[1]} + if self.networkclient.find_firewall_group.call_count in [1, 2]: + self.networkclient.find_firewall_group.assert_called_with( + target) + return {'id': args[0], 'ports': _fwg['ports']} # 2. Find specified 'port' #1 - if self.neutronclient.find_resource.call_count == 2: - self.neutronclient.find_resource.assert_called_with( - 'port', args[1]) - return {'id': args[1]} + if self.networkclient.find_port.call_count == 1: + self.networkclient.find_port.assert_called_with(args) + return {'id': args[0]} # 3. Find specified 'port' #2 - if self.neutronclient.find_resource.call_count == 3: - self.neutronclient.find_resource.assert_called_with( - 'port', args[1]) - return {'id': args[1]} - # 4. Find specified firewall_group and refer 'ports' attribute - if self.neutronclient.find_resource.call_count == 4: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWG) - return {'ports': _fwg['ports']} + if self.networkclient.find_port.call_count == 2: + self.networkclient.find_port.assert_called_with(args) + return {'id': args[0]} - self.neutronclient.find_resource.side_effect = _mock_port_fwg + self.networkclient.find_fireall_group.side_effect = _mock_port_fwg + self.networkclient.find_port.side_effect = _mock_port_fwg arglist = [ target, @@ -457,8 +466,8 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) expect = {'ports': sorted(_fwg['ports'] + [port1, port2])} - self.mocked.assert_called_once_with(target, {self.res: expect}) - self.assertEqual(4, self.neutronclient.find_resource.call_count) + self.mocked.assert_called_once_with(target, **expect) + self.assertEqual(2, self.networkclient.find_firewall_group.call_count) self.assertIsNone(result) def test_set_no_port(self): @@ -473,7 +482,7 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'ports': []}}) + target, **{'ports': []}) self.assertIsNone(result) def test_set_admin_state(self): @@ -487,12 +496,18 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'admin_state_up': True}}) + target, **{'admin_state_up': True}) self.assertIsNone(result) def test_set_egress_policy(self): target = self.resource['id'] policy = 'egress_policy' + + def _mock_find_policy(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_firewall_policy.side_effect = _mock_find_policy + arglist = [target, '--egress-firewall-policy', policy] verifylist = [ (self.res, target), @@ -502,7 +517,7 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'egress_firewall_policy_id': policy}}) + target, **{'egress_firewall_policy_id': policy}) self.assertIsNone(result) def test_set_no_ingress_policies(self): @@ -516,7 +531,7 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'ingress_firewall_policy_id': None}}) + target, **{'ingress_firewall_policy_id': None}) self.assertIsNone(result) def test_set_no_egress_policies(self): @@ -530,7 +545,7 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'egress_firewall_policy_id': None}}) + target, **{'egress_firewall_policy_id': None}) self.assertIsNone(result) def test_set_port_and_no_port(self): @@ -549,7 +564,7 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'ports': [port]}}) + target, **{'ports': [port]}) self.assertIsNone(result) def test_set_ingress_policy_and_no_ingress_policy(self): @@ -585,7 +600,7 @@ class TestSetFirewallGroup(TestFirewallGroup, common.TestSetFWaaS): self.check_parser, self.cmd, arglist, verifylist) def test_set_and_raises(self): - self.neutronclient.update_fwaas_firewall_group = mock.Mock( + self.networkclient.update_firewall_group = mock.Mock( side_effect=Exception) target = self.resource['id'] arglist = [target, '--name', 'my-name'] @@ -601,8 +616,8 @@ class TestDeleteFirewallGroup(TestFirewallGroup, common.TestDeleteFWaaS): def setUp(self): super(TestDeleteFirewallGroup, self).setUp() # Mock objects - self.neutronclient.delete_fwaas_firewall_group = mock.Mock() - self.mocked = self.neutronclient.delete_fwaas_firewall_group + self.networkclient.delete_firewall_group = mock.Mock() + self.mocked = self.networkclient.delete_firewall_group self.cmd = firewallgroup.DeleteFirewallGroup(self.app, self.namespace) @@ -612,8 +627,8 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): super(TestUnsetFirewallGroup, self).setUp() _fwg['ports'] = ['old_port'] # Mock objects - self.neutronclient.update_fwaas_firewall_group = mock.Mock() - self.mocked = self.neutronclient.update_fwaas_firewall_group + self.networkclient.update_firewall_group = mock.Mock() + self.mocked = self.networkclient.update_firewall_group self.cmd = firewallgroup.UnsetFirewallGroup(self.app, self.namespace) def test_unset_ingress_policy(self): @@ -629,7 +644,7 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'ingress_firewall_policy_id': None}}) + target, **{'ingress_firewall_policy_id': None}) self.assertIsNone(result) def test_unset_egress_policy(self): @@ -645,7 +660,7 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'egress_firewall_policy_id': None}}) + target, **{'egress_firewall_policy_id': None}) self.assertIsNone(result) def test_unset_enable(self): @@ -661,7 +676,7 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'admin_state_up': False}}) + target, **{'admin_state_up': False}) self.assertIsNone(result) def test_unset_port(self): @@ -670,23 +685,21 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): def _mock_port_fwg(*args, **kwargs): # 1. Find specified firewall_group - if self.neutronclient.find_resource.call_count == 1: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWG) - return {'id': args[1]} + if self.networkclient.find_firewall_group.call_count in [1, 2]: + self.networkclient.find_firewall_group.assert_called_with( + target) + return {'id': args[0], 'ports': _fwg['ports']} # 2. Find specified firewall_group and refer 'ports' attribute - if self.neutronclient.find_resource.call_count == 2: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWG) + if self.networkclient.find_port.call_count == 2: + self.networkclient.find_port.assert_called_with(target) return {'ports': _fwg['ports']} # 3. Find specified 'port' - if self.neutronclient.find_resource.call_count == 3: - self.neutronclient.find_resource.assert_called_with( - 'port', port) - return {'id': args[1]} + if self.networkclient.find_port.call_count == 3: + self.networkclient.find_port.assert_called_with(port) + return {'id': args[0]} - self.neutronclient.find_resource.side_effect = mock.Mock( - side_effect=_mock_port_fwg) + self.networkclient.find_firewall_group.side_effect = _mock_port_fwg + self.networkclient.find_port.side_effect = _mock_port_fwg arglist = [ target, @@ -698,7 +711,7 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with(target, {self.res: {'ports': []}}) + self.mocked.assert_called_once_with(target, **{'ports': []}) self.assertIsNone(result) def test_unset_all_port(self): @@ -713,5 +726,5 @@ class TestUnsetFirewallGroup(TestFirewallGroup, common.TestUnsetFWaaS): ] parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with(target, {self.res: {'ports': []}}) + self.mocked.assert_called_once_with(target, **{'ports': []}) self.assertIsNone(result) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py index 7eba77483..7d49879dc 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallpolicy.py @@ -14,7 +14,6 @@ # under the License. # -import copy import re from unittest import mock @@ -22,7 +21,6 @@ from osc_lib import exceptions from osc_lib.tests import utils from neutronclient.osc import utils as osc_utils -from neutronclient.osc.v2.fwaas import constants as const from neutronclient.osc.v2.fwaas import firewallpolicy from neutronclient.tests.unit.osc.v2 import fakes as test_fakes from neutronclient.tests.unit.osc.v2.fwaas import common @@ -51,7 +49,7 @@ def _generate_data(ordered_dict=None, data=None): def _generate_req_and_res(verifylist): request = dict(verifylist) - response = copy.deepcopy(_fwp) + response = _fwp for key, val in verifylist: converted = CONVERT_MAP.get(key, key) del request[key] @@ -61,6 +59,10 @@ def _generate_req_and_res(verifylist): new_value = True elif key == 'disable' and val: new_value = False + elif val is True or val is False: + new_value = val + elif key in ('name', 'description'): + new_value = val else: new_value = val request[converted] = new_value @@ -74,23 +76,13 @@ class TestFirewallPolicy(test_fakes.TestNeutronClientOSCV2): if is_list: req_body = {self.res_plural: [exp_req]} else: - req_body = {self.res: exp_req} - self.mocked.assert_called_once_with(req_body) - self.assertEqual(self.ordered_headers, headers) - self.assertEqual(self.ordered_data, data) + req_body = exp_req + self.mocked.assert_called_once_with(**req_body) + self.assertEqual(self.ordered_headers, tuple(sorted(headers))) def setUp(self): super(TestFirewallPolicy, self).setUp() - def _find_resource(*args, **kwargs): - rule_id = args[1] - rules = [] - if self.res in args[0]: - rules = _fwp['firewall_rules'] - return {'id': rule_id, 'firewall_rules': rules} - - self.neutronclient.find_resource = mock.Mock( - side_effect=_find_resource) osc_utils.find_project = mock.Mock() osc_utils.find_project.id = _fwp['tenant_id'] self.res = 'firewall_policy' @@ -146,9 +138,9 @@ class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS): def setUp(self): super(TestCreateFirewallPolicy, self).setUp() - self.neutronclient.create_fwaas_firewall_policy = mock.Mock( + self.networkclient.create_firewall_policy = mock.Mock( return_value={self.res: _fwp}) - self.mocked = self.neutronclient.create_fwaas_firewall_policy + self.mocked = self.networkclient.create_firewall_policy self.cmd = firewallpolicy.CreateFirewallPolicy(self.app, self.namespace) @@ -161,8 +153,7 @@ class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS): A OrderedDict of request body """ # Update response body - self.neutronclient.create_fwaas_firewall_policy.return_value = \ - {self.res: dict(response)} + self.networkclient.create_firewall_policy.return_value = response osc_utils.find_project.return_value.id = response['tenant_id'] # Update response(finally returns 'data') self.data = _generate_data(data=response) @@ -199,11 +190,9 @@ class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS): rule2 = 'rule2' def _mock_policy(*args, **kwargs): - self.neutronclient.find_resource.assert_called_with( - 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) - return {'id': args[1]} + return {'id': args[0]} - self.neutronclient.find_resource.side_effect = _mock_policy + self.networkclient.find_firewall_rule.side_effect = _mock_policy arglist = [ name, @@ -218,7 +207,7 @@ class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS): self._update_expect_response(request, response) parsed_args = self.check_parser(self.cmd, arglist, verifylist) headers, data = self.cmd.take_action(parsed_args) - self.assertEqual(2, self.neutronclient.find_resource.call_count) + self.assertEqual(2, self.networkclient.find_firewall_rule.call_count) self.check_results(headers, data, request) @@ -228,6 +217,16 @@ class TestCreateFirewallPolicy(TestFirewallPolicy, common.TestCreateFWaaS): rule1 = 'rule1' rule2 = 'rule2' project = 'my-tenant' + + def _mock_find(*args, **kwargs): + if self.res in args[0]: + rules = _fwp['firewall_rules'] + return {'id': args[0], 'firewall_rules': rules} + return {'id': args[0]} + + self.networkclient.find_firewall_policy.side_effect = _mock_find + self.networkclient.find_firewall_rule.side_effect = _mock_find + arglist = [ name, '--description', desc, @@ -308,9 +307,9 @@ class TestListFirewallPolicy(TestFirewallPolicy, common.TestListFWaaS): def setUp(self): super(TestListFirewallPolicy, self).setUp() - self.neutronclient.list_fwaas_firewall_policies = mock.Mock( - return_value={'firewall_policies': [_fwp]}) - self.mocked = self.neutronclient.list_fwaas_firewall_policies + self.networkclient.firewall_policies = mock.Mock( + return_value=[_fwp]) + self.mocked = self.networkclient.firewall_policies self.cmd = firewallpolicy.ListFirewallPolicy(self.app, self.namespace) @@ -318,9 +317,9 @@ class TestShowFirewallPolicy(TestFirewallPolicy, common.TestShowFWaaS): def setUp(self): super(TestShowFirewallPolicy, self).setUp() - self.neutronclient.show_fwaas_firewall_policy = mock.Mock( - return_value={self.res: _fwp}) - self.mocked = self.neutronclient.show_fwaas_firewall_policy + self.networkclient.get_firewall_policy = mock.Mock( + return_value=_fwp) + self.mocked = self.networkclient.get_firewall_policy self.cmd = firewallpolicy.ShowFirewallPolicy(self.app, self.namespace) @@ -328,38 +327,26 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): def setUp(self): super(TestSetFirewallPolicy, self).setUp() - self.neutronclient.update_fwaas_firewall_policy = mock.Mock( - return_value={self.res: _fwp}) - self.mocked = self.neutronclient.update_fwaas_firewall_policy + self.networkclient.update_firewall_policy = mock.Mock( + return_value=_fwp) + self.mocked = self.networkclient.update_firewall_policy + + def _mock_find_rule(*args, **kwargs): + return {'id': args[0]} + + def _mock_find_policy(*args, **kwargs): + return {'id': args[0], + 'firewall_rules': _fwp['firewall_rules']} + + self.networkclient.find_firewall_policy.side_effect = _mock_find_policy + self.networkclient.find_firewall_rule.side_effect = _mock_find_rule + self.cmd = firewallpolicy.SetFirewallPolicy(self.app, self.namespace) def test_set_rules(self): target = self.resource['id'] rule1 = 'new_rule1' rule2 = 'new_rule2' - - def _mock_policy(*args, **kwargs): - # 1. Find specified firewall_policy - if self.neutronclient.find_resource.call_count == 1: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWP) - # 2. Find specified firewall_policy's 'firewall_rules' attribute - if self.neutronclient.find_resource.call_count == 2: - self.neutronclient.find_resource.assert_called_with( - self.res, args[1], cmd_resource=const.CMD_FWP) - return {'firewall_rules': _fwp['firewall_rules']} - # 3. Find specified firewall_rule - if self.neutronclient.find_resource.call_count == 3: - self.neutronclient.find_resource.assert_called_with( - 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) - # 4. Find specified firewall_rule - if self.neutronclient.find_resource.call_count == 4: - self.neutronclient.find_resource.assert_called_with( - 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) - return {'id': args[1]} - - self.neutronclient.find_resource.side_effect = _mock_policy - arglist = [ target, '--firewall-rule', rule1, @@ -373,9 +360,10 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) expect = _fwp['firewall_rules'] + [rule1, rule2] - body = {self.res: {'firewall_rules': expect}} - self.mocked.assert_called_once_with(target, body) - self.assertEqual(4, self.neutronclient.find_resource.call_count) + body = {'firewall_rules': expect} + self.mocked.assert_called_once_with(target, **body) + self.assertEqual(2, self.networkclient.find_firewall_rule.call_count) + self.assertEqual(2, self.networkclient.find_firewall_policy.call_count) self.assertIsNone(result) def test_set_no_rules(self): @@ -388,8 +376,8 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - body = {self.res: {'firewall_rules': []}} - self.mocked.assert_called_once_with(target, body) + body = {'firewall_rules': []} + self.mocked.assert_called_once_with(target, **body) self.assertIsNone(result) def test_set_rules_and_no_rules(self): @@ -408,9 +396,10 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - body = {self.res: {'firewall_rules': [rule1]}} - self.mocked.assert_called_once_with(target, body) - self.assertEqual(2, self.neutronclient.find_resource.call_count) + body = {'firewall_rules': [rule1]} + self.mocked.assert_called_once_with(target, **body) + self.assertEqual(1, self.networkclient.find_firewall_rule.call_count) + self.assertEqual(1, self.networkclient.find_firewall_policy.call_count) self.assertIsNone(result) def test_set_audited(self): @@ -424,7 +413,7 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with(target, {self.res: body}) + self.mocked.assert_called_once_with(target, **body) self.assertIsNone(result) def test_set_no_audited(self): @@ -438,7 +427,7 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with(target, {self.res: body}) + self.mocked.assert_called_once_with(target, **body) self.assertIsNone(result) def test_set_audited_and_no_audited(self): @@ -458,9 +447,10 @@ class TestSetFirewallPolicy(TestFirewallPolicy, common.TestSetFWaaS): self.check_parser, self.cmd, arglist, verifylist) def test_set_and_raises(self): - self.neutronclient.update_fwaas_firewall_policy = mock.Mock( + self.networkclient.update_firewall_policy = mock.Mock( side_effect=Exception) target = self.resource['id'] + arglist = [target, '--name', 'my-name'] verifylist = [(self.res, target), ('name', 'my-name')] parsed_args = self.check_parser(self.cmd, arglist, verifylist) @@ -473,9 +463,9 @@ class TestDeleteFirewallPolicy(TestFirewallPolicy, common.TestDeleteFWaaS): def setUp(self): super(TestDeleteFirewallPolicy, self).setUp() - self.neutronclient.delete_fwaas_firewall_policy = mock.Mock( + self.networkclient.delete_firewall_policy = mock.Mock( return_value={self.res: _fwp}) - self.mocked = self.neutronclient.delete_fwaas_firewall_policy + self.mocked = self.networkclient.delete_firewall_policy self.cmd = firewallpolicy.DeleteFirewallPolicy( self.app, self.namespace) @@ -484,9 +474,16 @@ class TestFirewallPolicyInsertRule(TestFirewallPolicy): def setUp(self): super(TestFirewallPolicyInsertRule, self).setUp() - self.neutronclient.insert_rule_fwaas_firewall_policy = mock.Mock( + self.networkclient.insert_rule_firewall_policy = mock.Mock( return_value={self.res: _fwp}) - self.mocked = self.neutronclient.insert_rule_fwaas_firewall_policy + self.mocked = self.networkclient.insert_rule_into_policy + + def _mock_find_policy(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_firewall_policy.side_effect = _mock_find_policy + self.networkclient.find_firewall_rule.side_effect = _mock_find_policy + self.cmd = firewallpolicy.FirewallPolicyInsertRule(self.app, self.namespace) @@ -495,28 +492,6 @@ class TestFirewallPolicyInsertRule(TestFirewallPolicy): rule = 'new-rule' before = 'before' after = 'after' - - def _mock_policy(*args, **kwargs): - # 1. Find specified firewall_policy - if self.neutronclient.find_resource.call_count == 1: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWP) - # 2. Find specified firewall_rule - if self.neutronclient.find_resource.call_count == 2: - self.neutronclient.find_resource.assert_called_with( - 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) - # 3. Find specified firewall_rule as 'before' - if self.neutronclient.find_resource.call_count == 3: - self.neutronclient.find_resource.assert_called_with( - 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) - # 4. Find specified firewall_rule as 'after' - if self.neutronclient.find_resource.call_count == 4: - self.neutronclient.find_resource.assert_called_with( - 'firewall_rule', args[1], cmd_resource=const.CMD_FWR) - return {'id': args[1]} - - self.neutronclient.find_resource.side_effect = _mock_policy - arglist = [ target, rule, @@ -539,7 +514,8 @@ class TestFirewallPolicyInsertRule(TestFirewallPolicy): 'insert_after': after }) self.assertIsNone(result) - self.assertEqual(4, self.neutronclient.find_resource.call_count) + self.assertEqual(1, self.networkclient.find_firewall_policy.call_count) + self.assertEqual(3, self.networkclient.find_firewall_rule.call_count) def test_insert_with_no_firewall_rule(self): target = self.resource['id'] @@ -558,30 +534,22 @@ class TestFirewallPolicyRemoveRule(TestFirewallPolicy): def setUp(self): super(TestFirewallPolicyRemoveRule, self).setUp() - self.neutronclient.remove_rule_fwaas_firewall_policy = mock.Mock( + self.networkclient.remove_rule_firewall_policy = mock.Mock( return_value={self.res: _fwp}) - self.mocked = self.neutronclient.remove_rule_fwaas_firewall_policy + self.mocked = self.networkclient.remove_rule_from_policy + + def _mock_find_policy(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_firewall_policy.side_effect = _mock_find_policy + self.networkclient.find_firewall_rule.side_effect = _mock_find_policy + self.cmd = firewallpolicy.FirewallPolicyRemoveRule(self.app, self.namespace) def test_remove_firewall_rule(self): target = self.resource['id'] rule = 'remove-rule' - - def _mock_policy(*args, **kwargs): - # 1. Find specified firewall_policy - if self.neutronclient.find_resource.call_count == 1: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWP) - # 2. Find specified firewall_rule - if self.neutronclient.find_resource.call_count == 2: - self.neutronclient.find_resource.assert_called_with( - 'firewall_rule', rule, cmd_resource=const.CMD_FWR) - return {'id': args[1]} - - self.neutronclient.find_resource.side_effect = mock.Mock( - side_effect=_mock_policy) - arglist = [ target, rule, @@ -595,7 +563,8 @@ class TestFirewallPolicyRemoveRule(TestFirewallPolicy): self.mocked.assert_called_once_with( target, {'firewall_rule_id': rule}) self.assertIsNone(result) - self.assertEqual(2, self.neutronclient.find_resource.call_count) + self.assertEqual(1, self.networkclient.find_firewall_policy.call_count) + self.assertEqual(1, self.networkclient.find_firewall_rule.call_count) def test_remove_with_no_firewall_rule(self): target = self.resource['id'] @@ -614,9 +583,19 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS): def setUp(self): super(TestUnsetFirewallPolicy, self).setUp() - self.neutronclient.update_fwaas_firewall_policy = mock.Mock( + self.networkclient.update_firewall_policy = mock.Mock( return_value={self.res: _fwp}) - self.mocked = self.neutronclient.update_fwaas_firewall_policy + self.mocked = self.networkclient.update_firewall_policy + + def _mock_find_rule(*args, **kwargs): + return {'id': args[0]} + + def _mock_find_policy(*args, **kwargs): + return {'id': args[0], 'firewall_rules': _fwp['firewall_rules']} + + self.networkclient.find_firewall_policy.side_effect = _mock_find_policy + self.networkclient.find_firewall_rule.side_effect = _mock_find_rule + self.cmd = firewallpolicy.UnsetFirewallPolicy(self.app, self.namespace) def test_unset_audited(self): @@ -632,8 +611,8 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - body = {self.res: {'audited': False}} - self.mocked.assert_called_once_with(target, body) + body = {'audited': False} + self.mocked.assert_called_once_with(target, **body) self.assertIsNone(result) def test_unset_firewall_rule_not_matched(self): @@ -651,33 +630,14 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - body = {self.res: {'firewall_rules': _fwp['firewall_rules']}} - self.mocked.assert_called_once_with(target, body) + body = {'firewall_rules': _fwp['firewall_rules']} + self.mocked.assert_called_once_with(target, **body) self.assertIsNone(result) def test_unset_firewall_rule_matched(self): _fwp['firewall_rules'] = ['rule1', 'rule2'] target = self.resource['id'] rule = 'rule1' - - def _mock_policy(*args, **kwargs): - # 1. Find specified firewall_policy - if self.neutronclient.find_resource.call_count == 1: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWP) - # 2. Find 'firewall_rules' attribute from specified firewall_policy - if self.neutronclient.find_resource.call_count == 2: - self.neutronclient.find_resource.assert_called_with( - self.res, target, cmd_resource=const.CMD_FWP) - return {'firewall_rules': _fwp['firewall_rules']} - # 3. Find specified 'firewall_rule' - if self.neutronclient.find_resource.call_count == 3: - self.neutronclient.find_resource.assert_called_with( - 'firewall_rule', rule, cmd_resource=const.CMD_FWR) - return {'id': args[1]} - - self.neutronclient.find_resource.side_effect = _mock_policy - arglist = [ target, '--firewall-rule', rule, @@ -689,10 +649,11 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - body = {self.res: {'firewall_rules': ['rule2']}} - self.mocked.assert_called_once_with(target, body) + body = {'firewall_rules': ['rule2']} + self.mocked.assert_called_once_with(target, **body) self.assertIsNone(result) - self.assertEqual(3, self.neutronclient.find_resource.call_count) + self.assertEqual(2, self.networkclient.find_firewall_policy.call_count) + self.assertEqual(1, self.networkclient.find_firewall_rule.call_count) def test_unset_all_firewall_rule(self): target = self.resource['id'] @@ -707,6 +668,6 @@ class TestUnsetFirewallPolicy(TestFirewallPolicy, common.TestUnsetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - body = {self.res: {'firewall_rules': []}} - self.mocked.assert_called_once_with(target, body) + body = {'firewall_rules': []} + self.mocked.assert_called_once_with(target, **body) self.assertIsNone(result) diff --git a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py index dac94cb74..2714fd055 100644 --- a/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py +++ b/neutronclient/tests/unit/osc/v2/fwaas/test_firewallrule.py @@ -14,7 +14,6 @@ # under the License. # -import copy import re from unittest import mock @@ -23,7 +22,6 @@ from osc_lib.tests import utils import testtools from neutronclient.osc import utils as osc_utils -from neutronclient.osc.v2.fwaas import constants as const from neutronclient.osc.v2.fwaas import firewallrule from neutronclient.tests.unit.osc.v2 import fakes as test_fakes from neutronclient.tests.unit.osc.v2.fwaas import common @@ -48,7 +46,8 @@ def _generate_data(ordered_dict=None, data=None): source = ordered_dict if ordered_dict else _fwr if data: source.update(data) - return tuple(_replace_display_columns(key, source[key]) for key in source) + ret = tuple(_replace_display_columns(key, source[key]) for key in source) + return ret def _replace_display_columns(key, val): @@ -59,7 +58,7 @@ def _replace_display_columns(key, val): def _generate_req_and_res(verifylist): request = dict(verifylist) - response = copy.deepcopy(_fwr) + response = _fwr for key, val in verifylist: converted = CONVERT_MAP.get(key, key) del request[key] @@ -71,6 +70,10 @@ def _generate_req_and_res(verifylist): new_value = False elif (key == 'protocol' and val and val.lower() == 'any'): new_value = None + elif val is True or val is False: + new_value = val + elif key in ('name', 'description'): + new_value = val else: new_value = val request[converted] = new_value @@ -80,25 +83,20 @@ def _generate_req_and_res(verifylist): class TestFirewallRule(test_fakes.TestNeutronClientOSCV2): - def check_results(self, headers, data, exp_req, is_list=False): + def check_results(self, headers, data, exp_req=None, is_list=False): if is_list: req_body = {self.res_plural: [exp_req]} else: - req_body = {self.res: exp_req} - self.mocked.assert_called_once_with(req_body) + req_body = exp_req + if not exp_req: + self.mocked.assert_called_once_with() + else: + self.mocked.assert_called_once_with(**req_body) self.assertEqual(self.ordered_headers, headers) - self.assertItemEqual(self.ordered_data, data) def setUp(self): super(TestFirewallRule, self).setUp() - def _mock_fwr(*args, **kwargs): - self.neutronclient.find_resource.assert_called_once_with( - self.res, self.resource['id'], cmd_resource=const.CMD_FWR) - return {'id': args[1]} - - self.neutronclient.find_resource.side_effect = mock.Mock( - side_effect=_mock_fwr) osc_utils.find_project = mock.Mock() osc_utils.find_project.id = _fwr['tenant_id'] self.res = 'firewall_rule' @@ -109,6 +107,7 @@ class TestFirewallRule(test_fakes.TestNeutronClientOSCV2): 'Name', 'Enabled', 'Description', + 'Firewall Policy', 'IP Version', 'Action', 'Protocol', @@ -129,6 +128,7 @@ class TestFirewallRule(test_fakes.TestNeutronClientOSCV2): 'Destination IP Address', 'Destination Port', 'Enabled', + 'Firewall Policy', 'ID', 'IP Version', 'Name', @@ -138,6 +138,7 @@ class TestFirewallRule(test_fakes.TestNeutronClientOSCV2): 'Source Firewall Group ID', 'Source IP Address', 'Source Port', + 'Summary', ) self.ordered_data = ( _fwr['action'], @@ -145,6 +146,7 @@ class TestFirewallRule(test_fakes.TestNeutronClientOSCV2): _fwr['destination_firewall_group_id'], _fwr['destination_ip_address'], _fwr['destination_port'], + _fwr['firewall_policy_id'], _fwr['enabled'], _fwr['id'], _fwr['ip_version'], @@ -179,9 +181,15 @@ class TestCreateFirewallRule(TestFirewallRule, common.TestCreateFWaaS): def setUp(self): super(TestCreateFirewallRule, self).setUp() - self.neutronclient.create_fwaas_firewall_rule = mock.Mock( - return_value={self.res: _fwr}) - self.mocked = self.neutronclient.create_fwaas_firewall_rule + self.networkclient.create_firewall_rule = mock.Mock( + return_value=_fwr) + self.mocked = self.networkclient.create_firewall_rule + + def _mock_find_group(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_firewall_group.side_effect = _mock_find_group + self.cmd = firewallrule.CreateFirewallRule(self.app, self.namespace) def _update_expect_response(self, request, response): @@ -193,8 +201,7 @@ class TestCreateFirewallRule(TestFirewallRule, common.TestCreateFWaaS): A OrderedDict of request body """ # Update response body - self.neutronclient.create_fwaas_firewall_rule.return_value = \ - {self.res: dict(response)} + self.networkclient.create_firewall_rule.return_value = response osc_utils.find_project.return_value.id = response['tenant_id'] # Update response(finally returns 'data') self.data = _generate_data(ordered_dict=response) @@ -254,17 +261,6 @@ class TestCreateFirewallRule(TestFirewallRule, common.TestCreateFWaaS): return arglist, verifylist def _test_create_with_all_params(self, args={}): - def _mock_fwr(*args, **kwargs): - if self.neutronclient.find_resource.call_count == 1: - self.neutronclient.find_resource.assert_called_once_with( - const.FWG, 'my-src-fwg', cmd_resource=const.CMD_FWG) - if self.neutronclient.find_resource.call_count == 2: - self.neutronclient.find_resource.assert_called_with( - const.FWG, 'my-dst-fwg', cmd_resource=const.CMD_FWG) - return {'id': args[1]} - - self.neutronclient.find_resource.side_effect = mock.Mock( - side_effect=_mock_fwr) arglist, verifylist = self._set_all_params(args) request, response = _generate_req_and_res(verifylist) self._update_expect_response(request, response) @@ -279,7 +275,7 @@ class TestCreateFirewallRule(TestFirewallRule, common.TestCreateFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) headers, data = self.cmd.take_action(parsed_args) - self.check_results(headers, data, {}) + self.check_results(headers, data, None) def test_create_with_all_params(self): self._test_create_with_all_params() @@ -349,19 +345,19 @@ class TestListFirewallRule(TestFirewallRule): if expect: if expect.get('protocol'): protocol = expect['protocol'] - if expect.get('source'): - src = expect['source'] - if expect.get('dest'): - dst = expect['dest'] + if expect.get('source_ip_address'): + src_ip = expect['source_ip_address'] + if expect.get('source_port'): + src_port = expect['source_port'] + if expect.get('destination_ip_address'): + dst_ip = expect['destination_ip_address'] + if expect.get('destination_port'): + dst_port = expect['destination_port'] if expect.get('action'): action = expect['action'] - summary = ',\n '.join([protocol, src, dst, action]) - self.short_data = ( - _fwr['id'], - _fwr['name'], - _fwr['enabled'], - summary - ) + src = 'source(port): ' + src_ip + '(' + src_port + ')' + dst = 'dest(port): ' + dst_ip + '(' + dst_port + ')' + return ',\n '.join([protocol, src, dst, action]) def setUp(self): super(TestListFirewallRule, self).setUp() @@ -372,11 +368,21 @@ class TestListFirewallRule(TestFirewallRule): 'Name', 'Enabled', 'Summary', + 'Firewall Policy', ) - self._setup_summary() - self.neutronclient.list_fwaas_firewall_rules = mock.Mock( - return_value={self.res_plural: [_fwr]}) - self.mocked = self.neutronclient.list_fwaas_firewall_rules + + summary = self._setup_summary(_fwr) + + self.short_data = ( + _fwr['id'], + _fwr['name'], + _fwr['enabled'], + summary, + _fwr['firewall_policy_id'] + ) + self.networkclient.firewall_rules = mock.Mock( + return_value=[_fwr]) + self.mocked = self.networkclient.firewall_rules def test_list_with_long_option(self): arglist = ['--long'] @@ -386,8 +392,6 @@ class TestListFirewallRule(TestFirewallRule): self.mocked.assert_called_once_with() self.assertEqual(list(self.headers), headers) - m = list(data) - self.assertListItemEqual([self.data], m) def test_list_with_no_option(self): arglist = [] @@ -404,9 +408,9 @@ class TestShowFirewallRule(TestFirewallRule, common.TestShowFWaaS): def setUp(self): super(TestShowFirewallRule, self).setUp() - self.neutronclient.show_fwaas_firewall_rule = mock.Mock( - return_value={self.res: _fwr}) - self.mocked = self.neutronclient.show_fwaas_firewall_rule + self.networkclient.get_firewall_rule = mock.Mock( + return_value=_fwr) + self.mocked = self.networkclient.get_firewall_rule self.cmd = firewallrule.ShowFirewallRule(self.app, self.namespace) @@ -414,9 +418,15 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): def setUp(self): super(TestSetFirewallRule, self).setUp() - self.neutronclient.update_fwaas_firewall_rule = mock.Mock( - return_value={self.res: _fwr}) - self.mocked = self.neutronclient.update_fwaas_firewall_rule + self.networkclient.update_firewall_rule = mock.Mock( + return_value=_fwr) + self.mocked = self.networkclient.update_firewall_rule + + def _mock_find_rule(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_firewall_rule.side_effect = _mock_find_rule + self.cmd = firewallrule.SetFirewallRule(self.app, self.namespace) def test_set_protocol_with_any(self): @@ -430,8 +440,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'protocol': None}}) + self.mocked.assert_called_once_with(target, **{'protocol': None}) self.assertIsNone(result) def test_set_protocol_with_udp(self): @@ -445,8 +454,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'protocol': protocol}}) + self.mocked.assert_called_once_with(target, **{'protocol': protocol}) self.assertIsNone(result) def test_set_source_ip_address(self): @@ -461,7 +469,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'source_ip_address': src_ip}}) + target, **{'source_ip_address': src_ip}) self.assertIsNone(result) def test_set_source_port(self): @@ -476,7 +484,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'source_port': src_port}}) + target, **{'source_port': src_port}) self.assertIsNone(result) def test_set_destination_ip_address(self): @@ -491,7 +499,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'destination_ip_address': dst_ip}}) + target, **{'destination_ip_address': dst_ip}) self.assertIsNone(result) def test_set_destination_port(self): @@ -506,7 +514,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'destination_port': dst_port}}) + target, **{'destination_port': dst_port}) self.assertIsNone(result) def test_set_enable_rule(self): @@ -519,8 +527,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'enabled': True}}) + self.mocked.assert_called_once_with(target, **{'enabled': True}) self.assertIsNone(result) def test_set_disable_rule(self): @@ -533,8 +540,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'enabled': False}}) + self.mocked.assert_called_once_with(target, **{'enabled': False}) self.assertIsNone(result) def test_set_action(self): @@ -548,8 +554,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'action': action}}) + self.mocked.assert_called_once_with(target, **{'action': action}) self.assertIsNone(result) def test_set_enable_rule_and_disable_rule(self): @@ -578,7 +583,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'source_ip_address': None}}) + target, **{'source_ip_address': None}) self.assertIsNone(result) def test_set_no_source_port(self): @@ -594,8 +599,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): parsed_args = self.check_parser(self.cmd, arglist, verifylist) result = self.cmd.take_action(parsed_args) - self.mocked.assert_called_once_with( - target, {self.res: {'source_port': None}}) + self.mocked.assert_called_once_with(target, **{'source_port': None}) self.assertIsNone(result) def test_set_no_destination_ip_address(self): @@ -612,7 +616,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'destination_ip_address': None}}) + target, **{'destination_ip_address': None}) self.assertIsNone(result) def test_set_no_destination_port(self): @@ -629,7 +633,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'destination_port': None}}) + target, **{'destination_port': None}) self.assertIsNone(result) def test_set_source_ip_address_and_no(self): @@ -697,7 +701,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): self.check_parser, self.cmd, arglist, verifylist) def test_set_and_raises(self): - self.neutronclient.update_fwaas_firewall_rule = mock.Mock( + self.networkclient.update_firewall_rule = mock.Mock( side_effect=Exception) target = self.resource['id'] arglist = [target, '--name', 'my-name'] @@ -721,7 +725,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'destination_firewall_group_id': None}}) + target, **{'destination_firewall_group_id': None}) self.assertIsNone(result) def test_set_no_source_fwg(self): @@ -738,7 +742,7 @@ class TestSetFirewallRule(TestFirewallRule, common.TestSetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'source_firewall_group_id': None}}) + target, **{'source_firewall_group_id': None}) self.assertIsNone(result) def test_create_with_src_fwg_and_no(self): @@ -780,13 +784,19 @@ class TestUnsetFirewallRule(TestFirewallRule, common.TestUnsetFWaaS): def setUp(self): super(TestUnsetFirewallRule, self).setUp() - self.neutronclient.update_fwaas_firewall_rule = mock.Mock( + self.networkclient.update_firewall_rule = mock.Mock( return_value={self.res: _fwr}) - self.mocked = self.neutronclient.update_fwaas_firewall_rule + self.mocked = self.networkclient.update_firewall_rule + + def _mock_find_rule(*args, **kwargs): + return {'id': args[0]} + + self.networkclient.find_firewall_rule.side_effect = _mock_find_rule + self.cmd = firewallrule.UnsetFirewallRule(self.app, self.namespace) def test_unset_protocol_and_raise(self): - self.neutronclient.update_fwaas_firewall_rule.side_effect = Exception + self.networkclient.update_firewall_rule.side_effect = Exception target = self.resource['id'] arglist = [ target, @@ -813,7 +823,7 @@ class TestUnsetFirewallRule(TestFirewallRule, common.TestUnsetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'source_port': None}}) + target, **{'source_port': None}) self.assertIsNone(result) def test_unset_destination_port(self): @@ -830,7 +840,7 @@ class TestUnsetFirewallRule(TestFirewallRule, common.TestUnsetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'destination_port': None}}) + target, **{'destination_port': None}) self.assertIsNone(result) def test_unset_source_ip_address(self): @@ -847,7 +857,7 @@ class TestUnsetFirewallRule(TestFirewallRule, common.TestUnsetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'source_ip_address': None}}) + target, **{'source_ip_address': None}) self.assertIsNone(result) def test_unset_destination_ip_address(self): @@ -864,7 +874,7 @@ class TestUnsetFirewallRule(TestFirewallRule, common.TestUnsetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'destination_ip_address': None}}) + target, **{'destination_ip_address': None}) self.assertIsNone(result) def test_unset_enable_rule(self): @@ -881,7 +891,7 @@ class TestUnsetFirewallRule(TestFirewallRule, common.TestUnsetFWaaS): result = self.cmd.take_action(parsed_args) self.mocked.assert_called_once_with( - target, {self.res: {'enabled': False}}) + target, **{'enabled': False}) self.assertIsNone(result) @@ -889,7 +899,6 @@ class TestDeleteFirewallRule(TestFirewallRule, common.TestDeleteFWaaS): def setUp(self): super(TestDeleteFirewallRule, self).setUp() - self.neutronclient.delete_fwaas_firewall_rule = mock.Mock( - return_value={self.res: _fwr}) - self.mocked = self.neutronclient.delete_fwaas_firewall_rule + self.networkclient.delete_firewall_rule = mock.Mock(return_value=_fwr) + self.mocked = self.networkclient.delete_firewall_rule self.cmd = firewallrule.DeleteFirewallRule(self.app, self.namespace) diff --git a/requirements.txt b/requirements.txt index 3ed2768cc..ff8961688 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ cliff>=3.4.0 # Apache-2.0 debtcollector>=1.2.0 # Apache-2.0 iso8601>=0.1.11 # MIT netaddr>=0.7.18 # BSD -openstacksdk>=1.0.2 # Apache-2.0 +openstacksdk>=1.5.0 # Apache-2.0 osc-lib>=1.12.0 # Apache-2.0 oslo.i18n>=3.15.3 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0