import errno
import os
import pwd
+import platform
import uuid
from ceph_volume import process
+# TODO: get these out of here and into a common area for others to consume
+if platform.system() == 'FreeBSD':
+ FREEBSD = True
+ DEFAULT_FS_TYPE = 'zfs'
+ PROCDIR = '/compat/linux/proc'
+ # FreeBSD does not have blockdevices any more
+ BLOCKDIR = '/dev'
+ ROOTGROUP = 'wheel'
+else:
+ FREEBSD = False
+ DEFAULT_FS_TYPE = 'xfs'
+ PROCDIR = '/proc'
+ BLOCKDIR = '/sys/block'
+ ROOTGROUP = 'root'
+
+
def generate_uuid():
return str(uuid.uuid4())
process.run(['chown', '-R', 'ceph:ceph', path])
else:
os.chown(path, uid, gid)
+
+
+def is_mounted(source, destination=None):
+ """
+ Check if the given device is mounted, optionally validating destination.
+ This relies on absolute path devices, it will ignore non-absolute
+ entries like::
+
+ tmpfs /run tmpfs rw,seclabel,nosuid,nodev,mode=755 0 0
+
+ But will parse paths that are absolute like::
+
+ /dev/sdc2 /boot xfs rw,attr2,inode64,noquota 0 0
+
+ When destination is passed in, it will check that the entry where the
+ source appears is mounted to where destination defines. This is useful so
+ that an error message can report that a source is not mounted at an
+ expected destination.
+ """
+ dev = os.path.realpath(source)
+ with open(PROCDIR + '/mounts', 'rb') as proc_mounts:
+ for line in proc_mounts:
+ fields = line.split()
+ if len(fields) < 3:
+ continue
+ mounted_device = fields[0]
+ mounted_path = fields[1]
+ if os.path.isabs(mounted_device) and os.path.exists(mounted_device):
+ mounted_device = os.path.realpath(mounted_device)
+ if mounted_device == dev:
+ if destination:
+ destination = os.path.realpath(destination)
+ return destination == os.path.realpath(mounted_path)
+ else:
+ return True
+ return False