[virt-tools-list] [virt-bootstrap] [PATCH v4 15/26] Make UID/GID mapping reusable
Cedric Bosdonnat
cbosdonnat at suse.com
Thu Aug 3 16:06:27 UTC 2017
On Thu, 2017-08-03 at 14:13 +0100, Radostin Stoyanov wrote:
> Move the implementation of UID/GID file ownership mapping for the root file
> system into the utils module.
>
> This makes it reusable for mapping the ownership of files in qcow2 images.
>
> Update the unit tests as well.
> ---
> src/virtBootstrap/utils.py | 72 ++++++++++++++++
> src/virtBootstrap/virt_bootstrap.py | 74 +---------------
> tests/test_utils.py | 149 ++++++++++++++++++++++++++++++++
> tests/test_virt_bootstrap.py | 168 ++----------------------------------
> 4 files changed, 230 insertions(+), 233 deletions(-)
>
> diff --git a/src/virtBootstrap/utils.py b/src/virtBootstrap/utils.py
> index 965dcad..071bb3e 100644
> --- a/src/virtBootstrap/utils.py
> +++ b/src/virtBootstrap/utils.py
> @@ -471,3 +471,75 @@ def write_progress(prog):
> # Write message to console
> sys.stdout.write(msg)
> sys.stdout.flush()
> +
> +
> +# The implementation for remapping ownership of all files inside a
> +# container's rootfs is inspired by the tool uidmapshift:
> +#
> +# Original author: Serge Hallyn <serge.hallyn at ubuntu.com>
> +# Original license: GPLv2
> +# http://bazaar.launchpad.net/%7Eserge-hallyn/+junk/nsexec/view/head:/uidmapshift.c
> +
> +def get_map_id(old_id, opts):
> + """
> + Calculate new map_id.
> + """
> + if old_id >= opts['first'] and old_id < opts['last']:
> + return old_id + opts['offset']
> + return -1
> +
> +
> +def get_mapping_opts(mapping):
> + """
> + Get range options from UID/GID mapping
> + """
> + start = mapping[0] if mapping[0] > -1 else 0
> + target = mapping[1] if mapping[1] > -1 else 0
> + count = mapping[2] if mapping[2] > -1 else 1
> +
> + opts = {
> + 'first': start,
> + 'last': start + count,
> + 'offset': target - start
> + }
> + return opts
> +
> +
> +def map_id(path, map_uid, map_gid):
> + """
> + Remapping ownership of all files inside a container's rootfs.
> +
> + map_gid and map_uid: Contain integers in a list with format:
> + [<start>, <target>, <count>]
> + """
> + if map_uid:
> + uid_opts = get_mapping_opts(map_uid)
> + if map_gid:
> + gid_opts = get_mapping_opts(map_gid)
> +
> + for root, _ignore, files in os.walk(os.path.realpath(path)):
> + for name in [root] + files:
> + file_path = os.path.join(root, name)
> +
> + stat_info = os.lstat(file_path)
> + old_uid = stat_info.st_uid
> + old_gid = stat_info.st_gid
> +
> + new_uid = get_map_id(old_uid, uid_opts) if map_uid else -1
> + new_gid = get_map_id(old_gid, gid_opts) if map_gid else -1
> + os.lchown(file_path, new_uid, new_gid)
> +
> +
> +def mapping_uid_gid(dest, uid_map, gid_map):
> + """
> + Mapping ownership for each uid_map and gid_map.
> + """
> + len_diff = len(uid_map) - len(gid_map)
> +
> + if len_diff < 0:
> + uid_map += [None] * abs(len_diff)
> + elif len_diff > 0:
> + gid_map += [None] * len_diff
> +
> + for uid, gid in zip(uid_map, gid_map):
> + map_id(dest, uid, gid)
> diff --git a/src/virtBootstrap/virt_bootstrap.py b/src/virtBootstrap/virt_bootstrap.py
> index ddc5456..0bc2e2b 100755
> --- a/src/virtBootstrap/virt_bootstrap.py
> +++ b/src/virtBootstrap/virt_bootstrap.py
> @@ -69,22 +69,6 @@ def get_source(source_type):
> raise Exception("Invalid image URL scheme: '%s'" % source_type)
>
>
> -# The implementation for remapping ownership of all files inside a
> -# container's rootfs is inspired by the tool uidmapshift:
> -#
> -# Original author: Serge Hallyn <serge.hallyn at ubuntu.com>
> -# Original license: GPLv2
> -# http://bazaar.launchpad.net/%7Eserge-hallyn/+junk/nsexec/view/head:/uidmapshift.c
> -
> -def get_map_id(old_id, opts):
> - """
> - Calculate new map_id.
> - """
> - if old_id >= opts['first'] and old_id < opts['last']:
> - return old_id + opts['offset']
> - return -1
> -
> -
> def parse_idmap(idmap):
> """
> Parse user input to 'start', 'target' and 'count' values.
> @@ -107,62 +91,6 @@ def parse_idmap(idmap):
> raise ValueError("Invalid UID/GID mapping value: %s" % idmap)
>
>
> -def get_mapping_opts(mapping):
> - """
> - Get range options from UID/GID mapping
> - """
> - start = mapping[0] if mapping[0] > -1 else 0
> - target = mapping[1] if mapping[1] > -1 else 0
> - count = mapping[2] if mapping[2] > -1 else 1
> -
> - opts = {
> - 'first': start,
> - 'last': start + count,
> - 'offset': target - start
> - }
> - return opts
> -
> -
> -def map_id(path, map_uid, map_gid):
> - """
> - Remapping ownership of all files inside a container's rootfs.
> -
> - map_gid and map_uid: Contain integers in a list with format:
> - [<start>, <target>, <count>]
> - """
> - if map_uid:
> - uid_opts = get_mapping_opts(map_uid)
> - if map_gid:
> - gid_opts = get_mapping_opts(map_gid)
> -
> - for root, _ignore, files in os.walk(os.path.realpath(path)):
> - for name in [root] + files:
> - file_path = os.path.join(root, name)
> -
> - stat_info = os.lstat(file_path)
> - old_uid = stat_info.st_uid
> - old_gid = stat_info.st_gid
> -
> - new_uid = get_map_id(old_uid, uid_opts) if map_uid else -1
> - new_gid = get_map_id(old_gid, gid_opts) if map_gid else -1
> - os.lchown(file_path, new_uid, new_gid)
> -
> -
> -def mapping_uid_gid(dest, uid_map, gid_map):
> - """
> - Mapping ownership for each uid_map and gid_map.
> - """
> - len_diff = len(uid_map) - len(gid_map)
> -
> - if len_diff < 0:
> - uid_map += [None] * abs(len_diff)
> - elif len_diff > 0:
> - gid_map += [None] * len_diff
> -
> - for uid, gid in zip(uid_map, gid_map):
> - map_id(dest, uid, gid)
> -
> -
> # pylint: disable=too-many-arguments
> def bootstrap(uri, dest,
> fmt=utils.DEFAULT_OUTPUT_FORMAT,
> @@ -206,7 +134,7 @@ def bootstrap(uri, dest,
>
> if fmt == "dir" and uid_map or gid_map:
> logger.info("Mapping UID/GID")
> - mapping_uid_gid(dest, uid_map, gid_map)
> + utils.mapping_uid_gid(dest, uid_map, gid_map)
>
>
> def set_logging_conf(loglevel=None):
> diff --git a/tests/test_utils.py b/tests/test_utils.py
> index e45a2c9..7ce2ba4 100644
> --- a/tests/test_utils.py
> +++ b/tests/test_utils.py
> @@ -603,6 +603,155 @@ class TestUtils(unittest.TestCase):
> default_terminal_width + 1)
> mocked['sys'].stdout.write.assert_called_once()
>
> + ###################################
> + # Tests for: mapping_uid_gid()
> + ###################################
> + def test_mapping_uid_gid(self):
> + """
> + Ensures that mapping_uid_gid() calls map_id() with
> + correct parameters.
> + """
> + dest = '/path'
> + calls = [
> + { # Call 1
> + 'dest': dest,
> + 'uid': [[0, 1000, 10]],
> + 'gid': [[0, 1000, 10]]
> + },
> + { # Call 2
> + 'dest': dest,
> + 'uid': [],
> + 'gid': [[0, 1000, 10]]
> + },
> + { # Call 3
> + 'dest': dest,
> + 'uid': [[0, 1000, 10]],
> + 'gid': []
> + },
> + { # Call 4
> + 'dest': dest,
> + 'uid': [[0, 1000, 10], [500, 500, 10]],
> + 'gid': [[0, 1000, 10]]
> + }
> + ]
> +
> + expected_calls = [
> + # Expected from call 1
> + mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> + # Expected from call 2
> + mock.call(dest, None, [0, 1000, 10]),
> + # Expected from call 3
> + mock.call(dest, [0, 1000, 10], None),
> + # Expected from call 4
> + mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> + mock.call(dest, [500, 500, 10], None)
> + ]
> +
> + with mock.patch('virtBootstrap.utils.map_id') as m_map_id:
> + for args in calls:
> + utils.mapping_uid_gid(args['dest'], args['uid'], args['gid'])
> +
> + m_map_id.assert_has_calls(expected_calls)
> +
> + ###################################
> + # Tests for: map_id()
> + ###################################
> + @mock.patch('os.path.realpath')
> + def test_map_id(self, m_realpath):
> + """
> + Ensures that the UID/GID mapping applies to all files
> + and directories in root file system.
> + """
> + root_path = '/root'
> + files = ['foo1', 'foo2']
> + m_realpath.return_value = root_path
> +
> + map_uid = [0, 1000, 10]
> + map_gid = [0, 1000, 10]
> + new_id = 'new_id'
> +
> + expected_calls = [
> + mock.call('/root', new_id, new_id),
> + mock.call('/root/foo1', new_id, new_id),
> + mock.call('/root/foo2', new_id, new_id)
> + ]
> +
> + with mock.patch.multiple('os',
> + lchown=mock.DEFAULT,
> + lstat=mock.DEFAULT,
> + walk=mock.DEFAULT) as mocked:
> +
> + mocked['walk'].return_value = [(root_path, [], files)]
> + mocked['lstat']().st_uid = map_uid[0]
> + mocked['lstat']().st_gid = map_gid[0]
> +
> + get_map_id = 'virtBootstrap.utils.get_map_id'
> + with mock.patch(get_map_id) as m_get_map_id:
> + m_get_map_id.return_value = new_id
> + utils.map_id(root_path, map_uid, map_gid)
> +
> + mocked['lchown'].assert_has_calls(expected_calls)
> +
> + ###################################
> + # Tests for: get_mapping_opts()
> + ###################################
> + def test_get_mapping_opts(self):
> + """
> + Ensures that get_mapping_opts() returns correct options for
> + mapping value.
> + """
> + test_values = [
> + {
> + 'mapping': [0, 1000, 10],
> + 'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> + },
> + {
> + 'mapping': [0, 1000, 10],
> + 'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> + },
> + {
> + 'mapping': [500, 1500, 1],
> + 'expected_result': {'first': 500, 'last': 501, 'offset': 1000},
> + },
> + {
> + 'mapping': [-1, -1, -1],
> + 'expected_result': {'first': 0, 'last': 1, 'offset': 0},
> + }
> + ]
> +
> + for test in test_values:
> + res = utils.get_mapping_opts(test['mapping'])
> + self.assertEqual(test['expected_result'], res)
> +
> + ###################################
> + # Tests for: get_map_id()
> + ###################################
> + def test_get_map_id(self):
> + """
> + Ensures that get_map_id() returns correct UID/GID mapping value.
> + """
> + test_values = [
> + {
> + 'old_id': 0,
> + 'mapping': [0, 1000, 10],
> + 'expected_result': 1000
> + },
> + {
> + 'old_id': 5,
> + 'mapping': [0, 500, 10],
> + 'expected_result': 505
> + },
> + {
> + 'old_id': 10,
> + 'mapping': [0, 100, 10],
> + 'expected_result': -1
> + },
> + ]
> + for test in test_values:
> + opts = utils.get_mapping_opts(test['mapping'])
> + res = utils.get_map_id(test['old_id'], opts)
> + self.assertEqual(test['expected_result'], res)
> +
>
> if __name__ == '__main__':
> unittest.main(exit=False)
> diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py
> index ff744f7..c0def7e 100644
> --- a/tests/test_virt_bootstrap.py
> +++ b/tests/test_virt_bootstrap.py
> @@ -61,157 +61,6 @@ class TestVirtBootstrap(unittest.TestCase):
> sources.FileSource)
>
> ###################################
> - # Tests for: mapping_uid_gid()
> - ###################################
> - def test_mapping_uid_gid(self):
> - """
> - Ensures that mapping_uid_gid() calls map_id() with
> - correct parameters.
> - """
> - dest = '/path'
> - calls = [
> - { # Call 1
> - 'dest': dest,
> - 'uid': [[0, 1000, 10]],
> - 'gid': [[0, 1000, 10]]
> - },
> - { # Call 2
> - 'dest': dest,
> - 'uid': [],
> - 'gid': [[0, 1000, 10]]
> - },
> - { # Call 3
> - 'dest': dest,
> - 'uid': [[0, 1000, 10]],
> - 'gid': []
> - },
> - { # Call 4
> - 'dest': dest,
> - 'uid': [[0, 1000, 10], [500, 500, 10]],
> - 'gid': [[0, 1000, 10]]
> - }
> - ]
> -
> - expected_calls = [
> - # Expected from call 1
> - mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> - # Expected from call 2
> - mock.call(dest, None, [0, 1000, 10]),
> - # Expected from call 3
> - mock.call(dest, [0, 1000, 10], None),
> - # Expected from call 4
> - mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> - mock.call(dest, [500, 500, 10], None)
> -
> - ]
> - with mock.patch('virtBootstrap.virt_bootstrap.map_id') as m_map_id:
> - for args in calls:
> - virt_bootstrap.mapping_uid_gid(args['dest'],
> - args['uid'],
> - args['gid'])
> -
> - m_map_id.assert_has_calls(expected_calls)
> -
> - ###################################
> - # Tests for: map_id()
> - ###################################
> - @mock.patch('os.path.realpath')
> - def test_map_id(self, m_realpath):
> - """
> - Ensures that the UID/GID mapping applies to all files
> - and directories in root file system.
> - """
> - root_path = '/root'
> - files = ['foo1', 'foo2']
> - m_realpath.return_value = root_path
> -
> - map_uid = [0, 1000, 10]
> - map_gid = [0, 1000, 10]
> - new_id = 'new_id'
> -
> - expected_calls = [
> - mock.call('/root', new_id, new_id),
> - mock.call('/root/foo1', new_id, new_id),
> - mock.call('/root/foo2', new_id, new_id)
> - ]
> -
> - with mock.patch.multiple('os',
> - lchown=mock.DEFAULT,
> - lstat=mock.DEFAULT,
> - walk=mock.DEFAULT) as mocked:
> -
> - mocked['walk'].return_value = [(root_path, [], files)]
> - mocked['lstat']().st_uid = map_uid[0]
> - mocked['lstat']().st_gid = map_gid[0]
> -
> - get_map_id = 'virtBootstrap.virt_bootstrap.get_map_id'
> - with mock.patch(get_map_id) as m_get_map_id:
> - m_get_map_id.return_value = new_id
> - virt_bootstrap.map_id(root_path, map_uid, map_gid)
> -
> - mocked['lchown'].assert_has_calls(expected_calls)
> -
> - ###################################
> - # Tests for: get_mapping_opts()
> - ###################################
> - def test_get_mapping_opts(self):
> - """
> - Ensures that get_mapping_opts() returns correct options for
> - mapping value.
> - """
> - test_values = [
> - {
> - 'mapping': [0, 1000, 10],
> - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> - },
> - {
> - 'mapping': [0, 1000, 10],
> - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> - },
> - {
> - 'mapping': [500, 1500, 1],
> - 'expected_result': {'first': 500, 'last': 501, 'offset': 1000},
> - },
> - {
> - 'mapping': [-1, -1, -1],
> - 'expected_result': {'first': 0, 'last': 1, 'offset': 0},
> - }
> - ]
> -
> - for test in test_values:
> - res = virt_bootstrap.get_mapping_opts(test['mapping'])
> - self.assertEqual(test['expected_result'], res)
> -
> - ###################################
> - # Tests for: get_map_id()
> - ###################################
> - def test_get_map_id(self):
> - """
> - Ensures that get_map_id() returns correct UID/GID mapping value.
> - """
> - test_values = [
> - {
> - 'old_id': 0,
> - 'mapping': [0, 1000, 10],
> - 'expected_result': 1000
> - },
> - {
> - 'old_id': 5,
> - 'mapping': [0, 500, 10],
> - 'expected_result': 505
> - },
> - {
> - 'old_id': 10,
> - 'mapping': [0, 100, 10],
> - 'expected_result': -1
> - },
> - ]
> - for test in test_values:
> - opts = virt_bootstrap.get_mapping_opts(test['mapping'])
> - res = virt_bootstrap.get_map_id(test['old_id'], opts)
> - self.assertEqual(test['expected_result'], res)
> -
> - ###################################
> # Tests for: parse_idmap()
> ###################################
> def test_parse_idmap(self):
> @@ -415,9 +264,9 @@ class TestVirtBootstrap(unittest.TestCase):
> def test_if_bootstrap_calls_set_mapping_uid_gid(self):
> """
> Ensures that bootstrap() calls mapping_uid_gid() when the argument
> - uid_map or gid_map is specified.
> + uid_map or gid_map is specified and format='dir'.
> """
> - src, dest, uid_map, gid_map = 'foo', 'bar', 'id', 'id'
> + src, dest, fmt, uid_map, gid_map = 'foo', 'bar', 'dir', 'id', 'id'
> expected_calls = [
> mock.call('bar', None, 'id'),
> mock.call('bar', 'id', None),
> @@ -427,18 +276,17 @@ class TestVirtBootstrap(unittest.TestCase):
> with mock.patch.multiple(virt_bootstrap,
> get_source=mock.DEFAULT,
> os=mock.DEFAULT,
> - mapping_uid_gid=mock.DEFAULT,
> - utils=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> + utils=mock.DEFAULT) as mocked:
> mocked['os'].path.exists.return_value = True
> mocked['os'].path.isdir.return_value = True
> mocked['os'].access.return_value = True
>
> - virt_bootstrap.bootstrap(src, dest, gid_map=gid_map)
> - virt_bootstrap.bootstrap(src, dest, uid_map=uid_map)
> - virt_bootstrap.bootstrap(src, dest,
> + virt_bootstrap.bootstrap(src, dest, fmt=fmt, gid_map=gid_map)
> + virt_bootstrap.bootstrap(src, dest, fmt=fmt, uid_map=uid_map)
> + virt_bootstrap.bootstrap(src, dest, fmt=fmt,
> uid_map=uid_map, gid_map=gid_map)
> - mocked['mapping_uid_gid'].assert_has_calls(expected_calls)
> +
> + mocked['utils'].mapping_uid_gid.assert_has_calls(expected_calls)
>
> ###################################
> # Tests for: set_logging_conf()
ACK
--
Cedric
More information about the virt-tools-list
mailing list