[virt-tools-list] [virt-bootstrap] [PATCH v7 01/26] Drop unit tests
Cedric Bosdonnat
cbosdonnat at suse.com
Mon Aug 28 13:50:48 UTC 2017
On Sat, 2017-08-26 at 21:41 +0100, Radostin Stoyanov wrote:
> Unit tests were used to ensure that functions and methods work as
> expected. However, these tests are closely related to the
> implementation and will result in major changes after some refactoring.
> To reduce the amount of work needed to add new features or changes to
> the code, most of these tests will be replaced with a more abstract form
> of testing introduced in the following commits.
> ---
> tests/__init__.py | 44 ++--
> tests/docker_source.py | 150 +++++++++++
> tests/test_docker_source.py | 607 -------------------------------------------
> tests/test_file_source.py | 171 ------------
> tests/test_progress.py | 112 --------
> tests/test_utils.py | 580 +----------------------------------------
> tests/test_virt_bootstrap.py | 464 ---------------------------------
> 7 files changed, 180 insertions(+), 1948 deletions(-)
> create mode 100644 tests/docker_source.py
> delete mode 100644 tests/test_docker_source.py
> delete mode 100644 tests/test_file_source.py
> delete mode 100644 tests/test_progress.py
> delete mode 100644 tests/test_virt_bootstrap.py
>
> diff --git a/tests/__init__.py b/tests/__init__.py
> index e82c6d5..1b06616 100644
> --- a/tests/__init__.py
> +++ b/tests/__init__.py
> @@ -1,22 +1,23 @@
> -"""
> - Test suite for virt-bootstrap
> -
> - Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> -
> - Copyright (C) 2017 Radostin Stoyanov
> +# -*- coding: utf-8 -*-
> +# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> +#
> +# Copyright (C) 2017 Radostin Stoyanov
> +#
> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 3 of the License, or
> +# (at your option) any later version.
> +
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +# GNU General Public License for more details.
> +
> +# You should have received a copy of the GNU General Public License
> +# along with this program. If not, see <http://www.gnu.org/licenses/>.
>
> - This program is free software: you can redistribute it and/or modify
> - it under the terms of the GNU General Public License as published by
> - the Free Software Foundation, either version 3 of the License, or
> - (at your option) any later version.
> -
> - This program is distributed in the hope that it will be useful,
> - but WITHOUT ANY WARRANTY; without even the implied warranty of
> - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> - GNU General Public License for more details.
> -
> - You should have received a copy of the GNU General Public License
> - along with this program. If not, see <http://www.gnu.org/licenses/>.
> +"""
> +Test suite for virt-bootstrap
> """
>
> import sys
> @@ -27,13 +28,12 @@ try:
> except ImportError:
> import unittest.mock as mock
>
> -sys.path += '../src' # noqa: E402
> +sys.path.insert(0, '../src') # noqa: E402
>
> -# pylint: disable=import-error
> +# pylint: disable=import-error, wrong-import-position
> from virtBootstrap import virt_bootstrap
> from virtBootstrap import sources
> from virtBootstrap import progress
> from virtBootstrap import utils
>
> -__all__ = ['unittest', 'mock',
> - 'virt_bootstrap', 'sources', 'progress', 'utils']
> +__all__ = ['virt_bootstrap', 'sources', 'progress', 'utils']
> diff --git a/tests/docker_source.py b/tests/docker_source.py
> new file mode 100644
> index 0000000..60404e6
> --- /dev/null
> +++ b/tests/docker_source.py
> @@ -0,0 +1,150 @@
> +# -*- coding: utf-8 -*-
> +# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> +#
> +# Copyright (C) 2017 Radostin Stoyanov
> +#
> +# This program is free software: you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation, either version 3 of the License, or
> +# (at your option) any later version.
> +
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +# GNU General Public License for more details.
> +
> +# You should have received a copy of the GNU General Public License
> +# along with this program. If not, see <http://www.gnu.org/licenses/>.
> +
> +"""
> +Tests whose aim is to exercise creation of root file system with DockerSource.
> +"""
> +
> +import unittest
> +
> +from . import mock
> +from . import sources
> +
> +
> +# pylint: disable=invalid-name
> +class TestDockerSource(unittest.TestCase):
> + """
> + Unit tests for DockerSource
> + """
> + ###################################
> + # Tests for: retrieve_layers_info()
> + ###################################
> + def _mock_retrieve_layers_info(self, manifest, kwargs):
> + """
> + This method gathers the common test pattern used in the following
> + test cases: it returns an instance of the class DockerSource
> + with some util functions being mocked.
> + """
> + with mock.patch.multiple('virtBootstrap.utils',
> + get_image_details=mock.DEFAULT,
> + get_image_dir=mock.DEFAULT) as m_utils:
> +
> + m_utils['get_image_details'].return_value = manifest
> + m_utils['get_image_dir'].return_value = '/images_path'
> +
> + patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri'
> + with mock.patch(patch_method) as m_uri:
> + src_instance = sources.DockerSource(**kwargs)
> + return (src_instance, m_uri, m_utils)
> +
> + def test_retrieve_layers_info_pass_arguments_to_get_image_details(self):
> + """
> + Ensures that retrieve_layers_info() calls get_image_details()
> + with all passed arguments.
> + """
> + src_kwargs = {
> + 'uri': '',
> + 'progress': mock.Mock()
> + }
> +
> + manifest = {'schemaVersion': 2, 'layers': []}
> + (src_instance,
> + m_uri, m_utils) = self._mock_retrieve_layers_info(manifest,
> + src_kwargs)
> +
> + kwargs = {
> + 'insecure': src_instance.insecure,
> + 'username': src_instance.username,
> + 'password': src_instance.password,
> + 'raw': True
> + }
> + m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs)
> +
> + def test_retrieve_layers_info_schema_version_1(self):
> + """
> + Ensures that retrieve_layers_info() extracts the layers' information
> + from manifest with schema version 1 a list with format:
> + ["digest", "sum_type", "file_path", "size"].
> + """
> + kwargs = {
> + 'uri': '',
> + 'progress': mock.Mock()
> + }
> +
> + manifest = {
> + 'schemaVersion': 1,
> + 'fsLayers': [
> + {'blobSum': 'sha256:75c416ea'},
> + {'blobSum': 'sha256:c6ff40b6'},
> + {'blobSum': 'sha256:a7050fc1'}
> + ]
> + }
> +
> + expected_result = [
> + ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None],
> + ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None],
> + ['sha256', '75c416ea', '/images_path/75c416ea.tar', None]
> + ]
> +
> + with mock.patch('os.path.getsize') as m_getsize:
> + m_getsize.return_value = None
> + src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
> + self.assertEqual(src_instance.layers, expected_result)
> +
> + def test_retrieve_layers_info_schema_version_2(self):
> + """
> + Ensures that retrieve_layers_info() extracts the layers' information
> + from manifest with schema version 2 a list with format:
> + ["digest", "sum_type", "file_path", "size"].
> + """
> + kwargs = {
> + 'uri': '',
> + 'progress': mock.Mock()
> + }
> +
> + manifest = {
> + 'schemaVersion': 2,
> + "layers": [
> + {"size": 47103294, "digest": "sha256:75c416ea"},
> + {"size": 814, "digest": "sha256:c6ff40b6"},
> + {"size": 513, "digest": "sha256:a7050fc1"}
> + ]
> + }
> +
> + expected_result = [
> + ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294],
> + ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814],
> + ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513]
> + ]
> +
> + src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
> + self.assertEqual(src_instance.layers, expected_result)
> +
> + def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self):
> + """
> + Ensures that a ValueError is raised when the manifest has an
> + unsupported schema version.
> + """
> + kwargs = {
> + 'uri': '',
> + 'progress': mock.Mock()
> + }
> +
> + manifest = {'schemaVersion': 3}
> + with self.assertRaises(ValueError):
> + self._mock_retrieve_layers_info(manifest, kwargs)
> diff --git a/tests/test_docker_source.py b/tests/test_docker_source.py
> deleted file mode 100644
> index 4859e1b..0000000
> --- a/tests/test_docker_source.py
> +++ /dev/null
> @@ -1,607 +0,0 @@
> -# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> -#
> -# Copyright (C) 2017 Radostin Stoyanov
> -#
> -# This program is free software: you can redistribute it and/or modify
> -# it under the terms of the GNU General Public License as published by
> -# the Free Software Foundation, either version 3 of the License, or
> -# (at your option) any later version.
> -
> -# This program is distributed in the hope that it will be useful,
> -# but WITHOUT ANY WARRANTY; without even the implied warranty of
> -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> -# GNU General Public License for more details.
> -
> -# You should have received a copy of the GNU General Public License
> -# along with this program. If not, see <http://www.gnu.org/licenses/>.
> -
> -
> -"""
> -Unit tests for methods defined in virtBootstrap.sources.DockerSource
> -"""
> -
> -from tests import unittest
> -from tests import mock
> -from tests import sources
> -
> -try:
> - from urlparse import urlparse
> -except ImportError:
> - from urllib.parse import urlparse
> -
> -
> -# pylint: disable=invalid-name
> -# pylint: disable=too-many-public-methods
> -class TestDockerSource(unittest.TestCase):
> - """
> - Test cases for DockerSource
> - """
> - def _mock_docker_source(self):
> - """
> - This method returns an instance of Mock object
> - that acts as the specification for the DockerSource.
> - """
> - m_self = mock.Mock(spec=sources.DockerSource)
> - m_self.progress = mock.Mock()
> - m_self.no_cache = False
> - m_self.url = "docker://test"
> - m_self.images_dir = "/images_path"
> - m_self.insecure = True
> - m_self.username = 'user'
> - m_self.password = 'password'
> - m_self.layers = [
> - ['sha256', '75c416ea', '/images_path/75c416ea.tar', ''],
> - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', '']
> - ]
> - return m_self
> -
> - ###################################
> - # Tests for: __init__()
> - ###################################
> - def test_argument_assignment(self):
> - """
> - Ensures that __init__() assigns the arguments' values to instance
> - variables.
> - """
> - kwargs = {'uri': '',
> - 'fmt': 'dir',
> - 'not_secure': False,
> - 'no_cache': False,
> - 'progress': mock.Mock(),
> - 'username': 'username',
> - 'password': 'password'}
> -
> - with mock.patch('virtBootstrap.utils'
> - '.get_image_dir') as m_get_image_dir:
> - with mock.patch.multiple('virtBootstrap.sources.DockerSource',
> - retrieve_layers_info=mock.DEFAULT,
> - gen_valid_uri=mock.DEFAULT) as mocked:
> - src_instance = sources.DockerSource(**kwargs)
> -
> - test_values = {
> - src_instance.url: mocked['gen_valid_uri'].return_value,
> - src_instance.progress: kwargs['progress'].update_progress,
> - src_instance.username: kwargs['username'],
> - src_instance.password: kwargs['password'],
> - src_instance.output_format: kwargs['fmt'],
> - src_instance.no_cache: kwargs['no_cache'],
> - src_instance.insecure: kwargs['not_secure'],
> - src_instance.images_dir: m_get_image_dir()
> - }
> - for value in test_values:
> - self.assertIs(value, test_values[value])
> -
> - def test_source_password_is_required_if_username_specifed(self):
> - """
> - Ensures that __init__() calls getpass() to request password
> - when username is specified and password is not.
> - """
> - test_password = 'secret'
> -
> - kwargs = {arg: '' for arg
> - in ['uri', 'fmt', 'not_secure', 'password', 'no_cache']}
> - kwargs['progress'] = mock.Mock()
> - kwargs['username'] = 'test'
> -
> - with mock.patch('virtBootstrap.utils.get_image_dir'):
> - with mock.patch('getpass.getpass') as m_getpass:
> - m_getpass.return_value = test_password
> - with mock.patch.multiple('virtBootstrap.sources.DockerSource',
> - retrieve_layers_info=mock.DEFAULT,
> - gen_valid_uri=mock.DEFAULT):
> - src_instance = sources.DockerSource(**kwargs)
> -
> - m_getpass.assert_called_once()
> - self.assertIs(test_password, src_instance.password)
> -
> - ###################################
> - # Tests for: retrieve_layers_info()
> - ###################################
> - def _mock_retrieve_layers_info(self, manifest, kwargs):
> - """
> - This method is gather common test pattern used in the following
> - two test cases.
> - """
> - with mock.patch.multiple('virtBootstrap.utils',
> - get_image_details=mock.DEFAULT,
> - get_image_dir=mock.DEFAULT) as m_utils:
> -
> - m_utils['get_image_details'].return_value = manifest
> - m_utils['get_image_dir'].return_value = '/images_path'
> -
> - patch_method = 'virtBootstrap.sources.DockerSource.gen_valid_uri'
> - with mock.patch(patch_method) as m_uri:
> - src_instance = sources.DockerSource(**kwargs)
> - return (src_instance, m_uri, m_utils)
> -
> - def test_retrieve_layers_info_pass_arguments_to_get_image_details(self):
> - """
> - Ensures that retrieve_layers_info() calls get_image_details()
> - with all passed arguments.
> - """
> - src_kwargs = {
> - 'uri': '',
> - 'progress': mock.Mock()
> - }
> -
> - manifest = {'schemaVersion': 2, 'layers': []}
> - (src_instance,
> - m_uri, m_utils) = self._mock_retrieve_layers_info(manifest,
> - src_kwargs)
> -
> - kwargs = {
> - 'insecure': src_instance.insecure,
> - 'username': src_instance.username,
> - 'password': src_instance.password,
> - 'raw': True
> - }
> - m_utils['get_image_details'].assert_called_once_with(m_uri(), **kwargs)
> -
> - def test_retrieve_layers_info_schema_version_1(self):
> - """
> - Ensures that retrieve_layers_info() extracts the layers' information
> - from manifest with schema version 1 a list with format:
> - ["digest", "sum_type", "file_path", "size"].
> - """
> - kwargs = {
> - 'uri': '',
> - 'progress': mock.Mock()
> - }
> -
> - manifest = {
> - 'schemaVersion': 1,
> - 'fsLayers': [
> - {'blobSum': 'sha256:75c416ea'},
> - {'blobSum': 'sha256:c6ff40b6'},
> - {'blobSum': 'sha256:a7050fc1'}
> - ]
> - }
> -
> - expected_result = [
> - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', None],
> - ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', None],
> - ['sha256', '75c416ea', '/images_path/75c416ea.tar', None]
> - ]
> -
> - src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
> - self.assertEqual(src_instance.layers, expected_result)
> -
> - def test_retrieve_layers_info_schema_version_2(self):
> - """
> - Ensures that retrieve_layers_info() extracts the layers' information
> - from manifest with schema version 2 a list with format:
> - ["digest", "sum_type", "file_path", "size"].
> - """
> - kwargs = {
> - 'uri': '',
> - 'progress': mock.Mock()
> - }
> -
> - manifest = {
> - 'schemaVersion': 2,
> - "layers": [
> - {"size": 47103294, "digest": "sha256:75c416ea"},
> - {"size": 814, "digest": "sha256:c6ff40b6"},
> - {"size": 513, "digest": "sha256:a7050fc1"}
> - ]
> - }
> -
> - expected_result = [
> - ['sha256', '75c416ea', '/images_path/75c416ea.tar', 47103294],
> - ['sha256', 'c6ff40b6', '/images_path/c6ff40b6.tar', 814],
> - ['sha256', 'a7050fc1', '/images_path/a7050fc1.tar', 513]
> - ]
> -
> - src_instance = self._mock_retrieve_layers_info(manifest, kwargs)[0]
> - self.assertEqual(src_instance.layers, expected_result)
> -
> - def test_retrieve_layers_info_raise_error_on_invalid_schema_version(self):
> - """
> - Ensures that retrieve_layers_info() calls get_image_details()
> - with all passed arguments.
> - """
> - kwargs = {
> - 'uri': '',
> - 'progress': mock.Mock()
> - }
> -
> - manifest = {'schemaVersion': 3}
> - with self.assertRaises(ValueError):
> - self._mock_retrieve_layers_info(manifest, kwargs)
> -
> - ###################################
> - # Tests for: gen_valid_uri()
> - ###################################
> - def test_gen_valid_uri(self):
> - """
> - Validates the output of gen_valid_uri() for some test cases.
> - """
> - m_self = self._mock_docker_source()
> - test_values = {
> - 'docker:///repo': 'docker://repo',
> - 'docker:/repo': 'docker://repo',
> - 'docker://repo/': 'docker://repo',
> - 'docker://repo/image/': 'docker://repo/image',
> - 'docker:///repo/image/': 'docker://repo/image',
> - }
> - for uri in test_values:
> - uri_obj = urlparse(uri)
> - result = sources.DockerSource.gen_valid_uri(m_self, uri_obj)
> - expected = test_values[uri]
> - self.assertEqual(result, expected)
> -
> - ###################################
> - # Tests for: download_image()
> - ###################################
> - def test_download_image(self):
> - """
> - Ensures that download_image() calls read_skopeo_progress() with
> - expected skopeo copy command and removes tha leftover manifest file.
> - """
> - m_self = self._mock_docker_source()
> - m_self.read_skopeo_progress = mock.Mock()
> - manifest_path = "%s/manifest.json" % m_self.images_dir
> - with mock.patch('os.remove') as m_remove:
> - sources.DockerSource.download_image(m_self)
> -
> - expected_call = ["skopeo", "copy", m_self.url,
> - "dir:" + m_self.images_dir,
> - '--src-tls-verify=false',
> - '--src-creds={}:{}'.format(m_self.username,
> - m_self.password)]
> - m_self.read_skopeo_progress.assert_called_once_with(expected_call)
> - m_remove.assert_called_once_with(manifest_path)
> -
> - ###################################
> - # Tests for: parse_output()
> - ###################################
> - def test_parse_output_return_false_on_fail(self):
> - """
> - Ensures that parse_output() returns False when process call
> - exits with non-zero code.
> - """
> - m_self = mock.Mock(spec=sources.DockerSource)
> - m_self.layers = []
> - m_proc = mock.Mock()
> - m_proc.returncode = 1
> - self.assertFalse(sources.DockerSource.parse_output(m_self, m_proc))
> -
> - def test_parse_output(self):
> - """
> - Ensures that parse_output() recognises processing of different
> - layers from the skopeo's output.
> - """
> - m_self = self._mock_docker_source()
> - m_proc = mock.Mock()
> - m_proc.poll.return_value = None
> - m_proc.returncode = 0
> - test_values = '\n'.join([
> - 'Skipping fetch of repeat blob sha256:c6ff40',
> - 'Copying blob sha256:75c416ea735c4',
> - '40.00 MB / 44.92 MB [======================>------]',
> - 'Copying config sha256:d355ed35',
> - '40.00 MB / 44.92 MB [======================>------]'
> - ])
> -
> - expected_progress_calls = [
> - mock.call("Downloading layer (1/2)"),
> - mock.call("Downloading layer (2/2)"),
> - ]
> -
> - with mock.patch('select.select') as m_select:
> - m_select.return_value = [[test_values], [], []]
> - with mock.patch('virtBootstrap.utils.read_async') as m_read_async:
> - m_read_async.return_value = test_values
> - self.assertTrue(sources.DockerSource.parse_output(m_self,
> - m_proc))
> - m_select.assert_called_once_with([m_proc.stdout], [], [])
> - m_read_async.assert_called_once_with(test_values)
> - m_self.progress.assert_has_calls(expected_progress_calls)
> - m_self.update_progress_from_output.assert_called_once()
> - m_proc.wait.assert_called_once()
> -
> - ###################################
> - # Tests for: update_progress_from_output()
> - ###################################
> - def _mock_update_progress_from_output(self, test_values):
> - """
> - This method is gather common test pattern used in the following
> - two test cases.
> - """
> - m_self = self._mock_docker_source()
> - test_method = sources.DockerSource.update_progress_from_output
> - for line in test_values:
> - test_method(m_self, line.split(), 1, len(test_values))
> -
> - return m_self.progress.call_args_list
> -
> - def test_update_progress_from_output(self):
> - """
> - Ensures that update_progress_from_output() recognises the current
> - downloaded size, the total layer's size and calculates correct
> - percentage value.
> - """
> - test_values = [
> - '500.00 KB / 4.00 MB [======>------]',
> - '25.00 MB / 24.10 MB [======>------]',
> - '40.00 MB / 50.00 MB [======>------]',
> - ]
> - expected_values = [2, 17.33, 13.33]
> -
> - calls = self._mock_update_progress_from_output(test_values)
> - for call, expected in zip(calls, expected_values):
> - self.assertAlmostEqual(call[1]['value'], expected, places=1)
> -
> - def test_update_progress_from_output_ignore_failures(self):
> - """
> - Ensures that update_progress_from_output() ignores invalid lines
> - from skopeo's output.
> - """
> - test_values = [
> - 'a ',
> - '1 ' * 5,
> - '500.00 MB / 0.00 MB [======>------]',
> - '00.00 MB / 00.00 MB [======>------]',
> - ]
> - self._mock_update_progress_from_output(test_values)
> -
> - ###################################
> - # Tests for: read_skopeo_progress()
> - ###################################
> - def _mock_read_skopeo_progress(self, test_cmd, parse_output_return):
> - """
> - This method is gather common test pattern used in the following
> - two test cases.
> - """
> - m_self = mock.Mock(spec=sources.DockerSource)
> - m_self.parse_output.return_value = parse_output_return
> - with mock.patch.multiple('virtBootstrap.sources.'
> - 'docker_source.subprocess',
> - Popen=mock.DEFAULT,
> - PIPE=mock.DEFAULT) as mocked:
> - with mock.patch('virtBootstrap.utils.make_async') as m_make_async:
> - sources.DockerSource.read_skopeo_progress(m_self, test_cmd)
> -
> - return (mocked, m_make_async)
> -
> - def test_read_skopeo_progress(self):
> - """
> - Ensures that read_skopeo_progress() calls make_async() with
> - the stdout pipe of skopeo's process.
> - """
> - test_cmd = 'test'
> - mocked, m_make_async = self._mock_read_skopeo_progress(test_cmd, True)
> -
> - mocked['Popen'].assert_called_once_with(test_cmd,
> - stdout=mocked['PIPE'],
> - stderr=mocked['PIPE'],
> - universal_newlines=True)
> - m_make_async.assert_called_once_with(mocked['Popen']().stdout)
> -
> - def test_read_skopeo_progress_raise_error(self):
> - """
> - Ensures that read_skopeo_progress() raise CalledProcessError
> - when parse_output() returns false.
> - """
> - with self.assertRaises(sources.docker_source
> - .subprocess.CalledProcessError):
> - self._mock_read_skopeo_progress('test', False)
> -
> - ###################################
> - # Tests for: validate_image_layers()
> - ###################################
> - def _mock_validate_image_layers(self,
> - checksum_return,
> - path_exists_return,
> - expected_result,
> - check_calls=False):
> - """
> - This method is gather common test pattern used in the following
> - three test cases.
> - """
> - m_self = self._mock_docker_source()
> -
> - with mock.patch('os.path.exists') as m_path_exists:
> - with mock.patch('virtBootstrap.utils.checksum') as m_checksum:
> - m_checksum.return_value = checksum_return
> - m_path_exists.return_value = path_exists_return
> - result = sources.DockerSource.validate_image_layers(m_self)
> - self.assertEqual(result, expected_result)
> -
> - if check_calls:
> - path_exists_expected_calls = []
> - checksum_expected_calls = []
> - # Generate expected calls
> - for sum_type, hash_sum, path, _ignore in m_self.layers:
> - path_exists_expected_calls.append(mock.call(path))
> - checksum_expected_calls.append(
> - mock.call(path, sum_type, hash_sum))
> -
> - m_path_exists.assert_has_calls(path_exists_expected_calls)
> - m_checksum.assert_has_calls(checksum_expected_calls)
> -
> - def test_validate_image_layers_should_return_true(self):
> - """
> - Ensures that validate_image_layers() returns True when:
> - - checksum() returns True for all layers
> - - the file path of all layers exist
> - - all layers are validated
> - """
> - self._mock_validate_image_layers(True, True, True, True)
> -
> - def test_validate_image_layers_return_false_if_path_not_exist(self):
> - """
> - Ensures that validate_image_layers() returns False when
> - checksum() returns False.
> - """
> - self._mock_validate_image_layers(False, True, False)
> -
> - def test_validate_image_layers_return_false_if_checksum_fail(self):
> - """
> - Ensures that validate_image_layers() returns False when
> - the file path of layer does not exist.
> - """
> - self._mock_validate_image_layers(True, False, False)
> -
> - ###################################
> - # Tests for: fetch_layers()
> - ###################################
> - def _mock_fetch_layers(self, validate_return):
> - """
> - This method is gather common test pattern used in the following
> - two test cases.
> - """
> - m_self = mock.Mock(spec=sources.DockerSource)
> - m_self.validate_image_layers.return_value = validate_return
> - sources.DockerSource.fetch_layers(m_self)
> - return m_self
> -
> - def test_fetch_layers_should_call_download_image(self):
> - """
> - Ensures that fetch_layers() calls download_image()
> - when validate_image_layers() returns False.
> - """
> - m_self = self._mock_fetch_layers(False)
> - m_self.download_image.assert_called_once()
> -
> - def test_fetch_layers_should_not_call_download_image(self):
> - """
> - Ensures that fetch_layers() does not call download_image()
> - when validate_image_layers() returns True.
> - """
> - m_self = self._mock_fetch_layers(True)
> - m_self.download_image.assert_not_called()
> -
> - ###################################
> - # Tests for: unpack()
> - ###################################
> - def _unpack_test_fmt(self, output_format, patch_method=None,
> - side_effect=None, m_self=None):
> - """
> - This method is gather common test pattern used in the following
> - two test cases.
> - """
> - m_self = m_self if m_self else self._mock_docker_source()
> - m_self.output_format = output_format
> - dest = 'foo'
> -
> - if patch_method:
> - with mock.patch(patch_method) as mocked:
> - if side_effect:
> - mocked.side_effect = side_effect
> - sources.DockerSource.unpack(m_self, dest)
> -
> - mocked.assert_called_once_with(m_self.layers, dest,
> - m_self.progress)
> - else:
> - sources.DockerSource.unpack(m_self, dest)
> -
> - m_self.fetch_layers.assert_called_once()
> -
> - def test_unpack_dir_format(self):
> - """
> - Ensures that unpack() calls untar_layers() when the output format
> - is set to 'dir'.
> - """
> - self._unpack_test_fmt('dir', 'virtBootstrap.utils.untar_layers')
> -
> - def test_unpack_qcow2_format(self):
> - """
> - Ensures that unpack() calls extract_layers_in_qcow2() when the
> - output format is set to 'qcow2'.
> - """
> - self._unpack_test_fmt('qcow2',
> - 'virtBootstrap.utils.extract_layers_in_qcow2')
> -
> - def unpack_raise_error_test(self,
> - output_format,
> - patch_method,
> - side_effect=None,
> - msg=None):
> - """
> - This method is gather common test pattern used in the following
> - four test cases.
> - """
> - with self.assertRaises(Exception) as err:
> - self._unpack_test_fmt(output_format, patch_method,
> - side_effect)
> - if msg:
> - self.assertEqual(msg, str(err.exception))
> -
> - def test_unpack_raise_error_for_unknown_format(self):
> - """
> - Ensures that unpack() throws an Exception when called with
> - invalid output format.
> - """
> - msg = 'Unknown format:foo'
> - self.unpack_raise_error_test('foo', None, None, msg)
> -
> - def test_unpack_raise_error_if_untar_fail(self):
> - """
> - Ensures that unpack() throws an Exception when untar_layers()
> - fails.
> - """
> - msg = 'Caught untar failure'
> - side_effect = Exception(msg)
> - patch_method = 'virtBootstrap.utils.untar_layers'
> - self.unpack_raise_error_test('dir', patch_method, side_effect, msg)
> -
> - def test_unpack_raise_error_if_extract_in_qcow2_fail(self):
> - """
> - Ensures that unpack() throws an Exception when
> - extract_layers_in_qcow2() fails.
> - """
> - msg = 'Caught extract_layers_in_qcow2 failure'
> - side_effect = Exception(msg)
> - patch_method = 'virtBootstrap.utils.extract_layers_in_qcow2'
> - self.unpack_raise_error_test('qcow2', patch_method, side_effect, msg)
> -
> - def test_unpack_no_cache_clean_up(self):
> - """
> - Ensures that unpack() removes the folder which stores tar archives
> - of image layers when no_cache is set to True.
> - """
> - output_formats = ['dir', 'qcow2']
> - patch_methods = [
> - 'virtBootstrap.utils.untar_layers',
> - 'virtBootstrap.utils.extract_layers_in_qcow2'
> - ]
> - for fmt, patch_mthd in zip(output_formats, patch_methods):
> - m_self = self._mock_docker_source()
> - m_self.no_cache = True
> - with mock.patch('shutil.rmtree') as m_shutil:
> - self._unpack_test_fmt(fmt, patch_mthd, m_self=m_self)
> - m_shutil.assert_called_once_with(m_self.images_dir)
> -
> - def test_unpack_no_cache_clean_up_on_failure(self):
> - """
> - Ensures that unpack() removes the folder which stores tar archives
> - of image layers when no_cache is set to True and exception was
> - raised.
> - """
> - m_self = self._mock_docker_source()
> - m_self.no_cache = True
> - with self.assertRaises(Exception):
> - with mock.patch('shutil.rmtree') as m_rmtree:
> - self._unpack_test_fmt('foo', None, m_self=m_self)
> - m_rmtree.assert_called_once_with(m_self.images_dir)
> diff --git a/tests/test_file_source.py b/tests/test_file_source.py
> deleted file mode 100644
> index 6e89aa2..0000000
> --- a/tests/test_file_source.py
> +++ /dev/null
> @@ -1,171 +0,0 @@
> -# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> -#
> -# Copyright (C) 2017 Radostin Stoyanov
> -#
> -# This program is free software: you can redistribute it and/or modify
> -# it under the terms of the GNU General Public License as published by
> -# the Free Software Foundation, either version 3 of the License, or
> -# (at your option) any later version.
> -
> -# This program is distributed in the hope that it will be useful,
> -# but WITHOUT ANY WARRANTY; without even the implied warranty of
> -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> -# GNU General Public License for more details.
> -
> -# You should have received a copy of the GNU General Public License
> -# along with this program. If not, see <http://www.gnu.org/licenses/>.
> -
> -
> -"""
> -Unit tests for methods defined in virtBootstrap.sources.FileSource
> -"""
> -
> -from tests import unittest
> -from tests import mock
> -from tests import sources
> -
> -
> -# pylint: disable=invalid-name
> -class TestFileSource(unittest.TestCase):
> - """
> - Test cases for FileSource
> - """
> -
> - ###################################
> - # Tests for: __init__()
> - ###################################
> - def test_argument_assignment(self):
> - """
> - Ensures that __init__() assigns the arguments' values to instance
> - variables.
> - """
> - kwargs = {'uri': mock.Mock(),
> - 'fmt': 'dir',
> - 'progress': mock.Mock()}
> -
> - src_instance = sources.FileSource(**kwargs)
> -
> - test_values = {
> - src_instance.path: kwargs['uri'].path,
> - src_instance.output_format: kwargs['fmt'],
> - src_instance.progress: kwargs['progress'].update_progress
> - }
> - for value in test_values:
> - self.assertIs(value, test_values[value])
> -
> - ###################################
> - # Tests for: unpack()
> - ###################################
> - def test_unpack_invalid_source_raise_exception(self):
> - """
> - Ensures that unpack() throws an Exception when called with
> - invalid file source.
> - """
> - m_self = mock.Mock(spec=sources.FileSource)
> - m_self.path = 'foo'
> - with mock.patch('os.path.isfile') as m_isfile:
> - m_isfile.return_value = False
> - with self.assertRaises(Exception) as err:
> - sources.FileSource.unpack(m_self, 'bar')
> - self.assertIn('Invalid file source', str(err.exception))
> -
> - def test_unpack_to_dir(self):
> - """
> - Ensures that unpack() calls safe_untar() when the output format
> - is set to 'dir'.
> - """
> - m_self = mock.Mock(spec=sources.FileSource)
> - m_self.progress = mock.Mock()
> - m_self.path = 'foo'
> - m_self.output_format = 'dir'
> - dest = 'bar'
> -
> - with mock.patch('os.path.isfile') as m_isfile:
> - m_isfile.return_value = True
> - with mock.patch('virtBootstrap.utils.safe_untar') as m_untar:
> - sources.FileSource.unpack(m_self, dest)
> -
> - m_untar.assert_called_once_with(m_self.path, dest)
> -
> - def test_unpack_to_qcow2(self):
> - """
> - Ensures that unpack() calls create_qcow2() when the output
> - format is set to 'qcow2'.
> - """
> - m_self = mock.Mock(spec=sources.FileSource)
> - m_self.progress = mock.Mock()
> - m_self.path = 'foo'
> - m_self.output_format = 'qcow2'
> - dest = 'bar'
> - qcow2_file_path = 'foobar'
> -
> - with mock.patch.multiple('os.path',
> - isfile=mock.DEFAULT,
> - realpath=mock.DEFAULT) as mocked:
> -
> - mocked['isfile'].return_value = True
> - mocked['realpath'].return_value = qcow2_file_path
> - with mock.patch('virtBootstrap.utils.create_qcow2') as m_qcow2:
> - sources.FileSource.unpack(m_self, dest)
> -
> - m_qcow2.assert_called_once_with(m_self.path, qcow2_file_path)
> -
> - def _unpack_raise_error_test(self,
> - output_format,
> - side_effect=None,
> - patch_method=None,
> - msg=None):
> - """
> - This method is gather common test pattern used in the following
> - three test cases.
> - """
> - m_self = mock.Mock(spec=sources.FileSource)
> - m_self.progress = mock.Mock()
> - m_self.path = 'foo'
> - m_self.output_format = output_format
> - dest = 'bar'
> -
> - with mock.patch.multiple('os.path',
> - isfile=mock.DEFAULT,
> - realpath=mock.DEFAULT) as m_path:
> - m_path['isfile'].return_value = True
> - with self.assertRaises(Exception) as err:
> - if patch_method:
> - with mock.patch(patch_method) as mocked_method:
> - mocked_method.side_effect = side_effect
> - sources.FileSource.unpack(m_self, dest)
> - else:
> - sources.FileSource.unpack(m_self, dest)
> - if msg:
> - self.assertEqual(msg, str(err.exception))
> -
> - def test_unpack_invalid_format_raise_exception(self):
> - """
> - Ensures that unpack() throws an Exception when called with
> - invalid output format.
> - """
> - self._unpack_raise_error_test('foo', msg='Unknown format:foo')
> -
> - def test_unpack_raise_error_if_untar_fail(self):
> - """
> - Ensures that unpack() throws an Exception when safe_untar()
> - fails.
> - """
> - msg = 'Caught untar failure'
> - patch_method = 'virtBootstrap.utils.safe_untar'
> - self._unpack_raise_error_test(output_format='dir',
> - side_effect=Exception(msg),
> - patch_method=patch_method,
> - msg=msg)
> -
> - def test_unpack_raise_error_if_extract_in_qcow2_fail(self):
> - """
> - Ensures that unpack() throws an Exception when create_qcow2()
> - fails.
> - """
> - msg = 'Caught extract_layers_in_qcow2 failure'
> - patch_method = 'virtBootstrap.utils.create_qcow2'
> - self._unpack_raise_error_test(output_format='qcow2',
> - side_effect=Exception(msg),
> - patch_method=patch_method,
> - msg=msg)
> diff --git a/tests/test_progress.py b/tests/test_progress.py
> deleted file mode 100644
> index 1f609d5..0000000
> --- a/tests/test_progress.py
> +++ /dev/null
> @@ -1,112 +0,0 @@
> -# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> -#
> -# Copyright (C) 2017 Radostin Stoyanov
> -#
> -# This program is free software: you can redistribute it and/or modify
> -# it under the terms of the GNU General Public License as published by
> -# the Free Software Foundation, either version 3 of the License, or
> -# (at your option) any later version.
> -
> -# This program is distributed in the hope that it will be useful,
> -# but WITHOUT ANY WARRANTY; without even the implied warranty of
> -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> -# GNU General Public License for more details.
> -
> -# You should have received a copy of the GNU General Public License
> -# along with this program. If not, see <http://www.gnu.org/licenses/>.
> -
> -
> -"""
> -Unit tests for methods defined in virtBootstrap.progress
> -"""
> -
> -from tests import unittest
> -from tests import mock
> -from tests import progress
> -
> -
> -# pylint: disable=invalid-name
> -class TestFileSource(unittest.TestCase):
> - """
> - Test cases for Progress module
> - """
> -
> - ###################################
> - # Tests for: __init__()
> - ###################################
> - def test_progress_init(self):
> - """
> - Ensures that __init__() assigns the collback value to instance
> - variable and creates dictionary with 'status', 'value' keys.
> - """
> - callback = mock.Mock()
> - test_instance = progress.Progress(callback)
> - for key in ['status', 'value']:
> - self.assertIn(key, test_instance.progress)
> - self.assertIs(callback, test_instance.callback)
> -
> - ###################################
> - # Tests for: get_progress()
> - ###################################
> - def test_get_progress(self):
> - """
> - Ensures that get_progress() returns copy of the progress dictionary
> - which has the same keys and values.
> - """
> - test_instance = progress.Progress()
> - test_result = test_instance.get_progress()
> - self.assertIsNot(test_instance.progress, test_result)
> - self.assertDictEqual(test_instance.progress, test_result)
> -
> - ###################################
> - # Tests for: update_progress()
> - ###################################
> - def test_update_progress_creates_log_record(self):
> - """
> - Ensures that update_progress() creates log record with info level
> - and pass the status value as message.
> - """
> - test_instance = progress.Progress()
> - logger = mock.Mock()
> - status = "Test"
> - test_instance.update_progress(status=status, logger=logger)
> - logger.info.assert_called_once_with(status)
> -
> - def test_update_progress_update_status_and_value(self):
> - """
> - Ensures that update_progress() creates log record with info level
> - and pass the status value as message.
> - """
> - test_instance = progress.Progress()
> - test_instance.progress = {'status': '', 'value': 0}
> - new_status = 'Test'
> - new_value = 100
> - new_progress = {'status': new_status, 'value': new_value}
> - test_instance.update_progress(status=new_status, value=new_value)
> - self.assertDictEqual(test_instance.progress, new_progress)
> -
> - def test_update_progress_update_raise_logger_error(self):
> - """
> - Ensures that update_progress() raise ValueError when creating
> - log record has failed.
> - """
> - msg = 'test'
> - test_instance = progress.Progress()
> - logger = mock.Mock()
> - logger.info.side_effect = Exception(msg)
> - with self.assertRaises(ValueError) as err:
> - test_instance.update_progress(logger=logger)
> - self.assertIn(msg, str(err.exception))
> -
> - def test_update_progress_update_raise_callback_error(self):
> - """
> - Ensures that update_progress() raise ValueError when calling
> - callback failed.
> - """
> - msg = 'test'
> - callback = mock.Mock()
> - callback.side_effect = Exception(msg)
> - test_instance = progress.Progress(callback)
> - with self.assertRaises(ValueError) as err:
> - test_instance.update_progress('foo', 'bar')
> - self.assertIn(msg, str(err.exception))
> diff --git a/tests/test_utils.py b/tests/test_utils.py
> index 0b6ccc0..c2f55b5 100644
> --- a/tests/test_utils.py
> +++ b/tests/test_utils.py
> @@ -1,3 +1,4 @@
> +# -*- coding: utf-8 -*-
> # Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> #
> # Copyright (C) 2017 Radostin Stoyanov
> @@ -19,126 +20,16 @@
> """
> Unit tests for functions defined in virtBootstrap.utils
> """
> -
> -from tests import unittest
> -from tests import mock
> -from tests import utils
> -try:
> - # pylint: disable=redefined-builtin
> - from importlib import reload
> -except ImportError:
> - pass
> +import unittest
> +from . import utils
>
>
> # pylint: disable=invalid-name
> -# pylint: disable=too-many-public-methods
> class TestUtils(unittest.TestCase):
> """
> Ensures that functions defined in the utils module of virtBootstrap
> work as expected.
> """
> -
> - ###################################
> - # Tests for: checksum()
> - ###################################
> - def test_utils_checksum_return_false_on_invalid_hash(self):
> - """
> - Ensures that checksum() returns False if the actual and expected
> - hash sum of file are not equal.
> - """
> - with mock.patch.multiple(utils,
> - open=mock.DEFAULT,
> - logger=mock.DEFAULT,
> - hashlib=mock.DEFAULT) as mocked:
> - path, sum_type, sum_expected = '/foo', 'sha256', 'bar'
> - mocked['hashlib'].sha256.hexdigest.return_value = False
> - self.assertFalse(utils.checksum(path, sum_type, sum_expected))
> -
> - def test_utils_checksum_return_false_if_file_could_not_be_opened(self):
> - """
> - Ensures that checksum() returns False if the file to be checked
> - cannot be open for read.
> - """
> - with mock.patch.multiple(utils,
> - open=mock.DEFAULT,
> - logger=mock.DEFAULT,
> - hashlib=mock.DEFAULT) as mocked:
> - mocked['open'].side_effect = IOError()
> - self.assertFalse(utils.checksum('foo', 'sha256', 'bar'))
> -
> - def test_utils_checksum_return_true_on_valid_hash(self):
> - """
> - Ensures that checksum() returns True when the actual and expected
> - hash sum of file are equal.
> - """
> - with mock.patch.multiple(utils,
> - open=mock.DEFAULT,
> - logger=mock.DEFAULT,
> - hashlib=mock.DEFAULT) as mocked:
> - path, sum_type, sum_expected = '/foo', 'sha256', 'bar'
> - mocked['hashlib'].sha256.return_value.hexdigest.return_value \
> - = sum_expected
> - self.assertTrue(utils.checksum(path, sum_type, sum_expected))
> -
> - ###################################
> - # Tests for: execute()
> - ###################################
> - def test_utils_execute_logging_on_successful_proc_call(self):
> - """
> - Ensures that execute() creates log record of cmd, stdout and stderr
> - when the exit code of process is 0.
> - """
> - with mock.patch.multiple(utils,
> - logger=mock.DEFAULT,
> - subprocess=mock.DEFAULT) as mocked:
> - cmd = ['foo']
> - output, err = 'test_out', 'test_err'
> -
> - mocked['subprocess'].Popen.return_value.returncode = 0
> - (mocked['subprocess'].Popen.return_value
> - .communicate.return_value) = (output.encode(), err.encode())
> -
> - utils.execute(cmd)
> - mocked['logger'].debug.assert_any_call("Call command:\n%s", cmd[0])
> - mocked['logger'].debug.assert_any_call("Stdout:\n%s", output)
> - mocked['logger'].debug.assert_any_call("Stderr:\n%s", err)
> -
> - def test_utils_execute_raise_error_on_unsuccessful_proc_call(self):
> - """
> - Ensures that execute() raise CalledProcessError exception when the
> - exit code of process is not 0.
> - """
> - with mock.patch('virtBootstrap.utils.subprocess.Popen') as m_popen:
> - m_popen.return_value.returncode = 1
> - m_popen.return_value.communicate.return_value = (b'output', b'err')
> - with self.assertRaises(utils.subprocess.CalledProcessError):
> - utils.execute(['foo'])
> -
> - ###################################
> - # Tests for: safe_untar()
> - ###################################
> - def test_utils_safe_untar_calls_execute(self):
> - """
> - Ensures that safe_untar() calls execute with virt-sandbox
> - command to extract source files to destination folder.
> - Test for users with EUID 0 and 1000.
> - """
> - with mock.patch('virtBootstrap.utils.os.geteuid') as m_geteuid:
> - for uid in [0, 1000]:
> - m_geteuid.return_value = uid
> - reload(utils)
> - with mock.patch('virtBootstrap.utils.execute') as m_execute:
> - src, dest = 'foo', 'bar'
> - utils.safe_untar('foo', 'bar')
> - cmd = ['virt-sandbox',
> - '-c', utils.LIBVIRT_CONN,
> - '-m', 'host-bind:/mnt=' + dest,
> - '--',
> - '/bin/tar', 'xf', src,
> - '-C', '/mnt',
> - '--exclude', 'dev/*']
> - m_execute.assert_called_once_with(cmd)
> -
> ###################################
> # Tests for: bytes_to_size()
> ###################################
> @@ -172,241 +63,6 @@ class TestUtils(unittest.TestCase):
> i += 1
>
> ###################################
> - # Tests for: log_layer_extract()
> - ###################################
> - def test_utils_log_layer_extract(self):
> - """
> - Ensures that log_layer_extract() updates the progress and creates
> - log record with debug level.
> - """
> - m_progress = mock.Mock()
> - layer = ['sum_type', 'sum_value', 'layer_file', 'layer_size']
> - with mock.patch.multiple(utils, logger=mock.DEFAULT,
> - bytes_to_size=mock.DEFAULT) as mocked:
> - utils.log_layer_extract(layer, 'foo', 'bar', m_progress)
> - mocked['bytes_to_size'].assert_called_once_with('layer_size')
> - mocked['logger'].debug.assert_called_once()
> - m_progress.assert_called_once()
> -
> - ###################################
> - # Tests for: get_mime_type()
> - ###################################
> - @mock.patch('virtBootstrap.utils.subprocess.Popen')
> - def test_utils_get_mime_type(self, m_popen):
> - """
> - Ensures that get_mime_type() returns the detected MIME type
> - of /usr/bin/file.
> - """
> - path = "foo"
> - mime = "application/x-gzip"
> - stdout = ('%s: %s' % (path, mime)).encode()
> - m_popen.return_value.stdout.read.return_value = stdout
> - self.assertEqual(utils.get_mime_type(path), mime)
> - m_popen.assert_called_once_with(
> - ["/usr/bin/file", "--mime-type", path],
> - stdout=utils.subprocess.PIPE
> - )
> -
> - ###################################
> - # Tests for: untar_layers()
> - ###################################
> - def test_utils_untar_all_layers_in_order(self):
> - """
> - Ensures that untar_layers() iterates through all passed layers
> - in order.
> - """
> - layers = ['l1', 'l2', 'l3']
> - layers_list = [['', '', layer] for layer in layers]
> - dest_dir = '/foo'
> - expected_calls = [mock.call(layer, dest_dir) for layer in layers]
> - with mock.patch.multiple(utils,
> - safe_untar=mock.DEFAULT,
> - log_layer_extract=mock.DEFAULT) as mocked:
> - utils.untar_layers(layers_list, dest_dir, mock.Mock())
> - mocked['safe_untar'].assert_has_calls(expected_calls)
> -
> - ###################################
> - # Tests for: create_qcow2()
> - ###################################
> - def _apply_test_to_create_qcow2(self, expected_calls, *args):
> - """
> - This method contains common test pattern used in the next two
> - test cases.
> - """
> - with mock.patch.multiple(utils,
> - execute=mock.DEFAULT,
> - logger=mock.DEFAULT,
> - get_mime_type=mock.DEFAULT) as mocked:
> - mocked['get_mime_type'].return_value = 'application/x-gzip'
> - utils.create_qcow2(*args)
> - mocked['execute'].assert_has_calls(expected_calls)
> -
> - def test_utils_create_qcow2_base_layer(self):
> - """
> - Ensures that create_qcow2() creates base layer when
> - backing_file = None.
> - """
> - tar_file = 'foo'
> - layer_file = 'bar'
> - size = '5G'
> - backing_file = None
> -
> - expected_calls = [
> - mock.call(["qemu-img", "create", "-f", "qcow2", layer_file, size]),
> -
> - mock.call(['virt-format',
> - '--format=qcow2',
> - '--partition=none',
> - '--filesystem=ext3',
> - '-a', layer_file]),
> -
> - mock.call(['guestfish',
> - '-a', layer_file,
> - '-m', '/dev/sda',
> - 'tar-in', tar_file, '/', 'compress:gzip'])
> - ]
> -
> - self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file,
> - backing_file, size)
> -
> - def test_utils_create_qcow2_layer_with_backing_chain(self):
> - """
> - Ensures that create_qcow2() creates new layer with backing chains
> - when backing_file is specified.
> - """
> - tar_file = 'foo'
> - layer_file = 'bar'
> - backing_file = 'base'
> - size = '5G'
> -
> - expected_calls = [
> - mock.call(['qemu-img', 'create',
> - '-b', backing_file,
> - '-f', 'qcow2',
> - layer_file, size]),
> -
> - mock.call(['guestfish',
> - '-a', layer_file,
> - '-m', '/dev/sda',
> - 'tar-in', tar_file, '/', 'compress:gzip'])
> - ]
> -
> - self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file,
> - backing_file, size)
> -
> - ###################################
> - # Tests for: extract_layers_in_qcow2()
> - ###################################
> - def test_utils_if_all_layers_extracted_in_order_in_qcow2(self):
> - """
> - Ensures that extract_layers_in_qcow2() iterates through all
> - layers in order.
> - """
> - layers = ['l1', 'l2', 'l3']
> - layers_list = [['', '', layer] for layer in layers]
> - dest_dir = '/foo'
> -
> - # Generate expected calls
> - expected_calls = []
> - qcow2_backing_file = None
> - for index, layer in enumerate(layers):
> - qcow2_layer_file = dest_dir + "/layer-%s.qcow2" % index
> - expected_calls.append(
> - mock.call(layer, qcow2_layer_file, qcow2_backing_file))
> - qcow2_backing_file = qcow2_layer_file
> -
> - # Mocking out and execute
> - with mock.patch.multiple(utils,
> - create_qcow2=mock.DEFAULT,
> - log_layer_extract=mock.DEFAULT) as mocked:
> - utils.extract_layers_in_qcow2(layers_list, dest_dir, mock.Mock())
> -
> - # Check actual calls
> - mocked['create_qcow2'].assert_has_calls(expected_calls)
> -
> - ###################################
> - # Tests for: get_image_dir()
> - ###################################
> - def test_utils_getimage_dir(self):
> - """
> - Ensures that get_image_dir() returns path to DEFAULT_IMG_DIR
> - if the no_cache argument is set to False and create it if
> - does not exist.
> - """
> - # Perform this test for UID 0 and 1000
> - for uid in [0, 1000]:
> - with mock.patch('os.geteuid') as m_geteuid:
> - m_geteuid.return_value = uid
> - reload(utils)
> - with mock.patch('os.makedirs') as m_makedirs:
> - with mock.patch('os.path.exists') as m_path_exists:
> - m_path_exists.return_value = False
> - self.assertEqual(utils.get_image_dir(False),
> - utils.DEFAULT_IMG_DIR)
> - m_makedirs.assert_called_once_with(utils.DEFAULT_IMG_DIR)
> -
> - @mock.patch('tempfile.mkdtemp')
> - def test_utils_getimage_dir_no_cache(self, m_mkdtemp):
> - """
> - Ensures that get_image_dir() returns temporary file path created
> - by tempfile.mkdtemp.
> - """
> - m_mkdtemp.return_value = 'foo'
> - self.assertEqual(utils.get_image_dir(True), 'foo')
> - m_mkdtemp.assert_called_once()
> -
> - ###################################
> - # Tests for: get_image_details()
> - ###################################
> - @mock.patch('virtBootstrap.utils.subprocess.Popen')
> - def test_utils_get_image_details_raise_error_on_fail(self, m_popen):
> - """
> - Ensures that get_image_details() throws ValueError exception
> - when stderr from skopeo is provided.
> - """
> - src = 'docker://foo'
> - m_popen.return_value.communicate.return_value = [b'', b'Error']
> - with self.assertRaises(ValueError):
> - utils.get_image_details(src)
> -
> - @mock.patch('virtBootstrap.utils.subprocess.Popen')
> - def test_utils_get_image_details_return_json_obj_on_success(self, m_popen):
> - """
> - Ensures that get_image_details() returns python dictionary which
> - represents the data provided from stdout of skopeo when stderr
> - is not present.
> - """
> - src = 'docker://foo'
> - json_dict = {'foo': 'bar'}
> - stdout = utils.json.dumps(json_dict).encode()
> - m_popen.return_value.communicate.return_value = [stdout, '']
> - self.assertDictEqual(utils.get_image_details(src), json_dict)
> -
> - def test_utils_get_image_details_all_argument_passed(self):
> - """
> - Ensures that get_image_details() pass all argument values to
> - skopeo inspect.
> - """
> - src = 'docker://foo'
> - raw, insecure = True, True
> - username, password = 'user', 'password'
> - cmd = ['skopeo', 'inspect', src,
> - '--raw',
> - '--tls-verify=false',
> - "--creds=%s:%s" % (username, password)]
> -
> - with mock.patch.multiple(utils.subprocess,
> - Popen=mock.DEFAULT,
> - PIPE=mock.DEFAULT) as mocked:
> - mocked['Popen'].return_value.communicate.return_value = [b'{}',
> - b'']
> - utils.get_image_details(src, raw, insecure, username, password)
> -
> - mocked['Popen'].assert_called_once_with(cmd,
> - stdout=mocked['PIPE'],
> - stderr=mocked['PIPE'])
> -
> - ###################################
> # Tests for: is_new_layer_message()
> ###################################
> def test_utils_is_new_layer_message(self):
> @@ -459,10 +115,11 @@ class TestUtils(unittest.TestCase):
> Ensures that make_async() sets O_NONBLOCK flag on PIPE.
> """
>
> - pipe = utils.subprocess.Popen(
> + proc = utils.subprocess.Popen(
> ["echo"],
> stdout=utils.subprocess.PIPE
> - ).stdout
> + )
> + pipe = proc.stdout
>
> fd = pipe.fileno()
> F_GETFL = utils.fcntl.F_GETFL
> @@ -471,36 +128,8 @@ class TestUtils(unittest.TestCase):
> self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
> utils.make_async(fd)
> self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
> -
> - ###################################
> - # Tests for: read_async()
> - ###################################
> - def test_utils_read_async_successful_read(self):
> - """
> - Ensures that read_async() calls read() of passed file descriptor.
> - """
> - m_fd = mock.MagicMock()
> - utils.read_async(m_fd)
> - m_fd.read.assert_called_once()
> -
> - def test_utils_read_async_return_empty_str_on_EAGAIN_error(self):
> - """
> - Ensures that read_async() ignores EAGAIN errors and returns
> - empty string.
> - """
> - m_fd = mock.MagicMock()
> - m_fd.read.side_effect = IOError(utils.errno.EAGAIN, '')
> - self.assertEqual(utils.read_async(m_fd), '')
> -
> - def test_utils_read_async_raise_errors(self):
> - """
> - Ensures that read_async() does not ignore IOError which is different
> - than EAGAIN and throws an exception.
> - """
> - m_fd = mock.MagicMock()
> - m_fd.read.side_effect = IOError()
> - with self.assertRaises(IOError):
> - utils.read_async(m_fd)
> + proc.wait()
> + pipe.close()
>
> ###################################
> # Tests for: str2float()
> @@ -512,196 +141,3 @@ class TestUtils(unittest.TestCase):
> test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25}
> for test in test_values:
> self.assertEqual(utils.str2float(test), test_values[test])
> -
> - ###################################
> - # Tests for: set_root_password_in_rootfs()
> - ###################################
> - def test_utils_set_root_password_in_rootfs_restore_permissions(self):
> - """
> - Ensures that set_root_password_in_rootfs() restore shadow
> - file permissions after edit.
> - """
> - permissions = 700
> - rootfs_path = '/foo'
> - shadow_file = '%s/etc/shadow' % rootfs_path
> -
> - m_open = mock.mock_open(read_data='')
> - with mock.patch('virtBootstrap.utils.open', m_open, create=True):
> - with mock.patch('virtBootstrap.utils.os') as m_os:
> - m_os.stat.return_value = [permissions]
> - m_os.path.join.return_value = shadow_file
> - utils.set_root_password_in_rootfs(rootfs_path, 'password')
> -
> - expected_calls = [
> - mock.call.path.join(rootfs_path, 'etc/shadow'),
> - mock.call.stat(shadow_file),
> - mock.call.chmod(shadow_file, 438),
> - mock.call.chmod(shadow_file, permissions)
> - ]
> - m_os.assert_has_calls(expected_calls)
> -
> - def test_utils_set_root_password_in_rootfs_restore_permissions_fail(self):
> - """
> - Ensures that set_root_password_in_rootfs() restore shadow file
> - permissions in case of failure.
> - """
> - permissions = 700
> - rootfs_path = '/foo'
> - shadow_file = '%s/etc/shadow' % rootfs_path
> -
> - m_open = mock.mock_open(read_data='')
> - with mock.patch('virtBootstrap.utils.open', m_open, create=True):
> - with mock.patch('virtBootstrap.utils.os') as m_os:
> - m_os.stat.return_value = [permissions]
> - m_os.path.join.return_value = shadow_file
> -
> - with self.assertRaises(Exception):
> - m_open.side_effect = Exception
> - utils.set_root_password_in_rootfs(rootfs_path, 'password')
> -
> - expected_calls = [
> - mock.call.path.join(rootfs_path, 'etc/shadow'),
> - mock.call.stat(shadow_file),
> - mock.call.chmod(shadow_file, 438),
> - mock.call.chmod(shadow_file, permissions)
> - ]
> - m_os.assert_has_calls(expected_calls)
> -
> - def test_utils_set_root_password_in_rootfs_store_hash(self):
> - """
> - Ensures that set_root_password_in_rootfs() stores the hashed
> - root password in shadow file.
> - """
> - rootfs_path = '/foo'
> - password = 'secret'
> - initial_value = '!locked'
> - hashed_password = 'hashed_password'
> - shadow_content = '\n'.join([
> - "root:%s::0:99999:7:::",
> - "bin:*:17004:0:99999:7:::"
> - "daemon:*:17004:0:99999:7:::",
> - "adm:*:17004:0:99999:7:::"
> - ])
> -
> - m_open = mock.mock_open(read_data=shadow_content % initial_value)
> - with mock.patch('virtBootstrap.utils.open', m_open, create=True):
> - with mock.patch('virtBootstrap.utils.os'):
> - with mock.patch('passlib.hosts.linux_context.hash') as m_hash:
> - m_hash.return_value = hashed_password
> - utils.set_root_password_in_rootfs(rootfs_path, password)
> -
> - m_hash.assert_called_once_with(password)
> - m_open().write.assert_called_once_with(shadow_content
> - % hashed_password)
> -
> - ###################################
> - # Tests for: set_root_password_in_image()
> - ###################################
> - @mock.patch('virtBootstrap.utils.execute')
> - def test_utils_set_root_password_in_image(self, m_execute):
> - """
> - Ensures that set_root_password_in_image() calls virt-edit
> - with correct arguments.
> - """
> - image, password = 'foo', 'password'
> - password_hash = ('$6$rounds=656000$PaQ/H4c/k8Ix9YOM$'
> - 'cyD47r9PtAE2LhnkpdbVzsiQbM0/h2S/1Bv'
> - 'u/sXqUtCg.3Ijp7TQy/8tEVstxMy5k5v4mh'
> - 'CGFqnVv7S6wd.Ah/')
> -
> - expected_call = [
> - 'virt-edit',
> - '-a', image, '/etc/shadow',
> - '-e', 's,^root:.*?:,root:%s:,' % utils.re.escape(password_hash)]
> -
> - hash_function = 'virtBootstrap.utils.passlib.hosts.linux_context.hash'
> - with mock.patch(hash_function) as m_hash:
> - m_hash.return_value = password_hash
> - utils.set_root_password_in_image(image, password)
> -
> - m_execute.assert_called_once_with(expected_call)
> -
> - ###################################
> - # Tests for: set_root_password()
> - ###################################
> - @mock.patch('virtBootstrap.utils.set_root_password_in_rootfs')
> - def test_utils_set_root_password_dir(self, m_set_root_password_in_rootfs):
> - """
> - Ensures that set_root_password() calls set_root_password_in_rootfs()
> - when the format is set to "dir".
> - """
> - fmt, dest, root_password = 'dir', 'dest', 'root_password'
> - utils.set_root_password(fmt, dest, root_password)
> -
> - m_set_root_password_in_rootfs.assert_called_once_with(
> - dest, root_password
> - )
> -
> - @mock.patch('virtBootstrap.utils.set_root_password_in_image')
> - def test_utils_set_root_password_qcow2(self, m_set_root_password_in_image):
> - """
> - Ensures that set_root_password() calls set_root_password_in_image()
> - when the format is set to "qcow2" with the path to the last
> - extracted layer.
> - """
> - fmt, dest, root_password = 'qcow2', 'dest', 'root_password'
> - layers = ['layer-0.qcow2', 'layer-1.qcow2']
> -
> - with mock.patch('os.listdir') as m_listdir:
> - m_listdir.return_value = layers
> - utils.set_root_password(fmt, dest, root_password)
> -
> - m_set_root_password_in_image.assert_called_once_with(
> - utils.os.path.join(dest, max(layers)),
> - root_password
> - )
> -
> - ###################################
> - # Tests for: write_progress()
> - ###################################
> - def test_utils_write_progress_fill_terminal_width(self):
> - """
> - Ensures that write_progress() outputs a message with length
> - equal to terminal width and last symbol '\r'.
> - """
> - terminal_width = 120
> - prog = {'status': 'status', 'value': 0}
> - with mock.patch.multiple(utils,
> - subprocess=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> -
> - (mocked['subprocess'].Popen.return_value.stdout
> - .read.return_value) = ("20 %s" % terminal_width).encode()
> -
> - utils.write_progress(prog)
> -
> - mocked['subprocess'].Popen.assert_called_once_with(
> - ["stty", "size"],
> - stdout=mocked['subprocess'].PIPE
> - )
> - output_message = mocked['sys'].stdout.write.call_args[0][0]
> - mocked['sys'].stdout.write.assert_called_once()
> - self.assertEqual(len(output_message), terminal_width + 1)
> - self.assertEqual(output_message[-1], '\r')
> -
> - def test_utils_write_progress_use_default_term_width_on_failure(self):
> - """
> - Ensures that write_progress() outputs a message with length equal
> - to default terminal width (80) when the detecting terminal width
> - has failed.
> - """
> - default_terminal_width = 80
> - prog = {'status': 'status', 'value': 0}
> - with mock.patch.multiple(utils,
> - subprocess=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> - mocked['subprocess'].Popen.side_effect = Exception()
> - utils.write_progress(prog)
> -
> - self.assertEqual(len(mocked['sys'].stdout.write.call_args[0][0]),
> - default_terminal_width + 1)
> - mocked['sys'].stdout.write.assert_called_once()
> -
> -
> -if __name__ == '__main__':
> - unittest.main(exit=False)
> diff --git a/tests/test_virt_bootstrap.py b/tests/test_virt_bootstrap.py
> deleted file mode 100644
> index ff744f7..0000000
> --- a/tests/test_virt_bootstrap.py
> +++ /dev/null
> @@ -1,464 +0,0 @@
> -# Authors: Radostin Stoyanov <rstoyanov1 at gmail.com>
> -#
> -# Copyright (C) 2017 Radostin Stoyanov
> -#
> -# This program is free software: you can redistribute it and/or modify
> -# it under the terms of the GNU General Public License as published by
> -# the Free Software Foundation, either version 3 of the License, or
> -# (at your option) any later version.
> -
> -# This program is distributed in the hope that it will be useful,
> -# but WITHOUT ANY WARRANTY; without even the implied warranty of
> -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> -# GNU General Public License for more details.
> -
> -# You should have received a copy of the GNU General Public License
> -# along with this program. If not, see <http://www.gnu.org/licenses/>.
> -
> -
> -"""
> -Unit tests for functions defined in virtBootstrap.virt-bootstrap
> -"""
> -
> -from tests import unittest
> -from tests import mock
> -from tests import virt_bootstrap
> -from tests import sources
> -
> -
> -# pylint: disable=invalid-name
> -class TestVirtBootstrap(unittest.TestCase):
> - """
> - Test cases for virt_bootstrap module
> - """
> -
> - ###################################
> - # Tests for: get_source(source_type)
> - ###################################
> - def test_get_invaid_source_type_should_fail(self):
> - """
> - Ensures that get_source() throws an Exception when invalid source
> - name was specified.
> - """
> - with self.assertRaises(Exception) as source:
> - virt_bootstrap.get_source('invalid')
> - self.assertIn('invalid', str(source.exception))
> -
> - def test_get_docker_source(self):
> - """
> - Ensures that get_source() returns DockerSource when source name
> - "docker" is requested.
> - """
> - self.assertIs(virt_bootstrap.get_source('docker'),
> - sources.DockerSource)
> -
> - def test_get_file_source(self):
> - """
> - Ensures that get_source() returns FileSource when source name
> - "file" is requested.
> - """
> - self.assertIs(virt_bootstrap.get_source('file'),
> - sources.FileSource)
> -
> - ###################################
> - # Tests for: mapping_uid_gid()
> - ###################################
> - def test_mapping_uid_gid(self):
> - """
> - Ensures that mapping_uid_gid() calls map_id() with
> - correct parameters.
> - """
> - dest = '/path'
> - calls = [
> - { # Call 1
> - 'dest': dest,
> - 'uid': [[0, 1000, 10]],
> - 'gid': [[0, 1000, 10]]
> - },
> - { # Call 2
> - 'dest': dest,
> - 'uid': [],
> - 'gid': [[0, 1000, 10]]
> - },
> - { # Call 3
> - 'dest': dest,
> - 'uid': [[0, 1000, 10]],
> - 'gid': []
> - },
> - { # Call 4
> - 'dest': dest,
> - 'uid': [[0, 1000, 10], [500, 500, 10]],
> - 'gid': [[0, 1000, 10]]
> - }
> - ]
> -
> - expected_calls = [
> - # Expected from call 1
> - mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> - # Expected from call 2
> - mock.call(dest, None, [0, 1000, 10]),
> - # Expected from call 3
> - mock.call(dest, [0, 1000, 10], None),
> - # Expected from call 4
> - mock.call(dest, [0, 1000, 10], [0, 1000, 10]),
> - mock.call(dest, [500, 500, 10], None)
> -
> - ]
> - with mock.patch('virtBootstrap.virt_bootstrap.map_id') as m_map_id:
> - for args in calls:
> - virt_bootstrap.mapping_uid_gid(args['dest'],
> - args['uid'],
> - args['gid'])
> -
> - m_map_id.assert_has_calls(expected_calls)
> -
> - ###################################
> - # Tests for: map_id()
> - ###################################
> - @mock.patch('os.path.realpath')
> - def test_map_id(self, m_realpath):
> - """
> - Ensures that the UID/GID mapping applies to all files
> - and directories in root file system.
> - """
> - root_path = '/root'
> - files = ['foo1', 'foo2']
> - m_realpath.return_value = root_path
> -
> - map_uid = [0, 1000, 10]
> - map_gid = [0, 1000, 10]
> - new_id = 'new_id'
> -
> - expected_calls = [
> - mock.call('/root', new_id, new_id),
> - mock.call('/root/foo1', new_id, new_id),
> - mock.call('/root/foo2', new_id, new_id)
> - ]
> -
> - with mock.patch.multiple('os',
> - lchown=mock.DEFAULT,
> - lstat=mock.DEFAULT,
> - walk=mock.DEFAULT) as mocked:
> -
> - mocked['walk'].return_value = [(root_path, [], files)]
> - mocked['lstat']().st_uid = map_uid[0]
> - mocked['lstat']().st_gid = map_gid[0]
> -
> - get_map_id = 'virtBootstrap.virt_bootstrap.get_map_id'
> - with mock.patch(get_map_id) as m_get_map_id:
> - m_get_map_id.return_value = new_id
> - virt_bootstrap.map_id(root_path, map_uid, map_gid)
> -
> - mocked['lchown'].assert_has_calls(expected_calls)
> -
> - ###################################
> - # Tests for: get_mapping_opts()
> - ###################################
> - def test_get_mapping_opts(self):
> - """
> - Ensures that get_mapping_opts() returns correct options for
> - mapping value.
> - """
> - test_values = [
> - {
> - 'mapping': [0, 1000, 10],
> - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> - },
> - {
> - 'mapping': [0, 1000, 10],
> - 'expected_result': {'first': 0, 'last': 10, 'offset': 1000},
> - },
> - {
> - 'mapping': [500, 1500, 1],
> - 'expected_result': {'first': 500, 'last': 501, 'offset': 1000},
> - },
> - {
> - 'mapping': [-1, -1, -1],
> - 'expected_result': {'first': 0, 'last': 1, 'offset': 0},
> - }
> - ]
> -
> - for test in test_values:
> - res = virt_bootstrap.get_mapping_opts(test['mapping'])
> - self.assertEqual(test['expected_result'], res)
> -
> - ###################################
> - # Tests for: get_map_id()
> - ###################################
> - def test_get_map_id(self):
> - """
> - Ensures that get_map_id() returns correct UID/GID mapping value.
> - """
> - test_values = [
> - {
> - 'old_id': 0,
> - 'mapping': [0, 1000, 10],
> - 'expected_result': 1000
> - },
> - {
> - 'old_id': 5,
> - 'mapping': [0, 500, 10],
> - 'expected_result': 505
> - },
> - {
> - 'old_id': 10,
> - 'mapping': [0, 100, 10],
> - 'expected_result': -1
> - },
> - ]
> - for test in test_values:
> - opts = virt_bootstrap.get_mapping_opts(test['mapping'])
> - res = virt_bootstrap.get_map_id(test['old_id'], opts)
> - self.assertEqual(test['expected_result'], res)
> -
> - ###################################
> - # Tests for: parse_idmap()
> - ###################################
> - def test_parse_idmap(self):
> - """
> - Ensures that parse_idmap() returns correct UID/GID mapping value.
> - """
> - test_values = [
> - {
> - 'mapping': ['0:1000:10', '0:100:10'],
> - 'expected_result': [[0, 1000, 10], [0, 100, 10]],
> - },
> - {
> - 'mapping': ['0:1000:10'],
> - 'expected_result': [[0, 1000, 10]],
> - },
> - {
> - 'mapping': ['500:1500:1'],
> - 'expected_result': [[500, 1500, 1]],
> - },
> - {
> - 'mapping': ['-1:-1:-1'],
> - 'expected_result': [[-1, -1, -1]],
> - },
> - {
> - 'mapping': [],
> - 'expected_result': None,
> - }
> - ]
> - for test in test_values:
> - res = virt_bootstrap.parse_idmap(test['mapping'])
> - self.assertEqual(test['expected_result'], res)
> -
> - def test_parse_idmap_raise_exception_on_invalid_mapping_value(self):
> - """
> - Ensures that parse_idmap() raise ValueError on mapping value.
> - """
> - with self.assertRaises(ValueError):
> - virt_bootstrap.parse_idmap(['invalid'])
> -
> - ###################################
> - # Tests for: bootstrap()
> - ###################################
> - def test_bootsrap_creates_directory_if_does_not_exist(self):
> - """
> - Ensures that bootstrap() creates destination directory if
> - it does not exists.
> - """
> - src, dest = 'foo', 'bar'
> - with mock.patch.multiple(virt_bootstrap,
> - get_source=mock.DEFAULT,
> - os=mock.DEFAULT) as mocked:
> - mocked['os'].path.exists.return_value = False
> - virt_bootstrap.bootstrap(src, dest)
> - mocked['os'].path.exists.assert_called_once_with(dest)
> - mocked['os'].makedirs.assert_called_once_with(dest)
> -
> - def test_bootstrap_exit_if_dest_is_invalid(self):
> - """
> - Ensures that bootstrap() exits with code 1 when the destination
> - path exists but it is not directory.
> - """
> - src, dest = 'foo', 'bar'
> - with mock.patch.multiple(virt_bootstrap,
> - get_source=mock.DEFAULT,
> - os=mock.DEFAULT,
> - logger=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> - mocked['os'].path.exists.return_value = True
> - mocked['os'].path.isdir.return_value = False
> - virt_bootstrap.bootstrap(src, dest)
> - mocked['os'].path.isdir.assert_called_once_with(dest)
> - mocked['sys'].exit.assert_called_once_with(1)
> -
> - def test_bootsrap_exit_if_no_write_access_on_dest(self):
> - """
> - Ensures that bootstrap() exits with code 1 when the current user
> - has not write permissions on the destination folder.
> - """
> - src, dest = 'foo', 'bar'
> - with mock.patch.multiple(virt_bootstrap,
> - get_source=mock.DEFAULT,
> - os=mock.DEFAULT,
> - logger=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> - mocked['os'].path.exists.return_value = True
> - mocked['os'].path.isdir.return_value = True
> - mocked['os'].access.return_value = False
> - virt_bootstrap.bootstrap(src, dest)
> - mocked['os'].access.assert_called_once_with(dest,
> - mocked['os'].W_OK)
> - mocked['sys'].exit.assert_called_once_with(1)
> -
> - def test_bootstrap_use_file_source_if_none_was_specified(self):
> - """
> - Ensures that bootstrap() calls get_source() with argument
> - 'file' when source format is not specified.
> - """
> - src, dest = 'foo', 'bar'
> - with mock.patch.multiple(virt_bootstrap,
> - get_source=mock.DEFAULT,
> - os=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> - virt_bootstrap.bootstrap(src, dest)
> - mocked['get_source'].assert_called_once_with('file')
> -
> - def test_bootstrap_successful_call(self):
> - """
> - Ensures that bootstrap() creates source instance and calls the
> - unpack method with destination path as argument.
> - """
> - src, dest = 'foo', 'bar'
> - with mock.patch.multiple(virt_bootstrap,
> - get_source=mock.DEFAULT,
> - os=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> - mocked['os'].path.exists.return_value = True
> - mocked['os'].path.isdir.return_value = True
> - mocked['os'].access.return_value = True
> - mocked_source = mock.Mock()
> - mocked_unpack = mock.Mock()
> - mocked_source.return_value.unpack = mocked_unpack
> - mocked['get_source'].return_value = mocked_source
> - virt_bootstrap.bootstrap(src, dest)
> - # sys.exit should not be called
> - mocked['sys'].exit.assert_not_called()
> - mocked_source.assert_called_once()
> - mocked_unpack.assert_called_once_with(dest)
> -
> - def test_bootstrap_all_params_are_passed_to_source_instance(self):
> - """
> - Ensures that bootstrap() is passing all arguments to newly created
> - source instance.
> - """
> - params_list = ['dest', 'fmt', 'username', 'password', 'root_password',
> - 'not_secure', 'no_cache', 'progress_cb']
> - params = {param: param for param in params_list}
> -
> - for kw_param in params_list:
> - params[kw_param] = kw_param
> -
> - with mock.patch.multiple(virt_bootstrap,
> - get_source=mock.DEFAULT,
> - os=mock.DEFAULT,
> - urlparse=mock.DEFAULT,
> - progress=mock.DEFAULT,
> - utils=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> - mocked['os'].path.exists.return_value = True
> - mocked['os'].path.isdir.return_value = True
> - mocked['os'].access.return_value = True
> -
> - mocked['progress'].Progress.return_value = params['progress_cb']
> -
> - mocked_source = mock.Mock()
> - mocked_unpack = mock.Mock()
> - mocked_source.return_value.unpack = mocked_unpack
> - mocked['get_source'].return_value = mocked_source
> -
> - mocked_uri = mock.Mock()
> - mocked['urlparse'].return_value = mocked_uri
> - params['uri'] = mocked_uri
> -
> - virt_bootstrap.bootstrap(**params)
> - # sys.exit should not be called
> - mocked['sys'].exit.assert_not_called()
> -
> - mocked_source.assert_called_once()
> - mocked_unpack.assert_called_once_with(params['dest'])
> -
> - called_with_args, called_with_kwargs = mocked_source.call_args
> - self.assertEqual(called_with_args, ())
> -
> - del params['dest']
> - del params['root_password']
> - params['progress'] = params.pop('progress_cb')
> - for kwarg in params:
> - self.assertEqual(called_with_kwargs[kwarg], params[kwarg])
> -
> - def test_if_bootstrap_calls_set_root_password(self):
> - """
> - Ensures that bootstrap() calls set_root_password() when the argument
> - root_password is specified.
> - """
> - src, fmt, dest, root_password = 'foo', 'fmt', 'bar', 'root_password'
> - with mock.patch.multiple(virt_bootstrap,
> - get_source=mock.DEFAULT,
> - os=mock.DEFAULT,
> - utils=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> - mocked['os'].path.exists.return_value = True
> - mocked['os'].path.isdir.return_value = True
> - mocked['os'].access.return_value = True
> -
> - virt_bootstrap.bootstrap(src, dest,
> - fmt=fmt,
> - root_password=root_password)
> -
> - mocked['utils'].set_root_password.assert_called_once_with(
> - fmt, dest, root_password)
> -
> - def test_if_bootstrap_calls_set_mapping_uid_gid(self):
> - """
> - Ensures that bootstrap() calls mapping_uid_gid() when the argument
> - uid_map or gid_map is specified.
> - """
> - src, dest, uid_map, gid_map = 'foo', 'bar', 'id', 'id'
> - expected_calls = [
> - mock.call('bar', None, 'id'),
> - mock.call('bar', 'id', None),
> - mock.call('bar', 'id', 'id')
> - ]
> -
> - with mock.patch.multiple(virt_bootstrap,
> - get_source=mock.DEFAULT,
> - os=mock.DEFAULT,
> - mapping_uid_gid=mock.DEFAULT,
> - utils=mock.DEFAULT,
> - sys=mock.DEFAULT) as mocked:
> - mocked['os'].path.exists.return_value = True
> - mocked['os'].path.isdir.return_value = True
> - mocked['os'].access.return_value = True
> -
> - virt_bootstrap.bootstrap(src, dest, gid_map=gid_map)
> - virt_bootstrap.bootstrap(src, dest, uid_map=uid_map)
> - virt_bootstrap.bootstrap(src, dest,
> - uid_map=uid_map, gid_map=gid_map)
> - mocked['mapping_uid_gid'].assert_has_calls(expected_calls)
> -
> - ###################################
> - # Tests for: set_logging_conf()
> - ###################################
> - def test_if_logging_level_format_handler_are_set(self):
> - """
> - Ensures that set_logging_conf() sets log level and adds new stream
> - handler with formatting.
> - """
> - with mock.patch('virtBootstrap.virt_bootstrap.logging') as m_logging:
> - mocked_stream_hdlr = mock.Mock()
> - m_logger = mock.Mock()
> - m_logging.getLogger.return_value = m_logger
> - m_logging.StreamHandler.return_value = mocked_stream_hdlr
> - virt_bootstrap.set_logging_conf()
> - m_logging.getLogger.assert_called_once_with('virtBootstrap')
> - mocked_stream_hdlr.setFormatter.assert_called_once()
> - m_logger.addHandler.assert_called_once_with(mocked_stream_hdlr)
> - m_logger.setLevel.assert_called_once()
> -
> -
> -if __name__ == '__main__':
> - unittest.main(exit=False)
ACK
--
Cedric
More information about the virt-tools-list mailing list