--- /dev/null
+From b5020a047fc487f35b76fc05f31e52665a1afda1 Mon Sep 17 00:00:00 2001
+From: abhishekkekane <abhishek.kekane@nttdata.com>
+Date: Mon, 6 Jul 2015 01:51:26 -0700
+Subject: [PATCH] libvirt: Kill rsync/scp processes before deleting instance
+
+In the resize operation, during copying files from source to
+destination compute node scp/rsync processes are not aborted after
+the instance is deleted because linux kernel doesn't delete instance
+files physically until all processes using the file handle is closed
+completely. Hence rsync/scp process keeps on running until it
+transfers 100% of file data.
+
+Added new module instancejobtracker to libvirt driver which will add,
+remove or terminate the processes running against particular instances.
+Added callback methods to execute call which will store the pid of
+scp/rsync process in cache as a key: value pair and to remove the
+pid from the cache after process completion. Process id will be used to
+kill the process if it is running while deleting the instance. Instance
+uuid is used as a key in the cache and pid will be the value.
+
+Conflicts:
+ nova/virt/libvirt/driver.py
+
+SecurityImpact
+
+Closes-bug: #1387543
+Change-Id: Ie03acc00a7c904aec13c90ae6a53938d08e5e0c9
+(cherry picked from commit 7ab75d5b0b75fc3426323bef19bf436a258b9707)
+---
+ nova/tests/unit/virt/libvirt/test_driver.py | 38 +++++++++++
+ nova/tests/unit/virt/libvirt/test_utils.py | 9 ++-
+ nova/virt/libvirt/driver.py | 18 +++++-
+ nova/virt/libvirt/instancejobtracker.py | 98 +++++++++++++++++++++++++++++
+ nova/virt/libvirt/utils.py | 14 +++--
+ 5 files changed, 168 insertions(+), 9 deletions(-)
+ create mode 100644 nova/virt/libvirt/instancejobtracker.py
+
+diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py
+index 859df95..5ff978a 100644
+--- a/nova/tests/unit/virt/libvirt/test_driver.py
++++ b/nova/tests/unit/virt/libvirt/test_driver.py
+@@ -23,6 +23,7 @@
+ import random
+ import re
+ import shutil
++import signal
+ import threading
+ import time
+ import uuid
+@@ -9817,6 +9818,15 @@ def test_shared_storage_detection_easy(self):
+ self.mox.ReplayAll()
+ self.assertTrue(drvr._is_storage_shared_with('foo', '/path'))
+
++ def test_store_pid_remove_pid(self):
++ instance = objects.Instance(**self.test_instance)
++ drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
++ popen = mock.Mock(pid=3)
++ drvr.job_tracker.add_job(instance, popen.pid)
++ self.assertIn(3, drvr.job_tracker.jobs[instance.uuid])
++ drvr.job_tracker.remove_job(instance, popen.pid)
++ self.assertNotIn(instance.uuid, drvr.job_tracker.jobs)
++
+ @mock.patch('nova.virt.libvirt.host.Host.get_domain')
+ def test_get_domain_info_with_more_return(self, mock_get_domain):
+ instance = objects.Instance(**self.test_instance)
+@@ -11316,12 +11326,18 @@ def fake_get_host_ip_addr():
+ def fake_execute(*args, **kwargs):
+ pass
+
++ def fake_copy_image(src, dest, host=None, receive=False,
++ on_execute=None, on_completion=None):
++ self.assertIsNotNone(on_execute)
++ self.assertIsNotNone(on_completion)
++
+ self.stubs.Set(self.drvr, 'get_instance_disk_info',
+ fake_get_instance_disk_info)
+ self.stubs.Set(self.drvr, '_destroy', fake_destroy)
+ self.stubs.Set(self.drvr, 'get_host_ip_addr',
+ fake_get_host_ip_addr)
+ self.stubs.Set(utils, 'execute', fake_execute)
++ self.stubs.Set(libvirt_utils, 'copy_image', fake_copy_image)
+
+ ins_ref = self._create_instance(params=params_for_instance)
+
+@@ -12428,6 +12444,28 @@ def test_delete_instance_files(self, get_instance_path, exists, exe,
+ @mock.patch('shutil.rmtree')
+ @mock.patch('nova.utils.execute')
+ @mock.patch('os.path.exists')
++ @mock.patch('os.kill')
++ @mock.patch('nova.virt.libvirt.utils.get_instance_path')
++ def test_delete_instance_files_kill_running(
++ self, get_instance_path, kill, exists, exe, shutil):
++ get_instance_path.return_value = '/path'
++ instance = objects.Instance(uuid='fake-uuid', id=1)
++ self.drvr.job_tracker.jobs[instance.uuid] = [3, 4]
++
++ exists.side_effect = [False, False, True, False]
++
++ result = self.drvr.delete_instance_files(instance)
++ get_instance_path.assert_called_with(instance)
++ exe.assert_called_with('mv', '/path', '/path_del')
++ kill.assert_has_calls([mock.call(3, signal.SIGKILL), mock.call(3, 0),
++ mock.call(4, signal.SIGKILL), mock.call(4, 0)])
++ shutil.assert_called_with('/path_del')
++ self.assertTrue(result)
++ self.assertNotIn(instance.uuid, self.drvr.job_tracker.jobs)
++
++ @mock.patch('shutil.rmtree')
++ @mock.patch('nova.utils.execute')
++ @mock.patch('os.path.exists')
+ @mock.patch('nova.virt.libvirt.utils.get_instance_path')
+ def test_delete_instance_files_resize(self, get_instance_path, exists,
+ exe, shutil):
+diff --git a/nova/tests/unit/virt/libvirt/test_utils.py b/nova/tests/unit/virt/libvirt/test_utils.py
+index 7fa0326..14bf822 100644
+--- a/nova/tests/unit/virt/libvirt/test_utils.py
++++ b/nova/tests/unit/virt/libvirt/test_utils.py
+@@ -62,7 +62,8 @@ def test_copy_image_local_cp(self, mock_execute):
+ mock_execute.assert_called_once_with('cp', 'src', 'dest')
+
+ _rsync_call = functools.partial(mock.call,
+- 'rsync', '--sparse', '--compress')
++ 'rsync', '--sparse', '--compress',
++ on_execute=None, on_completion=None)
+
+ @mock.patch('nova.utils.execute')
+ def test_copy_image_rsync(self, mock_execute):
+@@ -85,7 +86,8 @@ def test_copy_image_scp(self, mock_execute):
+
+ mock_execute.assert_has_calls([
+ self._rsync_call('--dry-run', 'src', 'host:dest'),
+- mock.call('scp', 'src', 'host:dest'),
++ mock.call('scp', 'src', 'host:dest',
++ on_execute=None, on_completion=None),
+ ])
+ self.assertEqual(2, mock_execute.call_count)
+
+@@ -110,7 +112,8 @@ def test_copy_image_scp_ipv6(self, mock_execute):
+
+ mock_execute.assert_has_calls([
+ self._rsync_call('--dry-run', 'src', '[2600::]:dest'),
+- mock.call('scp', 'src', '[2600::]:dest'),
++ mock.call('scp', 'src', '[2600::]:dest',
++ on_execute=None, on_completion=None),
+ ])
+ self.assertEqual(2, mock_execute.call_count)
+
+diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py
+index 40ee080..0a94d5a 100644
+--- a/nova/virt/libvirt/driver.py
++++ b/nova/virt/libvirt/driver.py
+@@ -95,6 +95,7 @@
+ from nova.virt.libvirt import host
+ from nova.virt.libvirt import imagebackend
+ from nova.virt.libvirt import imagecache
++from nova.virt.libvirt import instancejobtracker
+ from nova.virt.libvirt import lvm
+ from nova.virt.libvirt import rbd_utils
+ from nova.virt.libvirt import utils as libvirt_utils
+@@ -465,6 +466,8 @@ def __init__(self, virtapi, read_only=False):
+ 'expect': ', '.join("'%s'" % k for k in
+ sysinfo_serial_funcs.keys())})
+
++ self.job_tracker = instancejobtracker.InstanceJobTracker()
++
+ def _get_volume_drivers(self):
+ return libvirt_volume_drivers
+
+@@ -6301,6 +6304,11 @@ def migrate_disk_and_power_off(self, context, instance, dest,
+ # finish_migration/_create_image to re-create it for us.
+ continue
+
++ on_execute = lambda process: self.job_tracker.add_job(
++ instance, process.pid)
++ on_completion = lambda process: self.job_tracker.remove_job(
++ instance, process.pid)
++
+ if info['type'] == 'qcow2' and info['backing_file']:
+ tmp_path = from_path + "_rbase"
+ # merge backing file
+@@ -6310,11 +6318,15 @@ def migrate_disk_and_power_off(self, context, instance, dest,
+ if shared_storage:
+ utils.execute('mv', tmp_path, img_path)
+ else:
+- libvirt_utils.copy_image(tmp_path, img_path, host=dest)
++ libvirt_utils.copy_image(tmp_path, img_path, host=dest,
++ on_execute=on_execute,
++ on_completion=on_completion)
+ utils.execute('rm', '-f', tmp_path)
+
+ else: # raw or qcow2 with no backing file
+- libvirt_utils.copy_image(from_path, img_path, host=dest)
++ libvirt_utils.copy_image(from_path, img_path, host=dest,
++ on_execute=on_execute,
++ on_completion=on_completion)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self._cleanup_remote_migration(dest, inst_base,
+@@ -6683,6 +6695,8 @@ def delete_instance_files(self, instance):
+ # invocation failed due to the absence of both target and
+ # target_resize.
+ if not remaining_path and os.path.exists(target_del):
++ self.job_tracker.terminate_jobs(instance)
++
+ LOG.info(_LI('Deleting instance files %s'), target_del,
+ instance=instance)
+ remaining_path = target_del
+diff --git a/nova/virt/libvirt/instancejobtracker.py b/nova/virt/libvirt/instancejobtracker.py
+new file mode 100644
+index 0000000..d47fb45
+--- /dev/null
++++ b/nova/virt/libvirt/instancejobtracker.py
+@@ -0,0 +1,98 @@
++# Copyright 2015 NTT corp.
++# All Rights Reserved.
++# Licensed under the Apache License, Version 2.0 (the "License"); you may
++# not use this file except in compliance with the License. You may obtain
++# a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
++# License for the specific language governing permissions and limitations
++# under the License.
++
++
++import collections
++import errno
++import os
++import signal
++
++from oslo_log import log as logging
++
++from nova.i18n import _LE
++from nova.i18n import _LW
++
++
++LOG = logging.getLogger(__name__)
++
++
++class InstanceJobTracker(object):
++ def __init__(self):
++ self.jobs = collections.defaultdict(list)
++
++ def add_job(self, instance, pid):
++ """Appends process_id of instance to cache.
++
++ This method will store the pid of a process in cache as
++ a key: value pair which will be used to kill the process if it
++ is running while deleting the instance. Instance uuid is used as
++ a key in the cache and pid will be the value.
++
++ :param instance: Object of instance
++ :param pid: Id of the process
++ """
++ self.jobs[instance.uuid].append(pid)
++
++ def remove_job(self, instance, pid):
++ """Removes pid of process from cache.
++
++ This method will remove the pid of a process from the cache.
++
++ :param instance: Object of instance
++ :param pid: Id of the process
++ """
++ uuid = instance.uuid
++ if uuid in self.jobs and pid in self.jobs[uuid]:
++ self.jobs[uuid].remove(pid)
++
++ # remove instance.uuid if no pid's remaining
++ if not self.jobs[uuid]:
++ self.jobs.pop(uuid, None)
++
++ def terminate_jobs(self, instance):
++ """Kills the running processes for given instance.
++
++ This method is used to kill all running processes of the instance if
++ it is deleted in between.
++
++ :param instance: Object of instance
++ """
++ pids_to_remove = list(self.jobs.get(instance.uuid, []))
++ for pid in pids_to_remove:
++ try:
++ # Try to kill the process
++ os.kill(pid, signal.SIGKILL)
++ except OSError as exc:
++ if exc.errno != errno.ESRCH:
++ LOG.error(_LE('Failed to kill process %(pid)s '
++ 'due to %(reason)s, while deleting the '
++ 'instance.'), {'pid': pid, 'reason': exc},
++ instance=instance)
++
++ try:
++ # Check if the process is still alive.
++ os.kill(pid, 0)
++ except OSError as exc:
++ if exc.errno != errno.ESRCH:
++ LOG.error(_LE('Unexpected error while checking process '
++ '%(pid)s.'), {'pid': pid},
++ instance=instance)
++ else:
++ # The process is still around
++ LOG.warn(_LW("Failed to kill a long running process "
++ "%(pid)s related to the instance when "
++ "deleting it."), {'pid': pid},
++ instance=instance)
++
++ self.remove_job(instance, pid)
+diff --git a/nova/virt/libvirt/utils.py b/nova/virt/libvirt/utils.py
+index 7b80464..83d5ba3 100644
+--- a/nova/virt/libvirt/utils.py
++++ b/nova/virt/libvirt/utils.py
+@@ -294,13 +294,16 @@ def get_disk_backing_file(path, basename=True):
+ return backing_file
+
+
+-def copy_image(src, dest, host=None, receive=False):
++def copy_image(src, dest, host=None, receive=False,
++ on_execute=None, on_completion=None):
+ """Copy a disk image to an existing directory
+
+ :param src: Source image
+ :param dest: Destination path
+ :param host: Remote host
+ :param receive: Reverse the rsync direction
++ :param on_execute: Callback method to store pid of process in cache
++ :param on_completion: Callback method to remove pid of process from cache
+ """
+
+ if not host:
+@@ -322,11 +325,14 @@ def copy_image(src, dest, host=None, receive=False):
+ # Do a relatively light weight test first, so that we
+ # can fall back to scp, without having run out of space
+ # on the destination for example.
+- execute('rsync', '--sparse', '--compress', '--dry-run', src, dest)
++ execute('rsync', '--sparse', '--compress', '--dry-run', src, dest,
++ on_execute=on_execute, on_completion=on_completion)
+ except processutils.ProcessExecutionError:
+- execute('scp', src, dest)
++ execute('scp', src, dest, on_execute=on_execute,
++ on_completion=on_completion)
+ else:
+- execute('rsync', '--sparse', '--compress', src, dest)
++ execute('rsync', '--sparse', '--compress', src, dest,
++ on_execute=on_execute, on_completion=on_completion)
+
+
+ def write_to_file(path, contents, umask=None):