[virt-tools-list] [virt-bootstrap] [PATCH v5 11/13] Build_QCOW2_Image: Enable setting root password
Radostin Stoyanov
rstoyanov1 at gmail.com
Fri Aug 4 14:30:47 UTC 2017
Remove the functions set_root_password(),
set_root_password_in_image() and replace the unit tests with more
abstract form of testing which creates qcow2 images and test if the
shadow file has been changed.
---
src/virtBootstrap/sources/docker_source.py | 4 +-
src/virtBootstrap/sources/file_source.py | 4 +-
src/virtBootstrap/utils.py | 72 +++++++++++++++++++-----------
src/virtBootstrap/virt_bootstrap.py | 14 +++---
tests/test_build_qcow2_image.py | 57 +++++++++++++++++++++++
tests/test_utils.py | 62 -------------------------
tests/test_virt_bootstrap.py | 12 ++---
7 files changed, 124 insertions(+), 101 deletions(-)
diff --git a/src/virtBootstrap/sources/docker_source.py b/src/virtBootstrap/sources/docker_source.py
index 45e6c1d..207d166 100644
--- a/src/virtBootstrap/sources/docker_source.py
+++ b/src/virtBootstrap/sources/docker_source.py
@@ -65,6 +65,7 @@ class DockerSource(object):
self.password = kwargs.get('password', None)
self.uid_map = kwargs.get('uid_map', None)
self.gid_map = kwargs.get('gid_map', None)
+ self.root_password = kwargs.get('root_password', None)
self.output_format = kwargs.get('fmt', utils.DEFAULT_OUTPUT_FORMAT)
self.insecure = kwargs.get('not_secure', False)
self.no_cache = kwargs.get('no_cache', False)
@@ -279,7 +280,8 @@ class DockerSource(object):
dest=dest,
progress=self.progress,
uid_map=self.uid_map,
- gid_map=self.gid_map
+ gid_map=self.gid_map,
+ root_password=self.root_password
)
else:
raise Exception("Unknown format:" + self.output_format)
diff --git a/src/virtBootstrap/sources/file_source.py b/src/virtBootstrap/sources/file_source.py
index 70b91db..ef03741 100644
--- a/src/virtBootstrap/sources/file_source.py
+++ b/src/virtBootstrap/sources/file_source.py
@@ -49,6 +49,7 @@ class FileSource(object):
self.output_format = kwargs.get('fmt', utils.DEFAULT_OUTPUT_FORMAT)
self.uid_map = kwargs.get('uid_map', None)
self.gid_map = kwargs.get('gid_map', None)
+ self.root_password = kwargs.get('root_password', None)
self.progress = kwargs['progress'].update_progress
def unpack(self, dest):
@@ -72,7 +73,8 @@ class FileSource(object):
dest=dest,
progress=self.progress,
uid_map=self.uid_map,
- gid_map=self.gid_map
+ gid_map=self.gid_map,
+ root_password=self.root_password
)
else:
raise Exception("Unknown format:" + self.output_format)
diff --git a/src/virtBootstrap/utils.py b/src/virtBootstrap/utils.py
index 66679c5..0328bbd 100644
--- a/src/virtBootstrap/utils.py
+++ b/src/virtBootstrap/utils.py
@@ -32,7 +32,6 @@ import sys
import tarfile
import tempfile
import logging
-import re
import guestfs
import passlib.hosts
@@ -65,6 +64,7 @@ class Build_QCOW2_Image(object):
@param tar_files: List of tar files from which to create rootfs
@param uid_map: Mappings for UID of files in rootfs
@param gid_map: Mappings for GID of files in rootfs
+ @param root_password: Root password to be set
@param dest: Destination directory where qcow2 images will be stored
@param progress: Instance of the progress module
@@ -80,16 +80,22 @@ class Build_QCOW2_Image(object):
self.progress = kwargs['progress']
self.uid_map = kwargs.get('uid_map', None)
self.gid_map = kwargs.get('gid_map', None)
+ self.root_password = kwargs.get('root_password', None)
self.fmt = 'qcow2'
+ nlayers = len(self.tar_files)
+ # Add an overlay layer to apply changes in the shadow file
+ if self.root_password:
+ nlayers += 1
self.qcow2_files = [os.path.join(kwargs['dest'], 'layer-%s.qcow2' % i)
- for i in range(len(self.tar_files))]
+ for i in range(nlayers)]
self.g = guestfs.GuestFS(python_return_dict=True)
self.create_base_qcow2_layer(self.tar_files[0], self.qcow2_files[0])
if len(self.tar_files) > 1:
self.create_backing_chains()
self.g.shutdown()
+ self.set_root_password()
def create_disk(self, qcow2_file, backingfile=None, readonly=False):
"""
@@ -184,6 +190,45 @@ class Build_QCOW2_Image(object):
if new_uid != -1 or new_gid != -1:
self.g.lchown(new_uid, new_gid, os.path.join('/', member.name))
+ def set_root_password(self):
+ """
+ Set root password in the shadow file of image.
+
+ Mount the last the layer to update the shadow file with the
+ hash for the root password.
+ """
+ if self.root_password is None:
+ return
+
+ self.progress("Setting root password", logger=logger)
+
+ self.create_disk(
+ self.qcow2_files[-1],
+ backingfile=self.qcow2_files[-2]
+ )
+
+ self.g.launch()
+ self.g.mount(self.g.list_devices()[0], '/')
+
+ if not self.g.is_file('/etc/shadow'):
+ logger.error('shadow file was not found in this image')
+ return
+
+ shadow_content = self.g.read_file('/etc/shadow').split('\n')
+
+ if not shadow_content:
+ logger.error('shadow file was empty')
+ return
+
+ # Note: 'shadow_content' is of type list, thus pass-by-reference
+ # is used here.
+ set_password_in_shadow_content(
+ shadow_content,
+ self.root_password
+ )
+ self.g.write('/etc/shadow', '\n'.join(shadow_content))
+ self.g.umount('/')
+
def get_compression_type(tar_file):
"""
@@ -472,29 +517,6 @@ def set_root_password_in_rootfs(rootfs, password):
os.chmod(shadow_file, shadow_file_permissions)
-def set_root_password_in_image(image, password):
- """
- Set password on the root user within image
- """
- password_hash = passlib.hosts.linux_context.hash(password)
- execute(['virt-edit',
- '-a', image, '/etc/shadow',
- '-e', 's,^root:.*?:,root:%s:,' % re.escape(password_hash)])
-
-
-def set_root_password(fmt, dest, root_password):
- """
- Set root password
- """
- if fmt == "dir":
- set_root_password_in_rootfs(dest, root_password)
- elif fmt == "qcow2":
- layers = [layer for layer in os.listdir(dest)
- if layer.startswith('layer-')]
- set_root_password_in_image(os.path.join(dest, max(layers)),
- root_password)
-
-
def write_progress(prog):
"""
Write progress output to console
diff --git a/src/virtBootstrap/virt_bootstrap.py b/src/virtBootstrap/virt_bootstrap.py
index 6c1e944..cbd9f0c 100755
--- a/src/virtBootstrap/virt_bootstrap.py
+++ b/src/virtBootstrap/virt_bootstrap.py
@@ -128,15 +128,17 @@ def bootstrap(uri, dest,
gid_map=gid_map,
not_secure=not_secure,
no_cache=no_cache,
+ root_password=root_password,
progress=prog).unpack(dest)
- if root_password is not None:
- logger.info("Setting password of the root account")
- utils.set_root_password(fmt, dest, root_password)
+ if fmt == "dir":
+ if root_password is not None:
+ logger.info("Setting password of the root account")
+ utils.set_root_password_in_rootfs(dest, root_password)
- if fmt == "dir" and (uid_map or gid_map):
- logger.info("Mapping UID/GID")
- utils.mapping_uid_gid(dest, uid_map, gid_map)
+ if uid_map or gid_map:
+ logger.info("Mapping UID/GID")
+ utils.mapping_uid_gid(dest, uid_map, gid_map)
def set_logging_conf(loglevel=None):
diff --git a/tests/test_build_qcow2_image.py b/tests/test_build_qcow2_image.py
index 5ed0ade..3f160d9 100644
--- a/tests/test_build_qcow2_image.py
+++ b/tests/test_build_qcow2_image.py
@@ -28,6 +28,11 @@ from tests import mock
from tests import virt_bootstrap
import guestfs
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
+
TAR_DIR = os.path.realpath('tests/tarfiles')
IMAGES_DIR = os.path.realpath('tests/images')
ROOTFS_TREE = {
@@ -276,3 +281,55 @@ class TestIDMapping(TestBuild_Image):
)
self.set_ownership_mapping(idmap, idmap)
self.check_qcow2_images(images)
+
+
+class TestSetRootPassword(TestBuild_Image):
+ """
+ Ensures that setting root password works for qcow2 images.
+ """
+ def create_user_dirs(self, tar_handle):
+ """
+ Create rootfs tree with only /etc/shadow file
+ """
+ data = 'root:*:'
+ t_info = tarfile.TarInfo('/etc/shadow')
+ t_info.size = len(data)
+ t_info.mode = 0o000
+ t_info.type = tarfile.REGTYPE
+ t_info.uid = 0
+ t_info.gid = 0
+ tar_handle.addfile(t_info, StringIO(data.encode('utf-8')))
+
+ def check_members(self, g):
+ """
+ Check if all files and folders exist in the qcow2 image.
+ """
+ name = '/etc/shadow'
+ self.assertTrue(g.is_file(name), "Not file %s" % name)
+ stat = g.stat(name)
+ self.assertEqual(stat['mode'] & 0o777, 0o000)
+ self.assertEqual(stat['uid'], 0)
+ self.assertEqual(stat['gid'], 0)
+ content = g.read_file('/etc/shadow')
+ root_hash = content.split(':')[1]
+ self.assertNotEqual(root_hash, '*')
+ self.assertGreater(len(root_hash), 100)
+
+ def runTest(self):
+ """
+ Create qcow2 image from each dummy tarfile using FileSource
+ and set root password.
+ """
+ images = []
+ for filename in TAR_FILES:
+ dest = os.path.join(IMAGES_DIR, filename.split('.')[0])
+ images.append(os.path.join(dest, "layer-1.qcow2"))
+ uri = os.path.join(TAR_DIR, filename)
+ virt_bootstrap.bootstrap(
+ uri=uri,
+ dest=dest,
+ fmt='qcow2',
+ progress_cb=mock.Mock(),
+ root_password='test'
+ )
+ self.check_qcow2_images(images)
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 7ce2ba4..56f3460 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -496,68 +496,6 @@ class TestUtils(unittest.TestCase):
% hashed_password)
###################################
- # Tests for: set_root_password_in_image()
- ###################################
- @mock.patch('virtBootstrap.utils.execute')
- def test_utils_set_root_password_in_image(self, m_execute):
- """
- Ensures that set_root_password_in_image() calls virt-edit
- with correct arguments.
- """
- image, password = 'foo', 'password'
- password_hash = ('$6$rounds=656000$PaQ/H4c/k8Ix9YOM$'
- 'cyD47r9PtAE2LhnkpdbVzsiQbM0/h2S/1Bv'
- 'u/sXqUtCg.3Ijp7TQy/8tEVstxMy5k5v4mh'
- 'CGFqnVv7S6wd.Ah/')
-
- expected_call = [
- 'virt-edit',
- '-a', image, '/etc/shadow',
- '-e', 's,^root:.*?:,root:%s:,' % utils.re.escape(password_hash)]
-
- hash_function = 'virtBootstrap.utils.passlib.hosts.linux_context.hash'
- with mock.patch(hash_function) as m_hash:
- m_hash.return_value = password_hash
- utils.set_root_password_in_image(image, password)
-
- m_execute.assert_called_once_with(expected_call)
-
- ###################################
- # Tests for: set_root_password()
- ###################################
- @mock.patch('virtBootstrap.utils.set_root_password_in_rootfs')
- def test_utils_set_root_password_dir(self, m_set_root_password_in_rootfs):
- """
- Ensures that set_root_password() calls set_root_password_in_rootfs()
- when the format is set to "dir".
- """
- fmt, dest, root_password = 'dir', 'dest', 'root_password'
- utils.set_root_password(fmt, dest, root_password)
-
- m_set_root_password_in_rootfs.assert_called_once_with(
- dest, root_password
- )
-
- @mock.patch('virtBootstrap.utils.set_root_password_in_image')
- def test_utils_set_root_password_qcow2(self, m_set_root_password_in_image):
- """
- Ensures that set_root_password() calls set_root_password_in_image()
- when the format is set to "qcow2" with the path to the last
- extracted layer.
- """
- fmt, dest, root_password = 'qcow2', 'dest', 'root_password'
- layers = ['layer-0.qcow2', 'layer-1.qcow2']
-
- with mock.patch('os.listdir') as m_listdir:
- m_listdir.return_value = layers
- utils.set_root_password(fmt, dest, root_password)
-
- m_set_root_password_in_image.assert_called_once_with(
- utils.os.path.join(dest, max(layers)),
- root_password
- )
-
- ###################################
# Tests for: write_progress()
###################################
def test_utils_write_progress_fill_terminal_width(self):
diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py
index c0def7e..808dcf8 100644
--- a/tests/test_virt_bootstrap.py
+++ b/tests/test_virt_bootstrap.py
@@ -239,12 +239,12 @@ class TestVirtBootstrap(unittest.TestCase):
for kwarg in params:
self.assertEqual(called_with_kwargs[kwarg], params[kwarg])
- def test_if_bootstrap_calls_set_root_password(self):
+ def test_if_bootstrap_calls_set_root_password_in_rootfs(self):
"""
- Ensures that bootstrap() calls set_root_password() when the argument
- root_password is specified.
+ Ensures that bootstrap() calls set_root_password_in_rootfs()
+ when the argument root_password is specified.
"""
- src, fmt, dest, root_password = 'foo', 'fmt', 'bar', 'root_password'
+ src, fmt, dest, root_password = 'foo', 'dir', 'bar', 'root_password'
with mock.patch.multiple(virt_bootstrap,
get_source=mock.DEFAULT,
os=mock.DEFAULT,
@@ -258,8 +258,8 @@ class TestVirtBootstrap(unittest.TestCase):
fmt=fmt,
root_password=root_password)
- mocked['utils'].set_root_password.assert_called_once_with(
- fmt, dest, root_password)
+ (mocked['utils'].set_root_password_in_rootfs
+ .assert_called_once_with(dest, root_password))
def test_if_bootstrap_calls_set_mapping_uid_gid(self):
"""
--
2.13.4
More information about the virt-tools-list
mailing list