with open(proc_path, 'w') as f:
f.write('')
monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
- assert system.get_mounts() == {}
+ m = system.Mounts()
+ assert m.get_mounts() == {}
def test_is_mounted_(self, fake_proc):
- result = system.get_mounts()
- assert result['/dev/sdc2'] == ['/boot']
+ m = system.Mounts()
+ assert m.get_mounts()['/dev/sdc2'] == ['/boot']
def test_ignores_two_fields(self, fake_proc):
- result = system.get_mounts()
- assert result.get('/dev/sde4') is None
+ m = system.Mounts()
+ assert m.get_mounts().get('/dev/sde4') is None
def test_tmpfs_is_reported(self, fake_proc):
- result = system.get_mounts()
- assert result['tmpfs'][0] == '/dev/shm'
+ m = system.Mounts()
+ assert m.get_mounts()['tmpfs'][0] == '/dev/shm'
def test_non_skip_devs_arent_reported(self, fake_proc):
- result = system.get_mounts()
- assert result.get('cgroup') is None
+ m = system.Mounts()
+ assert m.get_mounts().get('cgroup') is None
def test_multiple_mounts_are_appended(self, fake_proc):
- result = system.get_mounts()
- assert len(result['tmpfs']) == 7
+ m = system.Mounts()
+ assert len(m.get_mounts()['tmpfs']) == 7
def test_nonexistent_devices_are_skipped(self, tmpdir, monkeypatch):
PROCDIR = str(tmpdir)
/dev/sda2 /far/lib/ceph/osd/ceph-1 xfs rw,attr2,inode64,noquota 0 0"""))
monkeypatch.setattr(system, 'PROCDIR', PROCDIR)
monkeypatch.setattr(os.path, 'exists', lambda x: False if x == '/dev/sda1' else True)
- result = system.get_mounts()
- assert result.get('/dev/sda1') is None
+ m = system.Mounts()
+ assert m.get_mounts().get('/dev/sda1') is None
class TestIsBinary(object):
import tempfile
import uuid
import subprocess
+import threading
from ceph_volume import process, terminal
from . import as_string
"""
Check if the given path is mounted
"""
- mounts = get_mounts(paths=True)
+ m = Mounts(paths=True)
+ mounts = m.get_mounts()
realpath = os.path.realpath(path)
mounted_locations = mounts.get(realpath, [])
Check if the given device is mounted, optionally validating that a
destination exists
"""
- plain_mounts = get_mounts(devices=True)
- realpath_mounts = get_mounts(devices=True, realpath=True)
+ plain_mounts = Mounts(devices=True)
+ realpath_mounts = Mounts(devices=True, realpath=True)
+
realpath_dev = os.path.realpath(dev) if dev.startswith('/') else dev
destination = os.path.realpath(destination) if destination else None
# plain mounts
- plain_dev_mounts = plain_mounts.get(dev, [])
- realpath_dev_mounts = plain_mounts.get(realpath_dev, [])
+ plain_dev_mounts = plain_mounts.get_mounts().get(dev, [])
+ realpath_dev_mounts = plain_mounts.get_mounts().get(realpath_dev, [])
# realpath mounts
- plain_dev_real_mounts = realpath_mounts.get(dev, [])
- realpath_dev_real_mounts = realpath_mounts.get(realpath_dev, [])
+ plain_dev_real_mounts = realpath_mounts.get_mounts().get(dev, [])
+ realpath_dev_real_mounts = realpath_mounts.get_mounts().get(realpath_dev, [])
mount_locations = [
plain_dev_mounts,
logger.info('%s was not found as mounted', dev)
return False
-
-def get_mounts(devices=False, paths=False, realpath=False):
- """
- Create a mapping of all available system mounts so that other helpers can
- detect nicely what path or device is mounted
-
- It ignores (most of) non existing devices, but since some setups might need
- some extra device information, it will make an exception for:
-
- - tmpfs
- - devtmpfs
- - /dev/root
-
- If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
- if ``paths`` is set to ``True`` then the mapping will be
- a path-to-device(s)
-
- :param realpath: Resolve devices to use their realpaths. This is useful for
- paths like LVM where more than one path can point to the same device
- """
- devices_mounted = {}
- paths_mounted = {}
- do_not_skip = ['tmpfs', 'devtmpfs', '/dev/root']
- default_to_devices = devices is False and paths is False
-
- with open(PROCDIR + '/mounts', 'rb') as mounts:
- proc_mounts = mounts.readlines()
-
- for line in proc_mounts:
- fields = [as_string(f) for f in line.split()]
- if len(fields) < 3:
- continue
- if realpath:
- device = os.path.realpath(fields[0]) if fields[0].startswith('/') else fields[0]
- else:
- device = fields[0]
- path = os.path.realpath(fields[1])
- # only care about actual existing devices
- if not os.path.exists(device) or not device.startswith('/'):
- if device not in do_not_skip:
+class Mounts(object):
+ excluded_paths = []
+
+ def __init__(self, devices=False, paths=False, realpath=False):
+ self.devices = devices
+ self.paths = paths
+ self.realpath = realpath
+
+ def safe_realpath(self, path, timeout=0.2):
+ def _realpath(path, result):
+ p = os.path.realpath(path)
+ result.append(p)
+
+ result = []
+ t = threading.Thread(target=_realpath, args=(path, result))
+ t.setDaemon(True)
+ t.start()
+ t.join(timeout)
+ if t.is_alive():
+ return None
+ return result[0]
+
+ def get_mounts(self):
+ """
+ Create a mapping of all available system mounts so that other helpers can
+ detect nicely what path or device is mounted
+
+ It ignores (most of) non existing devices, but since some setups might need
+ some extra device information, it will make an exception for:
+
+ - tmpfs
+ - devtmpfs
+ - /dev/root
+
+ If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
+ if ``paths`` is set to ``True`` then the mapping will be
+ a path-to-device(s)
+
+ :param realpath: Resolve devices to use their realpaths. This is useful for
+ paths like LVM where more than one path can point to the same device
+ """
+ devices_mounted = {}
+ paths_mounted = {}
+ do_not_skip = ['tmpfs', 'devtmpfs', '/dev/root']
+ default_to_devices = self.devices is False and self.paths is False
+
+
+ with open(PROCDIR + '/mounts', 'rb') as mounts:
+ proc_mounts = mounts.readlines()
+
+ for line in proc_mounts:
+ fields = [as_string(f) for f in line.split()]
+ if len(fields) < 3:
continue
- if device in devices_mounted.keys():
- devices_mounted[device].append(path)
- else:
- devices_mounted[device] = [path]
- if path in paths_mounted.keys():
- paths_mounted[path].append(device)
- else:
- paths_mounted[path] = [device]
+ if fields[0] in Mounts.excluded_paths or \
+ fields[1] in Mounts.excluded_paths:
+ continue
+ if self.realpath:
+ if fields[0].startswith('/'):
+ device = self.safe_realpath(fields[0])
+ if device is None:
+ logger.warning(f"Can't get realpath on {fields[0]}, skipping.")
+ Mounts.excluded_paths.append(fields[0])
+ continue
+ else:
+ device = fields[0]
+ else:
+ device = fields[0]
+ path = self.safe_realpath(fields[1])
+ if path is None:
+ logger.warning(f"Can't get realpath on {fields[1]}, skipping.")
+ Mounts.excluded_paths.append(fields[1])
+ continue
+ # only care about actual existing devices
+ if not os.path.exists(device) or not device.startswith('/'):
+ if device not in do_not_skip:
+ continue
+ if device in devices_mounted.keys():
+ devices_mounted[device].append(path)
+ else:
+ devices_mounted[device] = [path]
+ if path in paths_mounted.keys():
+ paths_mounted[path].append(device)
+ else:
+ paths_mounted[path] = [device]
- # Default to returning information for devices if
- if devices is True or default_to_devices:
- return devices_mounted
- else:
- return paths_mounted
+ # Default to returning information for devices if
+ if self.devices is True or default_to_devices:
+ return devices_mounted
+ else:
+ return paths_mounted
def set_context(path, recursive=False):