[virt-tools-list] [virt-bootstrap] [PATCH v7 03/26] Add regression tests
Cedric Bosdonnat
cbosdonnat at suse.com
Mon Aug 28 14:34:49 UTC 2017
On Sat, 2017-08-26 at 21:41 +0100, Radostin Stoyanov wrote:
> These tests aim to verify the output of virt-bootstrap by creating tar
> archives which contain dummy root file system and call the function
> bootstrap(). Then check the extracted root file system.
> ---
> tests/__init__.py | 439 +++++++++++++++++++++++++++++++++++++++++++++++++
> tests/docker_source.py | 243 ++++++++++++++++++++++++++-
> tests/file_source.py | 78 +++++++++
> 3 files changed, 757 insertions(+), 3 deletions(-)
> create mode 100644 tests/file_source.py
>
> diff --git a/tests/__init__.py b/tests/__init__.py
> index 1b06616..9bbae53 100644
> --- a/tests/__init__.py
> +++ b/tests/__init__.py
> @@ -20,8 +20,18 @@
> Test suite for virt-bootstrap
> """
>
> +import copy
> +import hashlib
> +import io
> +import os
> +import shutil
> import sys
> +import tarfile
> +import tempfile
> import unittest
> +import passlib.hosts
> +
> +import guestfs
>
> try:
> import mock
> @@ -37,3 +47,432 @@ from virtBootstrap import progress
> from virtBootstrap import utils
>
> __all__ = ['virt_bootstrap', 'sources', 'progress', 'utils']
> +
> +
> +DEFAULT_FILE_MODE = 0o755
> +SHADOW_FILE_MODE = 0o600
> +NOT_ROOT = (os.geteuid() != 0)
> +
> +ROOTFS_TREE = {
> + 'root': {
> + 'uid': 0,
> + 'gid': 0,
> + 'dirs': [
> + 'bin', 'etc', 'home', 'lib', 'opt', 'root', 'run', 'sbin',
> + 'srv', 'tmp', 'usr', 'var'
> + ],
> + 'files': [
> + 'etc/hosts',
> + 'etc/fstab',
> + (
> + 'etc/shadow',
> + SHADOW_FILE_MODE,
> + 'root:*::0:99999:7:::'
> + )
> + ]
> + },
> + 'user1': {
> + 'uid': 500,
> + 'gid': 500,
> + 'dirs': ['home/user1'],
> + 'files': [
> + ('home/user1/test_file', 0o644, 'test data')
> + ]
> + },
> +
> + 'user2': {
> + 'uid': 1000,
> + 'gid': 1000,
> + 'dirs': [
> + 'home/user2',
> + 'home/user2/test_dir'
> + ],
> + 'files': [
> + 'home/user2/test_dir/test_file'
> + ]
> + }
> +}
> +
> +
> +# pylint: disable=invalid-name,too-many-arguments
> +class BuildTarFiles(object):
> + """
> + Create dummy tar files used for testing.
> + """
> +
> + def __init__(self, tar_dir, rootfs_tree=None):
> + """
> + Create dummy tar files
> + """
> + self.tar_dir = tar_dir
> + self.rootfs_tree = rootfs_tree or copy.deepcopy(ROOTFS_TREE)
> +
> + def create_tar_file(self):
> + """
> + Use temporary name to create uncompressed tarball with dummy file
> + system. Get checksum of the content and rename the file to
> + "<checksum>.tar". In this way, we can easily generate manifest for
> + Docker image and provide it to virt-bootstrap.
> + """
> + filepath = tempfile.mkstemp(dir=self.tar_dir)[1]
> + with tarfile.open(filepath, 'w') as tar:
> + self.create_user_dirs(tar)
> + # Get sha256 checksum of the archive
> + with open(filepath, 'rb') as file_handle:
> + file_hash = hashlib.sha256(file_handle.read()).hexdigest()
> + # Rename the archive to <checksum>.tar
> + new_filepath = os.path.join(self.tar_dir, "%s.tar" % file_hash)
> + os.rename(filepath, new_filepath)
> + os.chmod(new_filepath, 0o644)
> + return new_filepath
> +
> + def create_user_dirs(self, tar_handle):
> + """
> + Create root file system tree in tar archive.
> + """
> + tar_members = [
> + ['dirs', tarfile.DIRTYPE],
> + ['files', tarfile.REGTYPE],
> + ]
> +
> + for user in self.rootfs_tree:
> + for members, tar_type in tar_members:
> + self.create_tar_members(
> + tar_handle,
> + self.rootfs_tree[user][members],
> + tar_type,
> + uid=self.rootfs_tree[user]['uid'],
> + gid=self.rootfs_tree[user]['gid']
> + )
> +
> + def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0):
> + """
> + Add members to tar file.
> + """
> + for member_name in members:
> + member_data = ''
> + permissions = DEFAULT_FILE_MODE
> + if isinstance(member_name, tuple):
> + if len(member_name) == 3:
> + member_data = member_name[2]
> + member_name, permissions = member_name[:2]
> + data_encoded = member_data.encode('utf-8')
> +
> + t_info = tarfile.TarInfo(member_name)
> + t_info.type = m_type
> + t_info.mode = permissions
> + t_info.uid = uid
> + t_info.gid = gid
> + t_info.size = len(data_encoded)
> +
> + tar_handle.addfile(t_info, io.BytesIO(data_encoded))
> +
> +
> +class ImageAccessor(unittest.TestCase):
> + """
> + The perpose of this class is to gather methods used to verify content
s/perpose/purpose/
> + of extracted root file system. This class can be exteded for different
> + test cases.
> + """
> + def setUp(self):
> + """
> + Set initial values, create temporary directories and tar archive which
> + contains dummy root file system.
> + """
> + self.dest_dir = tempfile.mkdtemp('_bootstrap_dest')
> + self.tar_dir = tempfile.mkdtemp('_bootstrap_tarfiles')
> + # Set permissions of temporary directories to avoid "Permission denied"
> + # error from Libvirt.
> + os.chmod(self.dest_dir, 0o755)
> + os.chmod(self.tar_dir, 0o755)
> + self.uid_map = []
> + self.gid_map = []
> + self.root_password = None
> + self.checked_members = set()
> + self.rootfs_tree = copy.deepcopy(ROOTFS_TREE)
> + # Create tar archive
> + self.tar_file = BuildTarFiles(self.tar_dir).create_tar_file()
> +
> + def tearDown(self):
> + """
> + Clean up.
> + """
> + shutil.rmtree(self.dest_dir)
> + shutil.rmtree(self.tar_dir)
> +
> + def apply_mapping(self):
> + """
> + This method applies UID/GID mapping to all users defined in
> + self.rootfs_tree.
> + """
> +
> + for user in self.rootfs_tree:
> + user_uid = self.rootfs_tree[user]['uid']
> + user_gid = self.rootfs_tree[user]['gid']
> +
> + if self.uid_map:
> + for start, target, count in self.uid_map:
> + if user_uid >= start and user_uid <= start + count:
> + diff = user_uid - start
> + self.rootfs_tree[user]['uid'] = target + diff
> +
> + if self.gid_map:
> + for start, target, count in self.gid_map:
> + if user_gid >= start and user_gid <= start + count:
> + diff = user_gid - start
> + self.rootfs_tree[user]['gid'] = target + diff
> +
> + def check_rootfs(self, skip_ownership=False):
> + """
> + Check if the root file system was extracted correctly.
> + """
> + existence_check_func = {
> + 'files': os.path.isfile,
> + 'dirs': os.path.isdir
> + }
> + for user in self.rootfs_tree:
> + for m_type in existence_check_func:
> + self.check_rootfs_members(
> + user,
> + m_type,
> + existence_check_func[m_type],
> + skip_ownership
> + )
> +
> + def check_rootfs_members(self, user, members, check_existence,
> + skip_ownership=False):
> + """
> + Verify permissions, ownership and content of files or
> + directories in extracted root file system.
> +
> + @param user: user name defined in self.rootfs_tree.
> + @param members: The string 'dirs' or 'files'.
> + @param check_existence: Function used to check the existence of member.
> + @param skip_ownership: Boolean whether to skip ownership check.
> + """
> + user_uid = self.rootfs_tree[user]['uid']
> + user_gid = self.rootfs_tree[user]['gid']
> +
> + for member_name in self.rootfs_tree[user][members]:
> + member_data = ''
> + # Unpack member if tuple. Allow to be specified permissions and
> + # data for each file.
> + permissions = DEFAULT_FILE_MODE
> + if isinstance(member_name, tuple):
> + if len(member_name) == 3:
> + member_data = member_name[2]
> + member_name, permissions = member_name[:2]
> +
> + # Skip already checked members. E.g. when multiple layers were
> + # extracted we want to check only the latest version of file.
> + if member_name in self.checked_members:
> + continue
> + else:
> + self.checked_members.add(member_name)
> +
> + #########################
> + # Assertion functions
> + #########################
> + member_path = os.path.join(self.dest_dir, member_name)
> + self.assertTrue(
> + check_existence(member_path),
> + 'Member was not extracted: %s' % member_path
> + )
> + stat = os.stat(member_path)
> +
> + self.validate_file_mode(member_path, stat.st_mode, permissions)
> +
> + if not skip_ownership:
> + self.validate_file_ownership(
> + member_path, stat.st_uid, stat.st_gid, user_uid, user_gid
> + )
> +
> + if member_data:
> + with open(member_path, 'r') as content:
> + file_content = content.read()
> +
> + self.assertEqual(
> + member_data, file_content,
> + 'Incorrect file content: %s\n'
> + 'Found: %s\n'
> + 'Expected: %s' % (member_path, file_content, member_data)
> + )
> +
> + def validate_shadow_file(self):
> + """
> + Ensure that the extracted /etc/shadow file has correct ownership,
> + permissions and contains valid hash of the root password.
> + """
> + shadow_path = os.path.join(self.dest_dir, 'etc/shadow')
> +
> + self.assertTrue(
> + os.path.isfile(shadow_path),
> + 'Does not exist: %s' % shadow_path
> + )
> + stat = os.stat(shadow_path)
> +
> + self.validate_file_mode(shadow_path, stat.st_mode, SHADOW_FILE_MODE)
> +
> + if not NOT_ROOT:
> + self.validate_file_ownership(
> + shadow_path,
> + stat.st_uid, stat.st_gid,
> + self.rootfs_tree['root']['uid'],
> + self.rootfs_tree['root']['gid']
> + )
> +
> + with open(shadow_path, 'r') as content:
> + shadow_content = content.readlines()
> + if not shadow_content:
> + raise Exception("File is empty: %s" % shadow_path)
> + self.validate_shadow_hash(shadow_content)
> +
> + def validate_shadow_hash(self, shadow_content):
> + """
> + Validate root password hash of shadow file.
> +
> + Note: For simplicity we assume that the first line of /etc/shadow
> + contains the root entry.
> + """
> + self.assertTrue(
> + passlib.hosts.linux_context.verify(
> + self.root_password,
> + shadow_content[0].split(':')[1]
> + ),
> + "Invalid root password hash."
> + )
> +
> + def validate_file_mode(self, member_name, mode, expected):
> + """
> + Verify permissions of rootfs member.
> + """
> + self.assertEqual(
> + mode & 0o777, expected, 'Incorrect permissions: %s' % member_name
> + )
> +
> + def validate_file_ownership(self, member_name, uid, gid,
> + expected_uid, expected_gid):
> + """
> + Validate UID/GID of rootfs member.
> + """
> + self.assertEqual(
> + uid, expected_uid,
> + "Incorrect UID: %s\n"
> + "Found: %s\n"
> + "Expected: %s" % (member_name, uid, expected_uid)
> + )
> + self.assertEqual(
> + gid, expected_gid,
> + "Incorrect GID: %s\n"
> + "Found: %s\n"
> + "Expected: %s" % (member_name, gid, expected_gid,)
> + )
> +
> +
> +class Qcow2ImageAccessor(ImageAccessor):
> + """
> + This class gathers methods for verification of root file system content
> + within extracted qcow2 image.
> + """
> +
> + def validate_shadow_file_in_image(self, g):
> + """
> + Ensures that /etc/shadow file of disk image has correct permission,
> + ownership and contains valid hash of the root password.
> + """
> + self.assertTrue(
> + g.is_file('/etc/shadow'),
> + "Shadow file does not exist"
> + )
> +
> + stat = g.stat('/etc/shadow')
> +
> + self.validate_file_mode('/etc/shadow', stat['mode'], SHADOW_FILE_MODE)
> +
> + self.validate_file_ownership(
> + '/etc/shadow',
> + stat['uid'], stat['gid'],
> + self.rootfs_tree['root']['uid'],
> + self.rootfs_tree['root']['gid']
> + )
> +
> + self.validate_shadow_hash(g.cat('/etc/shadow').split('\n'))
> +
> + def check_image_content(self, g, user, members, check_existence):
> + """
> + Verify the existence, permissions and ownership of members in qcow2
> + image.
> +
> + @param g: guestfs handle
> + @param user: Name of user defined in self.rootfs_tree
> + @param members: The string 'dirs' or 'files'.
> + @param check_existence: Function to confirm existence of member.
> + """
> + permissions = DEFAULT_FILE_MODE
> + user_uid = self.rootfs_tree[user]['uid']
> + user_gid = self.rootfs_tree[user]['gid']
> +
> + for member_name in self.rootfs_tree[user][members]:
> + # Get specified permissions of file.
> + if isinstance(member_name, tuple):
> + member_name, permissions = member_name[:2]
> +
> + # Skip already checked files.
> + if member_name in self.checked_members:
> + continue
> + else:
> + self.checked_members.add(member_name)
> +
> + # When using guestfs all names should start with '/'
> + if not member_name.startswith('/'):
> + member_name = '/' + member_name
> +
> + self.assertTrue(
> + check_existence(member_name),
> + "Member was not found: %s" % member_name
> + )
> + stat = g.stat(member_name)
> +
> + self.validate_file_mode(member_name, stat['mode'], permissions)
> +
> + self.validate_file_ownership(
> + member_name, stat['uid'], stat['gid'], user_uid, user_gid
> + )
> +
> + def check_image(self, g):
> + """
> + Check the presence of files and folders in qcow2 image.
> + """
> + for user in self.rootfs_tree:
> + self.check_image_content(g, user, 'dirs', g.is_dir)
> + self.check_image_content(g, user, 'files', g.is_file)
> +
> + def check_qcow2_image(self, image_path):
> + """
> + Ensures that qcow2 images contain all files.
> + """
> + g = guestfs.GuestFS(python_return_dict=True)
> + g.add_drive_opts(image_path, readonly=True)
> + g.launch()
> + g.mount('/dev/sda', '/')
> + self.check_image(g)
> + g.umount('/')
> + g.shutdown()
> +
> + def get_image_path(self, n=0):
> + """
> + Returns the path of stored qcow2 image.
> + """
> + return os.path.join(self.dest_dir, "layer-%d.qcow2" % n)
> +
> + def check_qcow2_images(self, image_path):
> + """
> + Ensures that qcow2 images contain all files.
> + """
> + g = guestfs.GuestFS(python_return_dict=True)
> + g.add_drive_opts(image_path, readonly=True)
> + g.launch()
> + g.mount('/dev/sda', '/')
> + self.check_image(g)
> + g.umount('/')
> + g.shutdown()
Just thinking about it now... we could simply call guestmount -a layer-XXX.qcow2 -m /dev/sda:/ /tmp/mountpoint
and then run exactly the same tests as for the normal directory case.
> diff --git a/tests/docker_source.py b/tests/docker_source.py
> index 60404e6..5f9e6fe 100644
> --- a/tests/docker_source.py
> +++ b/tests/docker_source.py
> @@ -18,15 +18,252 @@
>
> """
> Tests which aim is to exercise creation of root file system with DockerSource.
> +
> +To avoid fetching network resources we mock out the functions:
> +- utils.get_image_details(): Returns manifest content
> +- utils.get_image_dir(): Returns the directory which contains the tar files
> +
> +Brief description of this tests:
> +1. Create dummy tar files named <checksum>.tar used as image layers.
> +2. Generate manifest content.
> +3. Mock out get_image_details() and get_image_dir().
> +4. Call bootstrap().
> +5. Check the result.
> """
>
> +import copy
> +import os
> +import subprocess
> import unittest
> +import guestfs
>
> from . import mock
> from . import sources
> +from . import virt_bootstrap
> +from . import BuildTarFiles
> +from . import ImageAccessor
> +from . import Qcow2ImageAccessor
> +from . import NOT_ROOT
>
>
> # pylint: disable=invalid-name
> +class CreateLayers(object):
> + """
> + Create tar files to mimic image layers and generate manifest content.
> + """
> + def __init__(self, initial_tar_file, initial_rootfs_tree, dest_dir):
> + """
> + Create dummy tar files used as image layers.
> +
> + The variables:
> + - layers: Store a lists of paths to created archives.
> + - layers_rootfs: Store self.rootfs_tree value used to generate tarball.
> + """
> + self.layers = [initial_tar_file]
> + self.layers_rootfs = [copy.deepcopy(initial_rootfs_tree)]
> +
> + tar_builder = BuildTarFiles(dest_dir)
> + tar_builder.rootfs_tree['root']['dirs'] = []
> + tar_builder.rootfs_tree['root']['files'] = [
> + ('etc/foo/bar', 0o644, "This should be overwritten")
> + ]
> +
> + self.layers.append(tar_builder.create_tar_file())
> + self.layers_rootfs.append(copy.deepcopy(tar_builder.rootfs_tree))
> +
> + tar_builder.rootfs_tree['root']['files'] = [
> + ('etc/foo/bar', 0o644, "Content of etc/foo/bar"),
> + ('bin/foobar', 0o755, "My executable script")
> + ]
> + self.layers.append(tar_builder.create_tar_file())
> + self.layers_rootfs.append(copy.deepcopy(tar_builder.rootfs_tree))
> +
> + def get_layers_rootfs(self):
> + """
> + Return root file systems used to create layers.
> + """
> + return self.layers_rootfs
> +
> + def generate_manifest(self):
> + """
> + Generate Manifest content for layers.
> + """
> + return {
> + "schemaVersion": 2,
> + "layers": [
> + {
> + "digest":
> + "sha256:" + os.path.basename(layer).split('.')[0]
> + }
> + for layer in self.layers
> + ]
> + }
> +
> +
> +class TestDirDockerSource(ImageAccessor):
> + """
> + Ensures that all layers extracted correctly in destination folder.
> + """
> + def check_result(self, layers_rootfs, dest):
> + """
> + Iterates trough values of layers_rootfs in reverse order (from the last
> + layer to first) and calls check_extracted_files().
> + """
> +
> + def call_bootstrap(self, manifest):
> + """
> + Mock get_image_details() and get_image_dir() and call the function
> + virt_bootstrap.bootstrap() with root_password value.
> + """
> + with mock.patch.multiple('virtBootstrap.utils',
> + get_image_details=mock.DEFAULT,
> + get_image_dir=mock.DEFAULT) as mocked:
> +
> + mocked['get_image_details'].return_value = manifest
> + mocked['get_image_dir'].return_value = self.tar_dir
> +
> + virt_bootstrap.bootstrap(
> + progress_cb=mock.Mock(),
> + uri='docker://foo',
> + fmt='dir',
> + uid_map=self.uid_map,
> + gid_map=self.gid_map,
> + dest=self.dest_dir,
> + root_password=self.root_password
> + )
> +
> + def test_dir_extract_rootfs(self):
> + """
> + Ensures that all layers were extracted correctly.
> + """
> + layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir)
> + self.call_bootstrap(layers.generate_manifest())
> + layers_rootfs = layers.get_layers_rootfs()
> + for rootfs_tree in layers_rootfs[::-1]:
> + self.rootfs_tree = rootfs_tree
> + self.check_rootfs(skip_ownership=(os.geteuid != 0))
> +
> + @unittest.skipIf(NOT_ROOT, "Root privileges required")
> + def test_dir_ownership_mapping(self):
> + """
> + Ensures that the UID/GID mapping was applied correctly to extracted
> + root file system of all layers.
> + """
> + self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> + self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> + layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir)
> + self.call_bootstrap(layers.generate_manifest())
> + layers_rootfs = layers.get_layers_rootfs()
> + for rootfs_tree in layers_rootfs[::-1]:
> + self.rootfs_tree = rootfs_tree
> + self.apply_mapping()
> + self.check_rootfs()
> +
> + def test_dir_setting_root_password(self):
> + """
> + Ensures that the root password is set correctly.
> + """
> + layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir)
> + self.root_password = "My secret root password"
> + self.call_bootstrap(layers.generate_manifest())
> + self.validate_shadow_file()
> +
> +
> +class TestQcow2DockerSource(Qcow2ImageAccessor):
> + """
> + Ensures that the conversion of tar files to qcow2 image with backing chains
> + works as expected.
> + """
> + def get_image_info(self, image_path):
> + """
> + Wrapper around "qemu-img info" used to information about disk image.
> + """
> + cmd = ['qemu-img', 'info', image_path]
> + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
> + output, _ignore = proc.communicate()
> + return output.decode('utf-8').split('\n')
> +
> + def call_bootstrap(self):
> + """
> + Generate tar files which mimic container layers and manifest content.
> + Mock get_image_details() and get_image_dir() and call the function
> + virt_bootstrap.bootstrap() for qcow2 format.
> + Return the root file systems used to generate the tar archives.
> + """
> + layers = CreateLayers(self.tar_file, self.rootfs_tree, self.tar_dir)
> + manifest = layers.generate_manifest()
> +
> + with mock.patch.multiple('virtBootstrap.utils',
> + get_image_details=mock.DEFAULT,
> + get_image_dir=mock.DEFAULT) as mocked:
> +
> + mocked['get_image_details'].return_value = manifest
> + mocked['get_image_dir'].return_value = self.tar_dir
> +
> + virt_bootstrap.bootstrap(
> + progress_cb=mock.Mock(),
> + uri='docker://foobar',
> + dest=self.dest_dir,
> + fmt='qcow2',
> + uid_map=self.uid_map,
> + gid_map=self.gid_map,
> + root_password=self.root_password
> + )
> +
> + return layers.get_layers_rootfs()
> +
> + def test_qcow2_build_image(self):
> + """
> + Ensures that the root file system is copied correctly to single
> + partition qcow2 image and layers are converted correctly to qcow2
> + images.
> + """
> + layers_rootfs = self.call_bootstrap()
> +
> + ###################
> + # Check base layer
> + ###################
> + base_layer_path = self.get_image_path()
> + img_format = self.get_image_info(base_layer_path)[1]
> + self.assertEqual(img_format, 'file format: qcow2')
> + images = [base_layer_path]
> + ###########################
> + # Check backing chains
> + ###########################
> + for i in range(1, len(layers_rootfs)):
> + img_path = self.get_image_path(i)
> + # img_info contains the output of "qemu-img info"
> + img_info = self.get_image_info(img_path)
> + self.assertEqual(
> + img_info[1],
> + 'file format: qcow2',
> + 'Invalid qcow2 disk image: %s' % img_path
> + )
> + backing_file = self.get_image_path(i - 1)
> + self.assertEqual(
> + img_info[5],
> + 'backing file: %s' % backing_file,
> + "Incorrect backing file for: %s\n"
> + "Expected: %s\n"
> + "Found: %s" % (img_info, backing_file, img_info[5])
> + )
> + images.append(img_path)
> + ###############################
> + # Check extracted files/folders
> + ###############################
> + g = guestfs.GuestFS(python_return_dict=True)
> + for path in images:
> + g.add_drive_opts(path, readonly=True)
> + g.launch()
> + devices = g.list_filesystems()
> + for dev, rootfs in zip(sorted(devices), layers_rootfs):
> + self.rootfs_tree = rootfs
> + g.mount(dev, '/')
> + self.check_image(g)
> + g.umount('/')
> + g.shutdown()
> +
> +
> class TestDockerSource(unittest.TestCase):
> """
> Unit tests for DockerSource
> @@ -36,9 +273,9 @@ class TestDockerSource(unittest.TestCase):
> ###################################
> def _mock_retrieve_layers_info(self, manifest, kwargs):
> """
> - This method is gather common test pattern used in the following
> - two test cases which aim to return an instance of the class
> - DockerSource with some util functions being mocked.
> + This method is gather common test pattern used in the following cases
> + which aim is to return an instance of the class DockerSource with
> + get_image_details() and get_image_dir() being mocked.
> """
> with mock.patch.multiple('virtBootstrap.utils',
> get_image_details=mock.DEFAULT,
> diff --git a/tests/file_source.py b/tests/file_source.py
> new file mode 100644
> index 0000000..79bb234
> --- /dev/null
> +++ b/tests/file_source.py
> @@ -0,0 +1,78 @@
> +# -*- coding: utf-8 -*-
> +# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> +#
> +# Copyright (C) 2017 Radostin Stoyanov
> +#
> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 3 of the License, or
> +# (at your option) any later version.
> +
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +# GNU General Public License for more details.
> +
> +# You should have received a copy of the GNU General Public License
> +# along with this program. If not, see <http://www.gnu.org/licenses/>.
> +
> +
> +"""
> +Regression tests which aim to exercise the creation of root file system
> +with FileSource.
> +"""
> +
> +import unittest
> +
> +from . import mock
> +from . import virt_bootstrap
> +from . import ImageAccessor
> +from . import NOT_ROOT
> +
> +
> +# pylint: disable=invalid-name
> +class TestDirFileSource(ImageAccessor):
> + """
> + Ensures that files from rootfs tarball are extracted correctly.
> + """
> + def call_bootstrap(self):
> + """
> + Execute the bootstrap() method of virt_bootstrap.
> + """
> + virt_bootstrap.bootstrap(
> + uri=self.tar_file,
> + dest=self.dest_dir,
> + fmt='dir',
> + progress_cb=mock.Mock(),
> + uid_map=self.uid_map,
> + gid_map=self.gid_map,
> + root_password=self.root_password
> + )
> +
> + def test_dir_extract_rootfs(self):
> + """
> + Extract rootfs from each dummy tarfile.
> + """
> + self.call_bootstrap()
> + self.check_rootfs(skip_ownership=NOT_ROOT)
> +
> + @unittest.skipIf(NOT_ROOT, "Root privileges required")
> + def test_dir_ownership_mapping(self):
> + """
> + Ensures that UID/GID mapping for extracted root file system are applied
> + correctly.
> + """
> + self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> + self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
> + self.call_bootstrap()
> + self.apply_mapping()
> + self.check_rootfs()
> +
> + def test_dir_setting_root_password(self):
> + """
> + Ensures that the root password is set correctly when FileSource is used
> + with fmt='dir'.
> + """
> + self.root_password = 'my secret root password'
> + self.call_bootstrap()
> + self.validate_shadow_file()
The whole patch looks way better than the previous version of it. It would be good to simplify
the qcow2 accessor some more, as mentioned in the above comment.
--
Cedric
More information about the virt-tools-list
mailing list