Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 892762 Details for
Bug 1063141
CVE-2014-0056 openstack-neutron: insufficient authorization checks when creating ports
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
[patch]
havana patch
havana.patch (text/plain), 9.98 KB, created by
Garth Mollett
on 2014-05-06 05:25:06 UTC
(
hide
)
Description:
havana patch
Filename:
MIME Type:
Creator:
Garth Mollett
Created:
2014-05-06 05:25:06 UTC
Size:
9.98 KB
patch
obsolete
>diff --git a/neutron/common/exceptions.py b/neutron/common/exceptions.py >index 7b02647..88fa6e4 100644 >--- a/neutron/common/exceptions.py >+++ b/neutron/common/exceptions.py >@@ -301,3 +301,8 @@ class NetworkVlanRangeError(NeutronException): > > class NetworkVxlanPortRangeError(object): > message = _("Invalid network VXLAN port range: '%(vxlan_range)s'") >+ >+ >+class DeviceIDNotOwnedByTenant(Conflict): >+ message = _("The following device_id %(device_id)s is not owned by your " >+ "tenant or matches another tenants router.") >diff --git a/neutron/db/db_base_plugin_v2.py b/neutron/db/db_base_plugin_v2.py >index 2afbac5..872463f 100644 >--- a/neutron/db/db_base_plugin_v2.py >+++ b/neutron/db/db_base_plugin_v2.py >@@ -27,14 +27,18 @@ from sqlalchemy.orm import exc > from neutron.api.v2 import attributes > from neutron.common import constants > from neutron.common import exceptions as q_exc >+from neutron import context as ctx > from neutron.db import api as db > from neutron.db import models_v2 > from neutron.db import sqlalchemyutils >+from neutron.extensions import l3 >+from neutron import manager > from neutron import neutron_plugin_base_v2 > from neutron.openstack.common import excutils > from neutron.openstack.common import log as logging > from neutron.openstack.common import timeutils > from neutron.openstack.common import uuidutils >+from neutron.plugins.common import constants as service_constants > > > LOG = logging.getLogger(__name__) >@@ -1311,6 +1315,9 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2, > # NOTE(jkoelker) Get the tenant_id outside of the session to avoid > # unneeded db action if the operation raises > tenant_id = self._get_tenant_id_for_create(context, p) >+ if p.get('device_owner') == constants.DEVICE_OWNER_ROUTER_INTF: >+ self._enforce_device_owner_not_router_intf_or_device_id(context, p, >+ tenant_id) > > with context.session.begin(subtransactions=True): > network = self._get_network(context, network_id) >@@ -1374,6 +1381,23 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2, > changed_ips = False > with context.session.begin(subtransactions=True): > port = self._get_port(context, id) >+ if 'device_owner' in p: >+ current_device_owner = p['device_owner'] >+ changed_device_owner = True >+ else: >+ current_device_owner = port['device_owner'] >+ changed_device_owner = False >+ if p.get('device_id') != port['device_id']: >+ changed_device_id = True >+ >+ # if the current device_owner is ROUTER_INF and the device_id or >+ # device_owner changed check device_id is not another tenants >+ # router >+ if ((current_device_owner == constants.DEVICE_OWNER_ROUTER_INTF) >+ and (changed_device_id or changed_device_owner)): >+ self._enforce_device_owner_not_router_intf_or_device_id( >+ context, p, port['tenant_id'], port) >+ > # Check if the IPs need to be updated > if 'fixed_ips' in p: > changed_ips = True >@@ -1483,3 +1507,41 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2, > > def get_ports_count(self, context, filters=None): > return self._get_ports_query(context, filters).count() >+ >+ def _enforce_device_owner_not_router_intf_or_device_id(self, context, >+ port_request, >+ tenant_id, >+ db_port=None): >+ if not context.is_admin: >+ # find the device_id. If the call was update_port and the >+ # device_id was not passed in we use the device_id from the >+ # db. >+ device_id = port_request.get('device_id') >+ if not device_id and db_port: >+ device_id = db_port.get('device_id') >+ # check to make sure device_id does not match another tenants >+ # router. >+ if device_id: >+ if hasattr(self, 'get_router'): >+ try: >+ ctx_admin = ctx.get_admin_context() >+ router = self.get_router(ctx_admin, device_id) >+ except l3.RouterNotFound: >+ return >+ else: >+ l3plugin = ( >+ manager.NeutronManager.get_service_plugins().get( >+ service_constants.L3_ROUTER_NAT)) >+ if l3plugin: >+ try: >+ ctx_admin = ctx.get_admin_context() >+ router = l3plugin.get_router(ctx_admin, >+ device_id) >+ except l3.RouterNotFound: >+ return >+ else: >+ # raise as extension doesn't support L3 anyways. >+ raise q_exc.DeviceIDNotOwnedByTenant( >+ device_id=device_id) >+ if tenant_id != router['tenant_id']: >+ raise q_exc.DeviceIDNotOwnedByTenant(device_id=device_id) >diff --git a/neutron/tests/unit/test_l3_plugin.py b/neutron/tests/unit/test_l3_plugin.py >index 4f75b57..9cc5cf9 100644 >--- a/neutron/tests/unit/test_l3_plugin.py >+++ b/neutron/tests/unit/test_l3_plugin.py >@@ -379,7 +379,8 @@ class L3NatTestCaseMixin(object): > > def _router_interface_action(self, action, router_id, subnet_id, port_id, > expected_code=exc.HTTPOk.code, >- expected_body=None): >+ expected_body=None, >+ tenant_id=None): > interface_data = {} > if subnet_id: > interface_data.update({'subnet_id': subnet_id}) >@@ -388,6 +389,10 @@ class L3NatTestCaseMixin(object): > > req = self.new_action_request('routers', interface_data, router_id, > "%s_router_interface" % action) >+ # if tenant_id was specified, create a tenant context for this request >+ if tenant_id: >+ req.environ['neutron.context'] = context.Context( >+ '', tenant_id) > res = req.get_response(self.ext_api) > self.assertEqual(res.status_int, expected_code) > response = self.deserialize(self.fmt, res) >@@ -968,6 +973,62 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): > gw_info = body['router']['external_gateway_info'] > self.assertEqual(gw_info, None) > >+ def test_create_router_port_with_device_id_of_other_teants_router(self): >+ with self.router() as admin_router: >+ with self.network(tenant_id='tenant_a', >+ set_context=True) as n: >+ with self.subnet(network=n): >+ self._create_port( >+ self.fmt, n['network']['id'], >+ tenant_id='tenant_a', >+ device_id=admin_router['router']['id'], >+ device_owner='network:router_interface', >+ set_context=True, >+ expected_res_status=exc.HTTPConflict.code) >+ >+ def test_create_non_router_port_device_id_of_other_teants_router_update( >+ self): >+ # This tests that HTTPConflict is raised if we create a non-router >+ # port that matches the device_id of another tenants router and then >+ # we change the device_owner to be network:router_interface. >+ with self.router() as admin_router: >+ with self.network(tenant_id='tenant_a', >+ set_context=True) as n: >+ with self.subnet(network=n): >+ port_res = self._create_port( >+ self.fmt, n['network']['id'], >+ tenant_id='tenant_a', >+ device_id=admin_router['router']['id'], >+ set_context=True) >+ port = self.deserialize(self.fmt, port_res) >+ neutron_context = context.Context('', 'tenant_a') >+ data = {'port': {'device_owner': >+ 'network:router_interface'}} >+ self._update('ports', port['port']['id'], data, >+ neutron_context=neutron_context, >+ expected_code=exc.HTTPConflict.code) >+ self._delete('ports', port['port']['id']) >+ >+ def test_update_port_device_id_to_different_tenants_router(self): >+ with self.router() as admin_router: >+ with self.router(tenant_id='tenant_a', >+ set_context=True) as tenant_router: >+ with self.network(tenant_id='tenant_a', >+ set_context=True) as n: >+ with self.subnet(network=n) as s: >+ port = self._router_interface_action( >+ 'add', tenant_router['router']['id'], >+ s['subnet']['id'], None, tenant_id='tenant_a') >+ neutron_context = context.Context('', 'tenant_a') >+ data = {'port': >+ {'device_id': admin_router['router']['id']}} >+ self._update('ports', port['port_id'], data, >+ neutron_context=neutron_context, >+ expected_code=exc.HTTPConflict.code) >+ self._router_interface_action( >+ 'remove', tenant_router['router']['id'], >+ s['subnet']['id'], None, tenant_id='tenant_a') >+ > def test_router_add_gateway_invalid_network_returns_404(self): > with self.router() as r: > self._add_external_gateway_to_router(
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 1063141
: 892762