[virt-tools-list] [virt-bootstrap] [PATCH v6 03/26] Add regression tests
Radostin Stoyanov
rstoyanov1 at gmail.com
Thu Aug 17 09:39:41 UTC 2017
These tests aim to verify the output of virt-bootstrap in a more
abstract manner by creating tar files and calling bootstrap() with
them, then checking the result when the output format is set to "dir"
or "qcow2".
---
tests/__init__.py | 436 ++++++++++++++++++++++++++++++++++++++++
tests/docker_source.py | 535 +++++++++++++++++++++++++++++++++++++++++++++++++
tests/file_source.py | 184 +++++++++++++++++
tests/test_utils.py | 143 +++++++++++++
4 files changed, 1298 insertions(+)
create mode 100644 tests/__init__.py
create mode 100644 tests/docker_source.py
create mode 100644 tests/file_source.py
create mode 100644 tests/test_utils.py
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..7a53c38
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,436 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Regression tests for virt-bootstrap
+"""
+
+import hashlib
+import io
+import os
+import shutil
+import sys
+import tarfile
+import unittest
+import passlib.hosts
+
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
+
+sys.path.insert(0, '../src') # noqa: E402
+
+# pylint: disable=import-error, wrong-import-position
+from virtBootstrap import virt_bootstrap
+from virtBootstrap import sources
+from virtBootstrap import progress
+from virtBootstrap import utils
+
+__all__ = ['virt_bootstrap', 'sources', 'progress', 'utils']
+
+
+# pylint: disable=invalid-name,too-many-instance-attributes,too-many-arguments
+class BuildTarFiles(unittest.TestCase):
+ """
+ Create dummy tar files used for testing.
+ """
+
+ def setUp(self):
+ """
+ Create dummy rootfs tar files
+ """
+ self.tar_dir = os.path.realpath('tests/tarfiles')
+ self.dest_dir = os.path.realpath('tests/filesystems')
+ self.default_permissions = 0o755
+ self.shadow_file_permissions = 0o600
+ self.shadow_file_content = "root:*:"
+ self.tmp_folders = [self.tar_dir, self.dest_dir]
+ self.tar_file = None
+ self.uid_map = []
+ self.gid_map = []
+ self.root_password = ''
+ self.checked_members = set()
+ self.rootfs_tree = {
+ 'root': {
+ 'uid': 0,
+ 'gid': 0,
+ 'dirs': [
+ 'bin',
+ 'etc',
+ 'home',
+ 'lib',
+ 'opt',
+ 'root',
+ 'run',
+ 'sbin',
+ 'srv',
+ 'tmp',
+ 'usr',
+ 'var',
+ ],
+ 'files': [
+ 'etc/hosts',
+ (
+ 'etc/shadow',
+ self.shadow_file_permissions,
+ self.shadow_file_content
+ )
+ ]
+ },
+ 'user1': {
+ 'uid': 500,
+ 'gid': 500,
+ 'dirs': ['home/user1'],
+ 'files': [
+ ('home/user1/test_file', 0o644, 'test data')
+ ]
+ },
+
+ 'user2': {
+ 'uid': 1000,
+ 'gid': 1000,
+ 'dirs': [
+ 'home/user2',
+ 'home/user2/test_dir'
+ ],
+ 'files': [
+ 'home/user2/test_dir/test_file'
+ ]
+ }
+ }
+
+ # Create test folders
+ for test_dir in self.tmp_folders:
+ if not os.path.exists(test_dir):
+ os.makedirs(test_dir)
+
+ self.create_tar_file()
+
+ def create_tar_file(self):
+ """
+ Use a temporary name to create an uncompressed tarball with dummy file
+ system. Then get checksum of the content and rename the tarfile to
+ <checksum>.tar. In this way we can easily generate Manifest of Docker
+ registry and pass it to virt-bootstrap.
+ """
+ filepath = os.path.join(self.tar_dir, 'tmp_file.tar')
+ with tarfile.open(filepath, 'w') as tar:
+ self.create_user_dirs(tar)
+ # Get sha256 checksum of the archive
+ with open(filepath, 'rb') as file_handle:
+ file_hash = hashlib.sha256(file_handle.read()).hexdigest()
+ # Rename the archive to <checksum>.tar
+ new_filepath = os.path.join(self.tar_dir, "%s.tar" % file_hash)
+ os.rename(filepath, new_filepath)
+ self.tar_file = new_filepath
+
+ def tearDown(self):
+ """
+ Clean up.
+ """
+ for test_dir in self.tmp_folders:
+ shutil.rmtree(test_dir)
+
+ def create_tar_members(self, tar_handle, members, m_type, uid=0, gid=0,
+ permissions=None):
+ """
+ Add members to tar file.
+ """
+ if permissions is None:
+ permissions = self.default_permissions
+
+ for name in members:
+ data = ''
+ if isinstance(name, tuple):
+ name, permissions, data = name
+ data_encoded = data.encode('utf-8')
+
+ t_info = tarfile.TarInfo(name)
+ t_info.type = m_type
+ t_info.mode = permissions
+ t_info.uid = uid
+ t_info.gid = gid
+ t_info.size = len(data_encoded)
+
+ tar_handle.addfile(t_info, io.BytesIO(data_encoded))
+
+ def create_user_dirs(self, tar_handle):
+ """
+ Create root file system tree in tar archive.
+ """
+ tar_members = [
+ ['dirs', tarfile.DIRTYPE],
+ ['files', tarfile.REGTYPE],
+ ]
+
+ for user in self.rootfs_tree:
+ for members, tar_type in tar_members:
+ self.create_tar_members(
+ tar_handle,
+ self.rootfs_tree[user][members],
+ tar_type,
+ uid=self.rootfs_tree[user]['uid'],
+ gid=self.rootfs_tree[user]['gid']
+ )
+
+    def apply_mapping(self):
+        """
+        Apply self.uid_map / self.gid_map to all users in self.rootfs_tree.
+        Each entry (start, target, count) maps IDs in [start, start + count).
+        """
+
+        for user in self.rootfs_tree:
+            user_uid = self.rootfs_tree[user]['uid']
+            user_gid = self.rootfs_tree[user]['gid']
+
+            if self.uid_map:
+                for start, target, rng in self.uid_map:
+                    if start <= user_uid < start + rng:
+                        diff = user_uid - start
+                        self.rootfs_tree[user]['uid'] = target + diff
+
+            if self.gid_map:
+                for start, target, rng in self.gid_map:
+                    if start <= user_gid < start + rng:
+                        diff = user_gid - start
+                        self.rootfs_tree[user]['gid'] = target + diff
+
+ def check_rootfs(self, skip_ownership=False):
+ """
+ Check if the root file system was extracted correctly.
+ """
+ for user in self.rootfs_tree:
+ self.check_extracted_members(
+ user, 'files', os.path.isfile, skip_ownership
+ )
+ self.check_extracted_members(
+ user, 'dirs', os.path.isdir, skip_ownership
+ )
+
+ def check_extracted_members(self, user, members, check_existance,
+ skip_ownership=False):
+ """
+ Check permissions, ownership and content of
+ extracted files or directories.
+
+ @param user: user name defined in self.rootfs_tree
+ @param members: The string 'dirs' or 'files'
+ @param check_existance: Function used to confirm the existence of
+ member. (E.g. os.path.isdir or os.path.isfile)
+ @param skip_ownership: Whether to skip verification of ownership.
+ Useful when members are extracted with unprivileged user.
+ """
+ permissions = self.default_permissions
+ user_uid = self.rootfs_tree[user]['uid']
+ user_gid = self.rootfs_tree[user]['gid']
+
+ for member_name in self.rootfs_tree[user][members]:
+ # Unpack member if it is a tuple. This allows us to specify
+ # permissions and data per file.
+ member_data = ''
+ if isinstance(member_name, tuple):
+ if len(member_name) == 3:
+ member_data = member_name[2]
+ member_name, permissions = member_name[:2]
+
+ # Skip already checked members. E.g. when multiple layers were
+ # extracted we want to check only the latest version of file.
+ if member_name in self.checked_members:
+ continue
+ else:
+ self.checked_members.add(member_name)
+
+ #########################
+ # Assertion functions
+ #########################
+ member_path = os.path.join(self.dest_dir, member_name)
+ self.assertTrue(
+ check_existance(member_path),
+ 'Member was not extracted: %s' % member_path
+ )
+ stat = os.stat(member_path)
+ self.assertEqual(
+ stat.st_mode & 0o777, permissions,
+ 'Incorrect permissions: %s' % member_path
+ )
+ if not skip_ownership:
+ self.assertEqual(
+ stat.st_uid, user_uid,
+ 'Incorrect UID: %s' % member_path
+ )
+ self.assertEqual(
+ stat.st_gid, user_gid,
+ 'Incorrect GID: %s' % member_path
+ )
+
+ if member_data:
+ with open(member_path, 'r') as content:
+ file_content = content.read()
+ self.assertEqual(
+ member_data, file_content,
+ 'Incorrect file content: %s\n'
+ 'Found: %s\n'
+ 'Expected: %s' % (member_path, file_content, member_data)
+ )
+
+ def validate_shadow_file(self, shadow_path,
+ skip_hash=False, skip_ownership=False):
+ """
+ Ensure that extracted /etc/shadow file has correct ownership,
+ permissions and hashed root password.
+ """
+ self.assertTrue(
+ os.path.isfile(shadow_path),
+ 'Does not exist: %s' % shadow_path
+ )
+ stat = os.stat(shadow_path)
+ self.assertEqual(
+ stat.st_mode & 0o777,
+ self.shadow_file_permissions,
+ 'Shadow file has incorrect permissions: %s' % shadow_path
+ )
+ if not skip_ownership:
+ self.assertEqual(
+ stat.st_uid,
+ self.rootfs_tree['root']['uid'],
+ 'Shadow file has incorrect UID: %s' % shadow_path
+ )
+ self.assertEqual(
+ stat.st_gid,
+ self.rootfs_tree['root']['gid'],
+ 'Shadow file has incorrect GID: %s' % shadow_path
+ )
+
+ if not skip_hash:
+ # Note: For simplicity we assume that the first line of the file
+ # contains the root entry.
+ with open(shadow_path, 'r') as content:
+ shadow_content = content.readlines()
+
+ if not shadow_content:
+ raise Exception("File is empty: %s" % shadow_path)
+
+ self.assertTrue(
+ passlib.hosts.linux_context.verify(
+ self.root_password,
+ shadow_content[0].split(':')[1]
+ ),
+ "Root password hash is invalid."
+ )
+
+ def validate_shadow_file_in_image(self, g):
+ """
+ Validate permission, ownership and root password hash for /etc/shadow
+ file stored in qcow2 image.
+ """
+ self.assertTrue(
+ g.is_file('/etc/shadow'),
+ "Shadow file does not exist"
+ )
+
+ stat = g.stat('/etc/shadow')
+ self.assertEqual(
+ stat['mode'] & 0o777,
+ self.shadow_file_permissions,
+ 'Shadow file has incorrect permissions'
+ )
+ self.assertEqual(
+ stat['uid'],
+ self.rootfs_tree['root']['uid'],
+ 'Shadow file has incorrect UID'
+ )
+ self.assertEqual(
+ stat['gid'],
+ self.rootfs_tree['root']['gid'],
+ 'Shadow file has incorrect GID'
+ )
+
+ # Note: For simplicity we assume that the first line of the file
+ # contains the root entry.
+ shadow_content = g.cat('/etc/shadow').split('\n')
+ self.assertTrue(
+ passlib.hosts.linux_context.verify(
+ self.root_password,
+ shadow_content[0].split(':')[1]
+ ),
+ "Root password hash is invalid."
+ )
+
+ def check_image_content(self, g, user, members, check_existance):
+ """
+ Verify the existence, permissions, ownership of members in qcow2 image.
+
+ @param g: guestfs handle
+ @param user: User defined in self.rootfs_tree
+ @param members: The string 'dirs' or 'files'
+ @param check_existance: Function used to confirm the existence of
+ member. (E.g. g.is_dir or g.is_file)
+ """
+ permissions = self.default_permissions
+ user_uid = self.rootfs_tree[user]['uid']
+ user_gid = self.rootfs_tree[user]['gid']
+
+ for member_name in self.rootfs_tree[user][members]:
+ # Get specified permissions of file.
+ if isinstance(member_name, tuple):
+ member_name, permissions = member_name[:2]
+
+ # Skip already checked files.
+ if member_name in self.checked_members:
+ continue
+ else:
+ self.checked_members.add(member_name)
+
+ # When using guestfs all names should start with '/'
+ if not member_name.startswith('/'):
+ member_name = '/' + member_name
+
+ self.assertTrue(
+ check_existance(member_name),
+ "Member was not found: %s" % member_name
+ )
+ stat = g.stat(member_name)
+ self.assertEqual(
+ stat['mode'] & 0o777, permissions,
+ 'Incorrect permissions: %s' % member_name
+ )
+ self.assertEqual(
+ stat['uid'],
+ user_uid,
+ "Incorrect UID: %s\n"
+ "Found: %s\n"
+ "Expected: %s" % (member_name, stat['uid'], user_uid)
+ )
+ self.assertEqual(
+ stat['gid'],
+ user_gid,
+ "Incorrect GID: %s\n"
+ "Found: %s\n"
+ "Expected: %s" % (member_name, stat['gid'], user_gid)
+ )
+
+ def check_image(self, g):
+ """
+ Check the presence of files and folders in qcow2 image.
+ """
+ for user in self.rootfs_tree:
+ # Check folders
+ self.check_image_content(g, user, 'dirs', g.is_dir)
+ # Check files
+ self.check_image_content(g, user, 'files', g.is_file)
diff --git a/tests/docker_source.py b/tests/docker_source.py
new file mode 100644
index 0000000..cf2b2d2
--- /dev/null
+++ b/tests/docker_source.py
@@ -0,0 +1,535 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Regression tests which aim to exercise creation of root file system
+with DockerSource.
+
+To avoid fetching network resources we mock out the functions:
+- utils.get_image_details(): Returns manifest content
+- utils.get_image_dir(): Returns the directory which contains the tar files
+
+Brief description of these tests:
+1. Create 3 dummy tar files named <checksum>.tar used as image layers.
+2. Generate manifest content.
+3. Mock out get_image_details() and get_image_dir().
+4. Call bootstrap().
+5. Check the result.
+"""
+
+import copy
+import os
+import subprocess
+import unittest
+import guestfs
+
+from . import mock
+from . import sources
+from . import virt_bootstrap
+from . import BuildTarFiles
+
+
+# pylint: disable=invalid-name
+class CreateLayers(BuildTarFiles):
+ """
+ Create tarfiles to be used as image layers and generate manifest for them.
+ """
+ def check_result(self, layers_rootfs, dest):
+ """
+ This method is used to check extracted root file system.
+ """
+ pass
+
+ def call_bootstrap(self, manifest, dest):
+ """
+ This method is used to call virt_bootstrap.bootstrap()
+ """
+ pass
+
+ def generate_manifest(self, layers, layers_rootfs):
+ """
+ Generate Manifest content and then call self.call_bootstrap() and
+ self.check_result()
+ """
+ manifest = {
+ "schemaVersion": 2,
+ "layers": [
+ {
+ "digest":
+ "sha256:" + os.path.basename(layer).split('.')[0]
+ }
+ for layer in layers
+ ]
+ }
+ self.call_bootstrap(manifest, self.dest_dir)
+ self.check_result(layers_rootfs, self.dest_dir)
+
+ def main(self):
+ """
+ Create dummy tar files used to simulate layers of image
+ then call generate_manifest().
+
+ The variable:
+ - "layers" stores a list of paths to the created archives.
+
+ - "layers_rootfs" stores the values of self.rootfs_tree used to
+ generate each tarball.
+ """
+ # Store base layer info
+ layers = [self.tar_file]
+ layers_rootfs = [copy.deepcopy(self.rootfs_tree)]
+
+ # Create Layer 1
+ self.rootfs_tree['root']['files'] = [
+ ('etc/foo/bar', 0o644, "This should be overwritten")
+ ]
+ self.create_tar_file()
+ # Store layer 1 info
+ layers.append(self.tar_file)
+ layers_rootfs.append(copy.deepcopy(self.rootfs_tree))
+
+ # Create Layer 2
+ self.rootfs_tree['root']['files'] = [
+ ('etc/foo/bar', 0o644, "Content of etc/foo/bar"),
+ ('bin/foobar', 0o755, "My executable script")
+ ]
+ self.create_tar_file()
+ # Store layer 2 info
+ layers.append(self.tar_file)
+ layers_rootfs.append(copy.deepcopy(self.rootfs_tree))
+
+ self.generate_manifest(layers, layers_rootfs)
+
+
+class DirExtractRootFS(CreateLayers):
+ """
+ Ensures that all layers extracted correctly in destination folder.
+ """
+    def check_result(self, layers_rootfs, dest):
+        """
+        Iterate through layers_rootfs in reverse order (last layer first) and
+        call check_rootfs(); ownership is only verified when running as root.
+        """
+        for rootfs_tree in layers_rootfs[::-1]:
+            self.rootfs_tree = rootfs_tree
+            self.check_rootfs(skip_ownership=(os.geteuid() != 0))
+
+ def call_bootstrap(self, manifest, dest):
+ """
+ Mock get_image_details() and get_image_dir() then call the function
+ virt_bootstrap.bootstrap().
+ """
+ with mock.patch.multiple('virtBootstrap.utils',
+ get_image_details=mock.DEFAULT,
+ get_image_dir=mock.DEFAULT) as mocked:
+
+ mocked['get_image_details'].return_value = manifest
+ mocked['get_image_dir'].return_value = self.tar_dir
+
+ virt_bootstrap.bootstrap(
+ uri='docker://foobar',
+ dest=dest,
+ fmt='dir',
+ progress_cb=mock.Mock()
+ )
+
+ def runTest(self):
+ """
+ Execute this test.
+ """
+ self.main()
+
+
+@unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirOwnershipMapping(CreateLayers):
+ """
+ Ensures that the mapping of ownership works as expected.
+ """
+
+ def check_result(self, layers_rootfs, dest):
+ """
+ Iterate through all values of layers_rootfs in reverse order, apply the
+ mapping to the self.rootfs_tree used to create the test archive and
+ verify the extracted files.
+ """
+ for rootfs_tree in layers_rootfs[::-1]:
+ self.rootfs_tree = rootfs_tree
+ self.apply_mapping()
+ self.check_rootfs()
+
+ def call_bootstrap(self, manifest, dest):
+ """
+ Mock get_image_details() and get_image_dir() and call the function
+ virt_bootstrap.bootstrap() with UID/GID mapping values.
+ """
+ self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+ self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+ with mock.patch.multiple('virtBootstrap.utils',
+ get_image_details=mock.DEFAULT,
+ get_image_dir=mock.DEFAULT) as mocked:
+ mocked['get_image_details'].return_value = manifest
+ mocked['get_image_dir'].return_value = self.tar_dir
+ virt_bootstrap.bootstrap(
+ progress_cb=mock.Mock(),
+ uri='docker://foo',
+ dest=dest,
+ fmt='dir',
+ uid_map=self.uid_map,
+ gid_map=self.gid_map
+ )
+
+ def runTest(self):
+ """
+ Execute this test.
+ """
+ self.main()
+
+
+@unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirSettingRootPassword(CreateLayers):
+ """
+ Ensures that the root password is set correctly.
+ """
+ def check_result(self, layers_rootfs, dest):
+ """
+ Validate shadow file
+ """
+ self.validate_shadow_file(os.path.join(dest, 'etc/shadow'))
+
+ def call_bootstrap(self, manifest, dest):
+ """
+ Mock get_image_details() and get_image_dir() and call the function
+ virt_bootstrap.bootstrap() with root_password value.
+ """
+ self.root_password = "My secret root password"
+ with mock.patch.multiple('virtBootstrap.utils',
+ get_image_details=mock.DEFAULT,
+ get_image_dir=mock.DEFAULT) as mocked:
+
+ mocked['get_image_details'].return_value = manifest
+ mocked['get_image_dir'].return_value = self.tar_dir
+
+ virt_bootstrap.bootstrap(
+ progress_cb=mock.Mock(),
+ uri='docker://foo',
+ dest=dest,
+ fmt='dir',
+ root_password=self.root_password
+ )
+
+ def runTest(self):
+ """
+ Execute this test.
+ """
+ self.main()
+
+
+class Qcow2BuildImage(CreateLayers):
+ """
+ Ensures that the conversion of tar files to qcow2 image with backing chains
+ works as expected.
+ """
+ def get_image_info(self, image_path):
+ """
+ Returns information about the disk image using "qemu-img".
+ """
+ cmd = ['qemu-img', 'info', image_path]
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+ output, _ignore = proc.communicate()
+ return output.decode('utf-8').split('\n')
+
+ def check_result(self, layers_rootfs, dest):
+ """
+ Verify that layers were converted correctly to qcow2 images.
+ """
+ ###################
+ # Check base layer
+ ###################
+ base_layer_path = os.path.join(dest, "layer-0.qcow2")
+ base_info = self.get_image_info(base_layer_path)
+ self.assertEqual(base_info[1], 'file format: qcow2')
+ images = [base_layer_path]
+ ###########################
+ # Check backing chains
+ ###########################
+ for i in range(1, len(layers_rootfs)):
+ img_path = os.path.join(dest, "layer-%d.qcow2" % i)
+ img_info = self.get_image_info(img_path)
+ self.assertEqual(
+ img_info[1],
+ 'file format: qcow2',
+ 'Invalid qcow2 disk image: %s' % img_path
+ )
+ backing_file = os.path.join(dest, "layer-%d.qcow2" % (i - 1))
+ self.assertEqual(
+ img_info[5],
+ 'backing file: %s' % backing_file,
+ "Incorrect backing file for: %s\n"
+ "Expected: %s\n"
+ "Found: %s" % (img_info, backing_file, img_info[5])
+ )
+ images.append(img_path)
+ ###############################
+ # Check extracted files/folders
+ ###############################
+ g = guestfs.GuestFS(python_return_dict=True)
+ for path in images:
+ g.add_drive_opts(path, readonly=True)
+ g.launch()
+ devices = g.list_filesystems()
+ for dev, rootfs in zip(sorted(devices), layers_rootfs):
+ self.rootfs_tree = rootfs
+ g.mount(dev, '/')
+ self.check_image(g)
+ g.umount('/')
+ g.shutdown()
+
+ def call_bootstrap(self, manifest, dest):
+ """
+ Mock get_image_details() and get_image_dir() and call the function
+ virt_bootstrap.bootstrap() for qcow2 format.
+ """
+ with mock.patch.multiple('virtBootstrap.utils',
+ get_image_details=mock.DEFAULT,
+ get_image_dir=mock.DEFAULT) as mocked:
+
+ mocked['get_image_details'].return_value = manifest
+ mocked['get_image_dir'].return_value = self.tar_dir
+
+ virt_bootstrap.bootstrap(
+ uri='docker://foobar',
+ dest=dest,
+ fmt='qcow2',
+ progress_cb=mock.Mock()
+ )
+
+ def runTest(self):
+ """
+ Execute this test.
+ """
+ self.main()
+
+
+@unittest.skip("Need fix for this test to pass")
+class Qcow2OwnershipMapping(Qcow2BuildImage):
+ """
+ Ensures that UID/GID mapping works correctly for qcow2 conversion.
+ """
+ def check_result(self, layers_rootfs, dest):
+ """
+ Iterate through values of layers_rootfs in reverse order and apply the
+ mapping values to self.rootfs_tree. Then verify the ownership of
+ files/folders created in the last backing chain of qcow2 image.
+ """
+ g = guestfs.GuestFS(python_return_dict=True)
+ image_path = os.path.join(dest, "layer-%d.qcow2" % len(layers_rootfs))
+ g.add_drive_opts(image_path, readonly=True)
+
+ g.launch()
+ for rootfs in layers_rootfs[::-1]:
+ self.rootfs_tree = rootfs
+ self.apply_mapping()
+ g.mount('/dev/sda', '/')
+ self.check_image(g)
+ g.umount('/')
+ g.shutdown()
+
+ def call_bootstrap(self, manifest, dest):
+ """
+ Mock the functions get_image_details() and get_image_dir() then call
+ virtbootstrap.bootstrap() with UID/GID mapping values and fmt="qcow2".
+ """
+ self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+ self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+ with mock.patch.multiple('virtBootstrap.utils',
+ get_image_details=mock.DEFAULT,
+ get_image_dir=mock.DEFAULT) as mocked:
+ mocked['get_image_details'].return_value = manifest
+ mocked['get_image_dir'].return_value = self.tar_dir
+ virt_bootstrap.bootstrap(
+ uri='docker://foobar',
+ progress_cb=mock.Mock(),
+ fmt='qcow2',
+ dest=dest,
+ uid_map=self.uid_map,
+ gid_map=self.gid_map
+ )
+
+
+@unittest.skip("Need fix for this test to pass")
+class Qcow2SettingRootPassword(Qcow2BuildImage):
+ """
+ Ensures that the root password is set correctly in the last backing chain
+ of the qcow2 image.
+ """
+ def check_result(self, layers_rootfs, dest):
+ """
+ Load last backing chain and validate shadow file.
+ """
+ g = guestfs.GuestFS(python_return_dict=True)
+ g.add_drive_opts(
+ os.path.join(dest, "layer-%d.qcow2" % len(layers_rootfs)),
+ readonly=True
+ )
+ g.launch()
+ g.mount('/dev/sda', '/')
+ self.validate_shadow_file_in_image(g)
+ g.umount('/')
+ g.shutdown()
+
+ def call_bootstrap(self, manifest, dest):
+ """
+ Mock the functions get_image_details() and get_image_dir() then
+ call virt_bootstrap.bootstrap() with root password value and qcow2
+ output format.
+ """
+ self.root_password = "My secret password"
+ with mock.patch.multiple('virtBootstrap.utils',
+ get_image_details=mock.DEFAULT,
+ get_image_dir=mock.DEFAULT) as mocked:
+ mocked['get_image_details'].return_value = manifest
+ mocked['get_image_dir'].return_value = self.tar_dir
+ virt_bootstrap.bootstrap(
+ uri='docker://foobar',
+ dest=dest,
+ fmt='qcow2',
+ progress_cb=mock.Mock(),
+ root_password=self.root_password
+ )
+
+
+class TestDockerSource(unittest.TestCase):
+ """
+ Unit tests for DockerSource
+ """
+ ###################################
+ # Tests for: retrieve_layers_info()
+ ###################################
+ def _mock_retrieve_layers_info(self, manifest, kwargs):
+ """
+ This method gathers the common test pattern used in the following
+ test cases: it creates an instance of the class DockerSource with
+ some util functions mocked and returns it together with the mocks.
+ """
+ with mock.patch.multiple('virtBootstrap.utils',
+ get_image_details=mock.DEFAULT,
+ get_image_dir=mock.DEFAULT) as m_utils:
+
+ m_utils['get_image_details'].return_value = manifest
+ m_utils['get_image_dir'].return_value = '/images_path'
+
+ patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri'
+ with mock.patch(patch_method) as m_uri:
+ src_instance = sources.DockerSource(**kwargs)
+ return (src_instance, m_uri, m_utils)
+
+ def test_retrieve_layers_info_pass_arguments_to_get_image_details(self):
+ """
+ Ensures that retrieve_layers_info() calls get_image_details()
+ with all passed arguments.
+ """
+ src_kwargs = {
+ 'uri': '',
+ 'progress': mock.Mock()
+ }
+
+ manifest = {'schemaVersion': 2, 'layers': []}
+ (src_instance,
+ m_uri, m_utils) = self._mock_retrieve_layers_info(manifest,
+ src_kwargs)
+
+ kwargs = {
+ 'insecure': src_instance.insecure,
+ 'username': src_instance.username,
+ 'password': src_instance.password,
+ 'raw': True
+ }
+ m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs)
+
+ def test_retrieve_layers_info_schema_version_1(self):
+ """
+ Ensures that retrieve_layers_info() extracts the layers' information
+ from a manifest with schema version 1 into a list with format:
+ ["sum_type", "digest", "file_path", "size"].
+ """
+ kwargs = {
+ 'uri': '',
+ 'progress': mock.Mock()
+ }
+
+ manifest = {
+ 'schemaVersion': 1,
+ 'fsLayers': [
+ {'blobSum': 'sha256:75c416ea'},
+ {'blobSum': 'sha256:c6ff40b6'},
+ {'blobSum': 'sha256:a7050fc1'}
+ ]
+ }
+
+ expected_result = [
+ ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None],
+ ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None],
+ ['sha256', '75c416ea', '/images_path/75c416ea.tar', None]
+ ]
+
+ src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
+ self.assertEqual(src_instance.layers, expected_result)
+
+ def test_retrieve_layers_info_schema_version_2(self):
+ """
+ Ensures that retrieve_layers_info() extracts the layers' information
+ from a manifest with schema version 2 into a list with format:
+ ["sum_type", "digest", "file_path", "size"].
+ """
+ kwargs = {
+ 'uri': '',
+ 'progress': mock.Mock()
+ }
+
+ manifest = {
+ 'schemaVersion': 2,
+ "layers": [
+ {"size": 47103294, "digest": "sha256:75c416ea"},
+ {"size": 814, "digest": "sha256:c6ff40b6"},
+ {"size": 513, "digest": "sha256:a7050fc1"}
+ ]
+ }
+
+ expected_result = [
+ ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294],
+ ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814],
+ ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513]
+ ]
+
+ src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
+ self.assertEqual(src_instance.layers, expected_result)
+
+ def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self):
+ """
+ Ensures that retrieve_layers_info() raises ValueError when the
+ manifest has an unsupported schema version.
+ """
+ kwargs = {
+ 'uri': '',
+ 'progress': mock.Mock()
+ }
+
+ manifest = {'schemaVersion': 3}
+ with self.assertRaises(ValueError):
+ self._mock_retrieve_layers_info(manifest, kwargs)
diff --git a/tests/file_source.py b/tests/file_source.py
new file mode 100644
index 0000000..dd7fd00
--- /dev/null
+++ b/tests/file_source.py
@@ -0,0 +1,184 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Regression tests which aim to exercise the creation of root file system
+with FileSource.
+"""
+
+import os
+import unittest
+import guestfs
+
+from . import virt_bootstrap
+from . import BuildTarFiles
+from . import mock
+
+
+# pylint: disable=invalid-name
+class Qcow2BuildImage(BuildTarFiles):
+ """
+ Ensures that building qcow2 image from tarball with root file system
+ works as expected.
+ """
+
+ def check_qcow2_images(self, image_path):
+ """
+ Ensures that qcow2 images contain all files.
+ """
+ g = guestfs.GuestFS(python_return_dict=True)
+ g.add_drive_opts(image_path, readonly=True)
+ g.launch()
+ g.mount('/dev/sda', '/')
+ self.check_image(g)
+ g.umount('/')
+ g.shutdown()
+
+ def get_image_path(self):
+ """
+ Returns the path where the qcow2 image will be stored.
+ """
+ return os.path.join(
+ self.dest_dir,
+ "%s.qcow2" % os.path.basename(self.tar_file)
+ )
+
+ def runTest(self):
+ """
+ Create qcow2 image from each dummy tarfile.
+ """
+ virt_bootstrap.bootstrap(
+ uri=self.tar_file,
+ dest=self.dest_dir,
+ fmt='qcow2',
+ progress_cb=mock.Mock()
+ )
+ self.check_qcow2_images(self.get_image_path())
+
+
+@unittest.skip("Not implemented")
+class Qcow2OwnershipMapping(Qcow2BuildImage):
+ """
+ Ensures that UID/GID mapping works correctly for qcow2 conversion.
+ """
+ def runTest(self):
+ """
+ Create qcow2 image from each dummy tarfile.
+ """
+ self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+ self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+ virt_bootstrap.bootstrap(
+ progress_cb=mock.Mock(),
+ uri=self.tar_file,
+ dest=self.dest_dir,
+ fmt='qcow2',
+ uid_map=self.uid_map,
+ gid_map=self.gid_map
+ )
+ self.apply_mapping()
+ self.check_qcow2_images(self.get_image_path())
+
+
+@unittest.skip("Not implemented")
+class Qcow2SettingRootPassword(Qcow2BuildImage):
+ """
+ Ensures that the root password is set correctly in the backing file
+ of the qcow2 image.
+ """
+ def runTest(self):
+ """
+ Create qcow2 image from each dummy tarfile.
+ """
+ self.root_password = "My secret password"
+ virt_bootstrap.bootstrap(
+ progress_cb=mock.Mock(),
+ uri=self.tar_file,
+ dest=self.dest_dir,
+ fmt='qcow2',
+ root_password=self.root_password
+ )
+ self.check_image = self.validate_shadow_file_in_image
+ self.check_qcow2_images(self.get_image_path())
+
+
+@unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirExtractRootFS(BuildTarFiles):
+ """
+ Ensures that files from rootfs tarball are extracted correctly.
+ """
+ def runTest(self):
+ """
+ Extract rootfs from each dummy tarfile.
+ """
+ dest = self.dest_dir
+ virt_bootstrap.bootstrap(
+ uri=self.tar_file,
+ dest=dest,
+ fmt='dir',
+ progress_cb=mock.Mock()
+ )
+ self.check_rootfs()
+
+
+@unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirOwnershipMapping(DirExtractRootFS):
+ """
+ Ensures that UID/GID mapping for extracted root file system are applied
+ correctly.
+ """
+ def runTest(self):
+ """
+ Extract the dummy tarfiles using FileSource and apply UID/GID mappings.
+ """
+ self.uid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+ self.gid_map = [[1000, 2000, 10], [0, 1000, 10], [500, 500, 10]]
+ # Extract the root file system into the destination directory
+ dest = self.dest_dir
+ virt_bootstrap.bootstrap(
+ uri=self.tar_file,
+ dest=dest,
+ fmt='dir',
+ progress_cb=mock.Mock(),
+ uid_map=self.uid_map,
+ gid_map=self.gid_map
+ )
+ self.apply_mapping()
+ self.check_rootfs()
+
+
+@unittest.skipIf(os.geteuid() != 0, "Root privileges required")
+class DirSettingRootPassword(DirExtractRootFS):
+ """
+ Ensures that the root password is set correctly when FileSource is used
+ with fmt='dir'.
+ """
+
+ def runTest(self):
+ """
+ Extract rootfs from each dummy tarfile and set root password.
+ """
+ self.root_password = 'my secret root password'
+ virt_bootstrap.bootstrap(
+ uri=self.tar_file,
+ dest=self.dest_dir,
+ fmt='dir',
+ progress_cb=mock.Mock(),
+ root_password=self.root_password
+ )
+ self.validate_shadow_file(os.path.join(self.dest_dir, 'etc/shadow'))
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..c2f55b5
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
+#
+# Copyright (C) 2017 Radostin Stoyanov
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Unit tests for functions defined in virtBootstrap.utils
+"""
+import unittest
+from . import utils
+
+
+# pylint: disable=invalid-name
+class TestUtils(unittest.TestCase):
+    """
+    Ensures that functions defined in the utils module of virtBootstrap
+    work as expected.
+    """
+    ###################################
+    # Tests for: bytes_to_size()
+    ###################################
+    def test_utils_bytes_to_size(self):
+        """
+        Validates the output of bytes_to_size() for some test cases.
+        """
+        test_values = {
+            0: '0', 1: '1', 512: '512', 1000: '0.98 KiB', 1024: '1 KiB',
+            4096: '4 KiB', 5120: '5 KiB', 10 ** 10: '9.31 GiB'
+        }
+        for value, expected in test_values.items():
+            self.assertEqual(utils.bytes_to_size(value), expected)
+
+    ###################################
+    # Tests for: size_to_bytes()
+    ###################################
+    def test_utils_size_to_bytes(self):
+        """
+        Validates the output of size_to_bytes() for some test cases.
+        """
+        test_values = [1, '0']
+        test_formats = ['TB', 'GB', 'MB', 'KB', 'B']
+        expected_output = [1099511627776, 1073741824, 1048576, 1024, 1,
+                           0, 0, 0, 0, 0]
+        # expected_output is ordered value-major: all formats for 1, then '0'
+        cases = [(value, fmt)
+                 for value in test_values for fmt in test_formats]
+        self.assertEqual(len(cases), len(expected_output))
+        for (value, fmt), expected in zip(cases, expected_output):
+            self.assertEqual(utils.size_to_bytes(value, fmt), expected)
+
+    ###################################
+    # Tests for: is_new_layer_message()
+    ###################################
+    def test_utils_is_new_layer_message(self):
+        """
+        Ensures that is_new_layer_message() returns True when message
+        from the skopeo's stdout indicates processing of new layer
+        and False otherwise.
+        """
+
+        valid_msgs = [
+            "Copying blob sha256:be232718519c940b04bc57",
+            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2"
+        ]
+
+        invalid_msgs = [
+            'Copying config sha256', 'test', ''
+        ]
+
+        for msg in valid_msgs:
+            self.assertTrue(utils.is_new_layer_message(msg))
+        for msg in invalid_msgs:
+            self.assertFalse(utils.is_new_layer_message(msg))
+
+    ###################################
+    # Tests for: is_layer_config_message()
+    ###################################
+    def test_utils_is_layer_config_message(self):
+        """
+        Ensures that is_layer_config_message() returns True when message
+        from the skopeo's stdout indicates processing of manifest file
+        of container image and False otherwise.
+        """
+        invalid_msgs = [
+            "Copying blob sha256:be232718519c940b04bc57",
+            "Skipping fetch of repeat blob sha256:75c416ea735c42a4a0b2",
+            ''
+        ]
+
+        valid_msg = 'Copying config sha256:d355ed3537e94e76389fd78b7724'
+
+        self.assertTrue(utils.is_layer_config_message(valid_msg))
+        for msg in invalid_msgs:
+            self.assertFalse(utils.is_layer_config_message(msg))
+
+    ###################################
+    # Tests for: make_async()
+    ###################################
+    def test_utils_make_async(self):
+        """
+        Ensures that make_async() sets O_NONBLOCK flag on PIPE.
+        """
+        proc = utils.subprocess.Popen(
+            ["echo"],
+            stdout=utils.subprocess.PIPE
+        )
+        pipe = proc.stdout
+        fd = pipe.fileno()
+        F_GETFL = utils.fcntl.F_GETFL
+        O_NONBLOCK = utils.os.O_NONBLOCK
+        # Reap the child and close the pipe even when an assertion fails
+        try:
+            self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
+            utils.make_async(fd)
+            self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
+        finally:
+            proc.wait()
+            pipe.close()
+
+    ###################################
+    # Tests for: str2float()
+    ###################################
+    def test_utils_str2float(self):
+        """
+        Validates the output of str2float() for some test cases.
+        """
+        test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25}
+        for text, expected in test_values.items():
+            self.assertEqual(utils.str2float(text), expected)
--
2.13.5
More information about the virt-tools-list mailing list