Merge pull request #18598 from dillaman/wip-21958
[ceph.git] / src / ceph-disk
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2014 Inktank <info@inktank.com>
4 # Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
5 # Copyright (C) 2014 Catalyst.net Ltd
6 #
7 # Author: Loic Dachary <loic@dachary.org>
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU Library Public License as published by
11 # the Free Software Foundation; either version 2, or (at your option)
12 # any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU Library Public License for more details.
18 #
19
20 import argparse
21 import errno
22 import fcntl
23 import logging
24 import os
25 import os.path
26 import platform
27 import re
28 import subprocess
29 import stat
30 import sys
31 import tempfile
32 import uuid
33 import time
34 import shlex
35 import stat
36
37 """
38 Prepare:
39  - create GPT partition
40  - mark the partition with the ceph type uuid
41  - create a file system
42  - mark the fs as ready for ceph consumption
43  - entire data disk is used (one big partition)
44  - a new partition is added to the journal disk (so it can be easily shared)
45
46  - triggered by administrator or ceph-deploy, e.g.  'ceph-disk <data disk> [journal disk]
47
48 Activate:
49  - mount the volume in a temp location
50  - allocate an osd id (if needed)
51  - remount in the correct location /var/lib/ceph/osd/$cluster-$id
52  - start ceph-osd
53
54  - triggered by udev when it sees the OSD gpt partition type
55  - triggered by admin 'ceph-disk activate <path>'
56  - triggered on ceph service startup with 'ceph-disk activate-all'
57
58 We rely on /dev/disk/by-partuuid to find partitions by their UUID;
59 this is what the journal symlink inside the osd data volume normally
60 points to.
61
62 activate-all relies on /dev/disk/by-parttypeuuid/$typeuuid.$uuid to
63 find all partitions.  We install special udev rules to create these
64 links.
65
66 udev triggers 'ceph-disk activate <dev>' or 'ceph-disk
67 activate-journal <dev>' based on the partition type.
68
69 On old distros (e.g., RHEL6), the blkid installed does not recognized
70 GPT partition metadata and the /dev/disk/by-partuuid etc. links aren't
71 present.  We have a horrible hack in the form of ceph-disk-udev that
72 parses gparted output to create the symlinks above and triggers the
73 'ceph-disk activate' etc commands that udev normally would do if it
74 knew the GPT partition type.
75
76 """
77
78 CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026'
79
80 JOURNAL_UUID =              '45b0969e-9b03-4f30-b4c6-b4b80ceff106'
81 DMCRYPT_JOURNAL_UUID =      '45b0969e-9b03-4f30-b4c6-5ec00ceff106'
82 DMCRYPT_LUKS_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-35865ceff106'
83 OSD_UUID =                  '4fbd7e29-9d25-41b8-afd0-062c0ceff05d'
84 DMCRYPT_OSD_UUID =          '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d'
85 DMCRYPT_LUKS_OSD_UUID =     '4fbd7e29-9d25-41b8-afd0-35865ceff05d'
86 TOBE_UUID =                 '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be'
87 DMCRYPT_TOBE_UUID =         '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be'
88 DMCRYPT_JOURNAL_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-35865ceff2be'
89
90 DEFAULT_FS_TYPE = 'xfs'
91
92 MOUNT_OPTIONS = dict(
93     btrfs='noatime,user_subvol_rm_allowed',
94     # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll
95     # delay a moment before removing it fully because we did have some
96     # issues with ext4 before the xatts-in-leveldb work, and it seemed
97     # that user_xattr helped
98     ext4='noatime,user_xattr',
99     xfs='noatime,inode64',
100     )
101
102 MKFS_ARGS = dict(
103     btrfs=[
104         '-m', 'single',
105         '-l', '32768',
106         '-n', '32768',
107         ],
108     xfs=[
109         # xfs insists on not overwriting previous fs; even if we wipe
110         # partition table, we often recreate it exactly the same way,
111         # so we'll see ghosts of filesystems past
112         '-f',
113         '-i', 'size=2048',
114         ],
115     )
116
117 INIT_SYSTEMS = [
118     'upstart',
119     'sysvinit',
120     'systemd',
121     'auto',
122     'none',
123     ]
124
125 STATEDIR = '/var/lib/ceph'
126
127 SYSCONFDIR = '/etc/ceph'
128
129 # only warn once about some things
130 warned_about = {}
131
132 # Nuke the TERM variable to avoid confusing any subprocesses we call.
133 # For example, libreadline will print weird control sequences for some
134 # TERM values.
135 if 'TERM' in os.environ:
136     del os.environ['TERM']
137
138 LOG_NAME = __name__
139 if LOG_NAME == '__main__':
140     LOG_NAME = os.path.basename(sys.argv[0])
141 LOG = logging.getLogger(LOG_NAME)
142
143
144 ###### lock ########
145
146 class filelock(object):
147     def __init__(self, fn):
148         self.fn = fn
149         self.fd = None
150
151     def acquire(self):
152         assert not self.fd
153         self.fd = file(self.fn, 'w')
154         fcntl.lockf(self.fd, fcntl.LOCK_EX)
155
156     def release(self):
157         assert self.fd
158         fcntl.lockf(self.fd, fcntl.LOCK_UN)
159         self.fd = None
160
161
162 ###### exceptions ########
163
164
165 class Error(Exception):
166     """
167     Error
168     """
169
170     def __str__(self):
171         doc = self.__doc__.strip()
172         return ': '.join([doc] + [str(a) for a in self.args])
173
174
175 class MountError(Error):
176     """
177     Mounting filesystem failed
178     """
179
180
181 class UnmountError(Error):
182     """
183     Unmounting filesystem failed
184     """
185
186
187 class BadMagicError(Error):
188     """
189     Does not look like a Ceph OSD, or incompatible version
190     """
191
192
193 class TruncatedLineError(Error):
194     """
195     Line is truncated
196     """
197
198
199 class TooManyLinesError(Error):
200     """
201     Too many lines
202     """
203
204
205 class FilesystemTypeError(Error):
206     """
207     Cannot discover filesystem type
208      """
209
210
211 class CephDiskException(Exception):
212     """
213     A base exception for ceph-disk to provide custom (ad-hoc) messages that
214     will be caught and dealt with when main() is executed
215     """
216     pass
217
218
219 class ExecutableNotFound(CephDiskException):
220     """
221     Exception to report on executables not available in PATH
222     """
223     pass
224
225
226 ####### utils
227
228
229 def maybe_mkdir(*a, **kw):
230     """
231     Creates a new directory if it doesn't exist, removes
232     existing symlink before creating the directory.
233     """
234     # remove any symlink, if it is there..
235     if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode):
236         LOG.debug('Removing old symlink at %s', *a)
237         os.unlink(*a)
238     try:
239         os.mkdir(*a, **kw)
240     except OSError, e:
241         if e.errno == errno.EEXIST:
242             pass
243         else:
244             raise
245
246
247 def which(executable):
248     """find the location of an executable"""
249     if 'PATH' in os.environ:
250         envpath = os.environ['PATH']
251     else:
252         envpath = os.defpath
253     PATH = envpath.split(os.pathsep)
254
255     locations = PATH + [
256         '/usr/local/bin',
257         '/bin',
258         '/usr/bin',
259         '/usr/local/sbin',
260         '/usr/sbin',
261         '/sbin',
262     ]
263
264     for location in locations:
265         executable_path = os.path.join(location, executable)
266         if os.path.exists(executable_path):
267             return executable_path
268
269
270 def _get_command_executable(arguments):
271     """
272     Return the full path for an executable, raise if the executable is not
273     found. If the executable has already a full path do not perform any checks.
274     """
275     if arguments[0].startswith('/'):  # an absolute path
276         return arguments
277     executable = which(arguments[0])
278     if not executable:
279         command_msg = 'Could not run command: %s' % ' '.join(arguments)
280         executable_msg = '%s not in path.' % arguments[0]
281         raise ExecutableNotFound('%s %s' % (executable_msg, command_msg))
282
283     # swap the old executable for the new one
284     arguments[0] = executable
285     return arguments
286
287
288 def command(arguments, **kwargs):
289     """
290     Safely execute a ``subprocess.Popen`` call making sure that the
291     executable exists and raising a helpful error message
292     if it does not.
293
294     .. note:: This should be the prefered way of calling ``subprocess.Popen``
295     since it provides the caller with the safety net of making sure that
296     executables *will* be found and will error nicely otherwise.
297
298     This returns the output of the command and the return code of the
299     process in a tuple: (output, returncode).
300     """
301     arguments = _get_command_executable(arguments)
302     LOG.info('Running command: %s' % ' '.join(arguments))
303     process = subprocess.Popen(
304         arguments,
305         stdout=subprocess.PIPE,
306         **kwargs)
307     out, _ = process.communicate()
308     return out, process.returncode
309
310
311 def command_check_call(arguments):
312     """
313     Safely execute a ``subprocess.check_call`` call making sure that the
314     executable exists and raising a helpful error message if it does not.
315
316     .. note:: This should be the prefered way of calling
317     ``subprocess.check_call`` since it provides the caller with the safety net
318     of making sure that executables *will* be found and will error nicely
319     otherwise.
320     """
321     arguments = _get_command_executable(arguments)
322     LOG.info('Running command: %s', ' '.join(arguments))
323     return subprocess.check_call(arguments)
324
325
326 def platform_distro():
327     """
328     Returns a normalized, lower case string without any leading nor trailing
329     whitespace that represents the distribution name of the current machine.
330     """
331     distro = platform_information()[0] or ''
332     return distro.strip().lower()
333
334
335 def platform_information():
336     distro, release, codename = platform.linux_distribution()
337     if not codename and 'debian' in distro.lower():  # this could be an empty string in Debian
338         debian_codenames = {
339             '8': 'jessie',
340             '7': 'wheezy',
341             '6': 'squeeze',
342         }
343         major_version = release.split('.')[0]
344         codename = debian_codenames.get(major_version, '')
345
346         # In order to support newer jessie/sid or wheezy/sid strings we test this
347         # if sid is buried in the minor, we should use sid anyway.
348         if not codename and '/' in release:
349             major, minor = release.split('/')
350             if minor == 'sid':
351                 codename = minor
352             else:
353                 codename = major
354
355     return (
356         str(distro).strip(),
357         str(release).strip(),
358         str(codename).strip()
359     )
360
361
362 def get_dev_name(path):
363     """
364     get device name from path.  e.g.::
365
366         /dev/sda -> sdas, /dev/cciss/c0d1 -> cciss!c0d1
367
368     a device "name" is something like::
369
370         sdb
371         cciss!c0d1
372
373     """
374     assert path.startswith('/dev/')
375     base = path[5:]
376     return base.replace('/', '!')
377
378
379 def get_dev_path(name):
380     """
381     get a path (/dev/...) from a name (cciss!c0d1)
382     a device "path" is something like::
383
384         /dev/sdb
385         /dev/cciss/c0d1
386
387     """
388     return '/dev/' + name.replace('!', '/')
389
390
391 def get_dev_relpath(name):
392     """
393     get a relative path to /dev from a name (cciss!c0d1)
394     """
395     return name.replace('!', '/')
396
397
398 def get_dev_size(dev, size='megabytes'):
399     """
400     Attempt to get the size of a device so that we can prevent errors
401     from actions to devices that are smaller, and improve error reporting.
402
403     Because we want to avoid breakage in case this approach is not robust, we
404     will issue a warning if we failed to get the size.
405
406     :param size: bytes or megabytes
407     :param dev: the device to calculate the size
408     """
409     fd = os.open(dev, os.O_RDONLY)
410     dividers = {'bytes': 1, 'megabytes': 1024*1024}
411     try:
412         device_size = os.lseek(fd, 0, os.SEEK_END)
413         divider = dividers.get(size, 1024*1024)  # default to megabytes
414         return device_size/divider
415     except Exception as error:
416         LOG.warning('failed to get size of %s: %s' % (dev, str(error)))
417     finally:
418         os.close(fd)
419
420
421 def get_partition_dev(dev, pnum):
422     """
423     get the device name for a partition
424
425     assume that partitions are named like the base dev, with a number, and optionally
426     some intervening characters (like 'p').  e.g.,
427
428        sda 1 -> sda1
429        cciss/c0d1 1 -> cciss!c0d1p1
430     """
431     name = get_dev_name(os.path.realpath(dev))
432     partname = None
433     for f in os.listdir(os.path.join('/sys/block', name)):
434         if f.startswith(name) and f.endswith(str(pnum)):
435             # we want the shortest name that starts with the base name and ends with the partition number
436             if not partname or len(f) < len(partname):
437                 partname = f
438     if partname:
439         return get_dev_path(partname)
440     else:
441         raise Error('partition %d for %s does not appear to exist' % (pnum, dev))
442
443
444 def list_all_partitions():
445     """
446     Return a list of devices and partitions
447     """
448     dev_part_list = {}
449     for name in os.listdir('/sys/block'):
450         # /dev/fd0 may hang http://tracker.ceph.com/issues/6827
451         if re.match(r'^fd\d$', name):
452             continue
453         if not os.path.exists(os.path.join('/sys/block', name, 'device')):
454             continue
455         dev_part_list[name] = list_partitions(name)
456     return dev_part_list
457
458
459 def list_partitions(basename):
460     """
461     Return a list of partitions on the given device name
462     """
463     partitions = []
464     for name in os.listdir(os.path.join('/sys/block', basename)):
465         if name.startswith(basename):
466             partitions.append(name)
467     return partitions
468
469 def get_partition_base(dev):
470     """
471     Get the base device for a partition
472     """
473     dev = os.path.realpath(dev)
474     if not stat.S_ISBLK(os.lstat(dev).st_mode):
475         raise Error('not a block device', dev)
476
477     name = get_dev_name(dev)
478     if os.path.exists(os.path.join('/sys/block', name)):
479         raise Error('not a partition', dev)
480
481     # find the base
482     for basename in os.listdir('/sys/block'):
483         if os.path.exists(os.path.join('/sys/block', basename, name)):
484             return '/dev/' + basename
485     raise Error('no parent device for partition', dev)
486
487 def is_partition(dev):
488     """
489     Check whether a given device path is a partition or a full disk.
490     """
491     dev = os.path.realpath(dev)
492     if not stat.S_ISBLK(os.lstat(dev).st_mode):
493         raise Error('not a block device', dev)
494
495     name = get_dev_name(dev)
496     if os.path.exists(os.path.join('/sys/block', name)):
497         return False
498
499     # make sure it is a partition of something else
500     for basename in os.listdir('/sys/block'):
501         if os.path.exists(os.path.join('/sys/block', basename, name)):
502             return True
503
504     raise Error('not a disk or partition', dev)
505
506
507 def is_mounted(dev):
508     """
509     Check if the given device is mounted.
510     """
511     dev = os.path.realpath(dev)
512     with file('/proc/mounts', 'rb') as proc_mounts:
513         for line in proc_mounts:
514             fields = line.split()
515             if len(fields) < 3:
516                 continue
517             mounts_dev = fields[0]
518             path = fields[1]
519             if mounts_dev.startswith('/') and os.path.exists(mounts_dev):
520                 mounts_dev = os.path.realpath(mounts_dev)
521                 if mounts_dev == dev:
522                     return path
523     return None
524
525
526 def is_held(dev):
527     """
528     Check if a device is held by another device (e.g., a dm-crypt mapping)
529     """
530     assert os.path.exists(dev)
531     dev = os.path.realpath(dev)
532     base = get_dev_name(dev)
533
534     # full disk?
535     directory = '/sys/block/{base}/holders'.format(base=base)
536     if os.path.exists(directory):
537         return os.listdir(directory)
538
539     # partition?
540     part = base
541     while len(base):
542         directory = '/sys/block/{base}/{part}/holders'.format(part=part, base=base)
543         if os.path.exists(directory):
544             return os.listdir(directory)
545         base = base[:-1]
546     return []
547
548
549 def verify_not_in_use(dev, check_partitions=False):
550     """
551     Verify if a given device (path) is in use (e.g. mounted or
552     in use by device-mapper).
553
554     :raises: Error if device is in use.
555     """
556     assert os.path.exists(dev)
557     if is_mounted(dev):
558         raise Error('Device is mounted', dev)
559     holders = is_held(dev)
560     if holders:
561         raise Error('Device %s is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders))
562
563     if check_partitions and not is_partition(dev):
564         basename = get_dev_name(os.path.realpath(dev))
565         for partname in list_partitions(basename):
566             partition = get_dev_path(partname)
567             if is_mounted(partition):
568                 raise Error('Device is mounted', partition)
569             holders = is_held(partition)
570             if holders:
571                 raise Error('Device %s is in use by a device-mapper mapping (dm-crypt?)' % partition, ','.join(holders))
572
573
574 def must_be_one_line(line):
575     """
576     Checks if given line is really one single line.
577
578     :raises: TruncatedLineError or TooManyLinesError
579     :return: Content of the line, or None if line isn't valid.
580     """
581     if line[-1:] != '\n':
582         raise TruncatedLineError(line)
583     line = line[:-1]
584     if '\n' in line:
585         raise TooManyLinesError(line)
586     return line
587
588
589 def read_one_line(parent, name):
590     """
591     Read a file whose sole contents are a single line.
592
593     Strips the newline.
594
595     :return: Contents of the line, or None if file did not exist.
596     """
597     path = os.path.join(parent, name)
598     try:
599         line = file(path, 'rb').read()
600     except IOError as e:
601         if e.errno == errno.ENOENT:
602             return None
603         else:
604             raise
605
606     try:
607         line = must_be_one_line(line)
608     except (TruncatedLineError, TooManyLinesError) as e:
609         raise Error(
610             'File is corrupt: {path}: {msg}'.format(
611                 path=path,
612                 msg=e,
613             )
614         )
615     return line
616
617
618 def write_one_line(parent, name, text):
619     """
620     Write a file whose sole contents are a single line.
621
622     Adds a newline.
623     """
624     path = os.path.join(parent, name)
625     tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())
626     with file(tmp, 'wb') as tmp_file:
627         tmp_file.write(text + '\n')
628         os.fsync(tmp_file.fileno())
629     os.rename(tmp, path)
630
631
632 def check_osd_magic(path):
633     """
634     Check that this path has the Ceph OSD magic.
635
636     :raises: BadMagicError if this does not look like a Ceph OSD data
637     dir.
638     """
639     magic = read_one_line(path, 'magic')
640     if magic is None:
641         # probably not mkfs'ed yet
642         raise BadMagicError(path)
643     if magic != CEPH_OSD_ONDISK_MAGIC:
644         raise BadMagicError(path)
645
646
647 def check_osd_id(osd_id):
648     """
649     Ensures osd id is numeric.
650     """
651     if not re.match(r'^[0-9]+$', osd_id):
652         raise Error('osd id is not numeric', osd_id)
653
654
655 def allocate_osd_id(
656     cluster,
657     fsid,
658     keyring,
659     ):
660     """
661     Accocates an OSD id on the given cluster.
662
663     :raises: Error if the call to allocate the OSD id fails.
664     :return: The allocated OSD id.
665     """
666
667     LOG.debug('Allocating OSD id...')
668     try:
669         osd_id = _check_output(
670             args=[
671                 'ceph',
672                 '--cluster', cluster,
673                 '--name', 'client.bootstrap-osd',
674                 '--keyring', keyring,
675                 'osd', 'create', '--concise',
676                 fsid,
677                 ],
678             )
679     except subprocess.CalledProcessError as e:
680         raise Error('ceph osd create failed', e, e.output)
681     osd_id = must_be_one_line(osd_id)
682     check_osd_id(osd_id)
683     return osd_id
684
685
686 def get_osd_id(path):
687     """
688     Gets the OSD id of the OSD at the given path.
689     """
690     osd_id = read_one_line(path, 'whoami')
691     if osd_id is not None:
692         check_osd_id(osd_id)
693     return osd_id
694
695
696 def _check_output(args=None, **kwargs):
697     out, ret = command(args, **kwargs)
698     if ret:
699         cmd = args[0]
700         error = subprocess.CalledProcessError(ret, cmd)
701         error.output = out
702         raise error
703     return out
704
705
706 def get_conf(cluster, variable):
707     """
708     Get the value of the given configuration variable from the
709     cluster.
710
711     :raises: Error if call to ceph-conf fails.
712     :return: The variable value or None.
713     """
714     try:
715         out, ret = command(
716             [
717                 'ceph-conf',
718                 '--cluster={cluster}'.format(
719                     cluster=cluster,
720                     ),
721                 '--name=osd.',
722                 '--lookup',
723                 variable,
724                 ],
725             close_fds=True,
726             )
727     except OSError as e:
728         raise Error('error executing ceph-conf', e)
729     if ret == 1:
730         # config entry not found
731         return None
732     elif ret != 0:
733         raise Error('getting variable from configuration failed')
734     value = out.split('\n', 1)[0]
735     # don't differentiate between "var=" and no var set
736     if not value:
737         return None
738     return value
739
740
741 def get_conf_with_default(cluster, variable):
742     """
743     Get a config value that is known to the C++ code.
744
745     This will fail if called on variables that are not defined in
746     common config options.
747     """
748     try:
749         out = _check_output(
750             args=[
751                 'ceph-osd',
752                 '--cluster={cluster}'.format(
753                     cluster=cluster,
754                     ),
755                 '--show-config-value={variable}'.format(
756                     variable=variable,
757                     ),
758                 ],
759             close_fds=True,
760             )
761     except subprocess.CalledProcessError as e:
762         raise Error(
763             'getting variable from configuration failed',
764             e,
765             )
766
767     value = str(out).split('\n', 1)[0]
768     return value
769
770
771 def get_fsid(cluster):
772     """
773     Get the fsid of the cluster.
774
775     :return: The fsid or raises Error.
776     """
777     fsid = get_conf_with_default(cluster=cluster, variable='fsid')
778     if fsid is None:
779         raise Error('getting cluster uuid from configuration failed')
780     return fsid.lower()
781
782
783 def get_or_create_dmcrypt_key(
784     _uuid,
785     key_dir,
786     key_size,
787     luks
788     ):
789     """
790     Get path to dmcrypt key or create a new key file.
791
792     :return: Path to the dmcrypt key file.
793     """
794     if luks:
795         path = os.path.join(key_dir, _uuid + ".luks.key")
796     else:
797         path = os.path.join(key_dir, _uuid)
798
799     # already have it?
800     if os.path.exists(path):
801         return path
802
803     # make a new key
804     try:
805         if not os.path.exists(key_dir):
806             os.makedirs(key_dir, stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR)
807         with file('/dev/urandom', 'rb') as i:
808             key = i.read(key_size / 8)
809             fd = os.open(path, os.O_WRONLY|os.O_CREAT,
810                          stat.S_IRUSR|stat.S_IWUSR)
811             assert os.write(fd, key) == len(key)
812             os.close(fd)
813         return path
814     except:
815         raise Error('unable to read or create dm-crypt key', path)
816
817
818 def dmcrypt_map(
819     rawdev,
820     keypath,
821     _uuid,
822     cryptsetup_parameters,
823     luks
824     ):
825     """
826     Maps a device to a dmcrypt device.
827
828     :return: Path to the dmcrypt device.
829     """
830     dev = '/dev/mapper/' + _uuid
831     luksFormat_args = [
832         'cryptsetup',
833         '--batch-mode',
834         '--key-file',
835         keypath,
836         'luksFormat',
837         rawdev,
838         ] + cryptsetup_parameters
839
840     luksOpen_args = [
841         'cryptsetup',
842         '--key-file',
843         keypath,
844         'luksOpen',
845         rawdev,
846         _uuid,
847         ]
848
849     create_args = [
850         'cryptsetup',
851         '--key-file',
852         keypath,
853         'create',
854         _uuid,
855         rawdev,
856         ] + cryptsetup_parameters
857
858     try:
859         if luks:
860             command_check_call(luksFormat_args)
861             command_check_call(luksOpen_args)
862         else:
863             # Plain mode has no format function, nor any validation that the key is correct.
864             command_check_call(create_args)
865         return dev
866
867     except subprocess.CalledProcessError as e:
868         raise Error('unable to map device', rawdev, e)
869
870
871 def dmcrypt_unmap(
872     _uuid
873     ):
874     """
875     Removes the dmcrypt device with the given UUID.
876     """
877     args = [
878         'cryptsetup',
879         'remove',
880         _uuid
881     ]
882
883     try:
884         command_check_call(args)
885
886     except subprocess.CalledProcessError as e:
887         raise Error('unable to unmap device', _uuid, e)
888
889
890 def mount(
891     dev,
892     fstype,
893     options,
894     ):
895     """
896     Mounts a device with given filessystem type and
897     mount options to a tempfile path under /var/lib/ceph/tmp.
898     """
899     # sanity check: none of the arguments are None
900     if dev is None:
901         raise ValueError('dev may not be None')
902     if fstype is None:
903         raise ValueError('fstype may not be None')
904
905     # pick best-of-breed mount options based on fs type
906     if options is None:
907         options = MOUNT_OPTIONS.get(fstype, '')
908
909     # mount
910     path = tempfile.mkdtemp(
911         prefix='mnt.',
912         dir=STATEDIR + '/tmp',
913         )
914     try:
915         LOG.debug('Mounting %s on %s with options %s', dev, path, options)
916         command_check_call(
917             [
918                 'mount',
919                 '-t', fstype,
920                 '-o', options,
921                 '--',
922                 dev,
923                 path,
924                 ],
925             )
926     except subprocess.CalledProcessError as e:
927         try:
928             os.rmdir(path)
929         except (OSError, IOError):
930             pass
931         raise MountError(e)
932
933     return path
934
935
936 def unmount(
937     path,
938     ):
939     """
940     Unmount and removes the given mount point.
941     """
942     retries = 0
943     while True:
944         try:
945             LOG.debug('Unmounting %s', path)
946             command_check_call(
947                 [
948                     '/bin/umount',
949                     '--',
950                     path,
951                     ],
952                 )
953             break
954         except subprocess.CalledProcessError as e:
955             # on failure, retry 3 times with incremental backoff
956             if retries == 3:
957                 raise UnmountError(e)
958             else:
959                 time.sleep(0.5 + retries * 1.0)
960                 retries += 1
961
962     os.rmdir(path)
963
964
965 ###########################################
966
967 def extract_parted_partition_numbers(partitions):
968     numbers_as_strings = re.findall('^\d+', partitions, re.MULTILINE)
969     return map(int, numbers_as_strings)
970
971 def get_free_partition_index(dev):
972     """
973     Get the next free partition index on a given device.
974
975     :return: Index number (> 1 if there is already a partition on the device)
976     or 1 if there is no partition table.
977     """
978     try:
979         lines = _check_output(
980             args=[
981                 'parted',
982                 '--machine',
983                 '--',
984                 dev,
985                 'print',
986                 ],
987             )
988     except subprocess.CalledProcessError as e:
989         LOG.info('cannot read partition index; assume it isn\'t present\n (Error: %s)' % e)
990         return 1
991
992     if not lines:
993         raise Error('parted failed to output anything')
994     if ('CHS;' not in lines and
995         'CYL;' not in lines and
996         'BYT;' not in lines):
997         raise Error('parted output expected to contain one of ' +
998                     'CHH; CYL; or BYT; : ' + lines)
999     if os.path.realpath(dev) not in lines:
1000         raise Error('parted output expected to contain ' + dev + ': ' + lines)
1001     _, partitions = lines.split(os.path.realpath(dev))
1002     partition_numbers = extract_parted_partition_numbers(partitions)
1003     if partition_numbers:
1004         return max(partition_numbers) + 1
1005     else:
1006         return 1
1007
1008
1009 def update_partition(action, dev, description):
1010      # try to make sure the kernel refreshes the table.  note
1011      # that if this gets ebusy, we are probably racing with
1012      # udev because it already updated it.. ignore failure here.
1013
1014      # On RHEL and CentOS distros, calling partprobe forces a reboot of the
1015      # server. Since we are not resizing partitons so we rely on calling
1016      # partx
1017      if platform_distro().startswith(('centos', 'red', 'scientific')):
1018          LOG.info('calling partx on %s device %s', description, dev)
1019          LOG.info('re-reading known partitions will display errors')
1020          command(
1021              [
1022                  'partx',
1023                  action,
1024                  dev,
1025              ],
1026          )
1027
1028      else:
1029          LOG.debug('Calling partprobe on %s device %s', description, dev)
1030          command(
1031              [
1032                  'partprobe',
1033                  dev,
1034              ],
1035          )
1036
1037
1038 def zap(dev):
1039     """
1040     Destroy the partition table and content of a given disk.
1041     """
1042     dmode = os.stat(dev).st_mode
1043     if not stat.S_ISBLK(dmode) or is_partition(dev):
1044         raise Error('not full block device; cannot zap', dev)
1045     try:
1046         LOG.debug('Zapping partition table on %s', dev)
1047
1048         # try to wipe out any GPT partition table backups.  sgdisk
1049         # isn't too thorough.
1050         lba_size = 4096
1051         size = 33 * lba_size
1052         with file(dev, 'wb') as dev_file:
1053             dev_file.seek(-size, os.SEEK_END)
1054             dev_file.write(size*'\0')
1055
1056         command_check_call(
1057             [
1058                 'sgdisk',
1059                 '--zap-all',
1060                 '--',
1061                 dev,
1062             ],
1063         )
1064         command_check_call(
1065             [
1066                 'sgdisk',
1067                 '--clear',
1068                 '--mbrtogpt',
1069                 '--',
1070                 dev,
1071             ],
1072         )
1073
1074         update_partition('-d', dev, 'zapped')
1075
1076     except subprocess.CalledProcessError as e:
1077         raise Error(e)
1078
1079
1080 def prepare_journal_dev(
1081     data,
1082     journal,
1083     journal_size,
1084     journal_uuid,
1085     journal_dm_keypath,
1086     cryptsetup_parameters,
1087     luks
1088    ):
1089
1090     reusing_partition = False
1091
1092     if is_partition(journal):
1093         if journal_dm_keypath:
1094             raise Error(journal + ' partition already exists'
1095                         ' and --dmcrypt specified')
1096         LOG.debug('Journal %s is a partition', journal)
1097         LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
1098         if get_partition_type(journal) == JOURNAL_UUID:
1099             LOG.debug('Journal %s was previously prepared with ceph-disk. Reusing it.', journal)
1100             reusing_partition = True
1101             # Read and reuse the partition uuid from this journal's previous life.
1102             # We reuse the uuid instead of changing it because udev does not reliably
1103             # notice changes to an existing partition's GUID.
1104             # See http://tracker.ceph.com/issues/10146
1105             journal_uuid = get_partition_uuid(journal)
1106             LOG.debug('Reusing journal with uuid %s', journal_uuid)
1107         else:
1108             LOG.warning('Journal %s was not prepared with ceph-disk. Symlinking directly.', journal)
1109             return (journal, None, None)
1110
1111     journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format(
1112         journal_uuid=journal_uuid,
1113         )
1114
1115     journal_dmcrypt = None
1116     if journal_dm_keypath:
1117         journal_dmcrypt = journal_symlink
1118         journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid)
1119
1120     if reusing_partition:
1121         # confirm that the journal_symlink exists. It should since this was an active journal
1122         # in the past. Continuing otherwise would be futile.
1123         assert os.path.exists(journal_symlink)
1124         return (journal_symlink, journal_dmcrypt, journal_uuid)
1125
1126     # From here on we are creating a new journal device, not reusing.
1127
1128     ptype = JOURNAL_UUID
1129     ptype_tobe = JOURNAL_UUID
1130     if journal_dm_keypath:
1131         if luks:
1132             ptype = DMCRYPT_LUKS_JOURNAL_UUID
1133         else:
1134             ptype = DMCRYPT_JOURNAL_UUID
1135         ptype_tobe = DMCRYPT_JOURNAL_TOBE_UUID
1136
1137     # it is a whole disk.  create a partition!
1138     num = None
1139     if journal == data:
1140         # we're sharing the disk between osd data and journal;
1141         # make journal be partition number 2, so it's pretty
1142         num = 2
1143         journal_part = '{num}:0:{size}M'.format(
1144             num=num,
1145             size=journal_size,
1146             )
1147     else:
1148         # sgdisk has no way for me to say "whatever is the next
1149         # free index number" when setting type guids etc, so we
1150         # need to awkwardly look up the next free number, and then
1151         # fix that in the call -- and hope nobody races with us;
1152         # then again nothing guards the partition table from races
1153         # anyway
1154         num = get_free_partition_index(dev=journal)
1155         journal_part = '{num}:0:+{size}M'.format(
1156             num=num,
1157             size=journal_size,
1158             )
1159         LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
1160
1161     dev_size = get_dev_size(journal)
1162
1163     if journal_size > dev_size:
1164         LOG.error('refusing to create journal on %s' % journal)
1165         LOG.error('journal size (%sM) is bigger than device (%sM)' % (journal_size, dev_size))
1166         raise Error(
1167             '%s device size (%sM) is not big enough for journal' % (journal, dev_size)
1168         )
1169
1170     try:
1171         LOG.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal)
1172         command_check_call(
1173             [
1174                 'sgdisk',
1175                 '--new={part}'.format(part=journal_part),
1176                 '--change-name={num}:ceph journal'.format(num=num),
1177                 '--partition-guid={num}:{journal_uuid}'.format(
1178                     num=num,
1179                     journal_uuid=journal_uuid,
1180                     ),
1181                 '--typecode={num}:{uuid}'.format(
1182                     num=num,
1183                     uuid=ptype_tobe,
1184                     ),
1185                 '--mbrtogpt',
1186                 '--',
1187                 journal,
1188                 ]
1189             )
1190
1191         update_partition('-a', journal, 'prepared')
1192
1193         # wait for udev event queue to clear
1194         command(
1195             [
1196                 'udevadm',
1197                 'settle',
1198                 ],
1199             )
1200
1201         LOG.debug('Journal is GPT partition %s', journal_symlink)
1202
1203         if journal_dm_keypath:
1204             if luks:
1205                 luksFormat_args = [
1206                     'cryptsetup',
1207                     '--batch-mode',
1208                     '--key-file',
1209                     journal_dm_keypath,
1210                     'luksFormat',
1211                     journal_dmcrypt,
1212                     ] + cryptsetup_parameters
1213
1214                 try:
1215                     command_check_call(luksFormat_args)
1216                 except subprocess.CalledProcessError as e:
1217                     raise Error('unable to format device for LUKS', journal_symlink, e)
1218
1219             try:
1220                 command_check_call(
1221                     [
1222                         'sgdisk',
1223                         '--typecode={num}:{uuid}'.format(
1224                             num=num,
1225                             uuid=ptype,
1226                             ),
1227                         '--',
1228                         journal,
1229                         ],
1230                     )
1231             except subprocess.CalledProcessError as e:
1232                 raise Error('unable to mark device as formatted for LUKS', journal_symlink, e)
1233
1234         LOG.debug('Journal is GPT partition %s', journal_symlink)
1235         return (journal_symlink, journal_dmcrypt, journal_uuid)
1236
1237     except subprocess.CalledProcessError as e:
1238         raise Error(e)
1239
1240 def prepare_journal_file(
1241     journal):
1242
1243     if not os.path.exists(journal):
1244         LOG.debug('Creating journal file %s with size 0 (ceph-osd will resize and allocate)', journal)
1245         with file(journal, 'wb') as journal_file:  # noqa
1246             pass
1247
1248     LOG.debug('Journal is file %s', journal)
1249     LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
1250     return (journal, None, None)
1251
1252
1253 def prepare_journal(
1254     data,
1255     journal,
1256     journal_size,
1257     journal_uuid,
1258     force_file,
1259     force_dev,
1260     journal_dm_keypath,
1261     cryptsetup_parameters,
1262     luks
1263     ):
1264
1265     if journal is None:
1266         if force_dev:
1267             raise Error('Journal is unspecified; not a block device')
1268         return (None, None, None)
1269
1270     if not os.path.exists(journal):
1271         if force_dev:
1272             raise Error('Journal does not exist; not a block device', journal)
1273         return prepare_journal_file(journal)
1274
1275     jmode = os.stat(journal).st_mode
1276     if stat.S_ISREG(jmode):
1277         if force_dev:
1278             raise Error('Journal is not a block device', journal)
1279         return prepare_journal_file(journal)
1280
1281     if stat.S_ISBLK(jmode):
1282         if force_file:
1283             raise Error('Journal is not a regular file', journal)
1284         return prepare_journal_dev(data, journal, journal_size, journal_uuid, journal_dm_keypath, cryptsetup_parameters, luks)
1285
1286     raise Error('Journal %s is neither a block device nor regular file' % journal)
1287
1288
1289 def adjust_symlink(target, path):
1290     create = True
1291     if os.path.lexists(path):
1292         try:
1293             mode = os.lstat(path).st_mode
1294             if stat.S_ISREG(mode):
1295                 LOG.debug('Removing old file %s', path)
1296                 os.unlink(path)
1297             elif stat.S_ISLNK(mode):
1298                 old = os.readlink(path)
1299                 if old != target:
1300                     LOG.debug('Removing old symlink %s -> %s', path, old)
1301                     os.unlink(path)
1302                 else:
1303                     create = False
1304         except:
1305             raise Error('unable to remove (or adjust) old file (symlink)', path)
1306     if create:
1307         LOG.debug('Creating symlink %s -> %s', path, target)
1308         try:
1309             os.symlink(target, path)
1310         except:
1311             raise Error('unable to create symlink %s -> %s' % (path, target))
1312
1313
1314 def prepare_dir(
1315     path,
1316     journal,
1317     cluster_uuid,
1318     osd_uuid,
1319     journal_uuid,
1320     journal_dmcrypt=None,
1321     ):
1322
1323     if os.path.exists(os.path.join(path, 'magic')):
1324         LOG.debug('Data dir %s already exists', path)
1325         return
1326     else:
1327         LOG.debug('Preparing osd data dir %s', path)
1328
1329     if osd_uuid is None:
1330         osd_uuid = str(uuid.uuid4())
1331
1332     if journal is not None:
1333         # we're using an external journal; point to it here
1334         adjust_symlink(journal, os.path.join(path, 'journal'))
1335
1336     if journal_dmcrypt is not None:
1337         adjust_symlink(journal_dmcrypt, os.path.join(path, 'journal_dmcrypt'))
1338     else:
1339         try:
1340             os.unlink(os.path.join(path, 'journal_dmcrypt'))
1341         except OSError:
1342             pass
1343
1344     write_one_line(path, 'ceph_fsid', cluster_uuid)
1345     write_one_line(path, 'fsid', osd_uuid)
1346
1347     if journal_uuid is not None:
1348         # i.e., journal is a tagged partition
1349         write_one_line(path, 'journal_uuid', journal_uuid)
1350
1351     write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC)
1352
1353
1354 def prepare_dev(
1355     data,
1356     journal,
1357     fstype,
1358     mkfs_args,
1359     mount_options,
1360     cluster_uuid,
1361     osd_uuid,
1362     journal_uuid,
1363     journal_dmcrypt,
1364     osd_dm_keypath,
1365     cryptsetup_parameters,
1366     luks
1367     ):
1368     """
1369     Prepare a data/journal combination to be used for an OSD.
1370
1371     The ``magic`` file is written last, so it's presence is a reliable
1372     indicator of the whole sequence having completed.
1373
1374     WARNING: This will unconditionally overwrite anything given to
1375     it.
1376     """
1377
1378     ptype_tobe = TOBE_UUID
1379     ptype_osd = OSD_UUID
1380     if osd_dm_keypath:
1381         ptype_tobe = DMCRYPT_TOBE_UUID
1382         if luks:
1383             ptype_osd = DMCRYPT_LUKS_OSD_UUID
1384         else:
1385             ptype_osd = DMCRYPT_OSD_UUID
1386
1387     rawdev = None
1388     if is_partition(data):
1389         LOG.debug('OSD data device %s is a partition', data)
1390         rawdev = data
1391     else:
1392         LOG.debug('Creating osd partition on %s', data)
1393         try:
1394             command_check_call(
1395                 [
1396                     'sgdisk',
1397                     '--largest-new=1',
1398                     '--change-name=1:ceph data',
1399                     '--partition-guid=1:{osd_uuid}'.format(
1400                         osd_uuid=osd_uuid,
1401                         ),
1402                     '--typecode=1:%s' % ptype_tobe,
1403                     '--',
1404                     data,
1405                 ],
1406             )
1407             update_partition('-a', data, 'created')
1408             command(
1409                 [
1410                     # wait for udev event queue to clear
1411                     'udevadm',
1412                     'settle',
1413                     ],
1414                 )
1415         except subprocess.CalledProcessError as e:
1416             raise Error(e)
1417
1418         rawdev = get_partition_dev(data, 1)
1419
1420     dev = None
1421     if osd_dm_keypath:
1422         dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid, cryptsetup_parameters, luks)
1423     else:
1424         dev = rawdev
1425
1426     try:
1427         args = [
1428             'mkfs',
1429             '-t',
1430             fstype,
1431             ]
1432         if mkfs_args is not None:
1433             args.extend(mkfs_args.split())
1434             if fstype == 'xfs':
1435                 args.extend(['-f'])  # always force
1436         else:
1437             args.extend(MKFS_ARGS.get(fstype, []))
1438         args.extend([
1439             '--',
1440             dev,
1441             ])
1442         try:
1443             LOG.debug('Creating %s fs on %s', fstype, dev)
1444             command_check_call(args)
1445         except subprocess.CalledProcessError as e:
1446             raise Error(e)
1447
1448         #remove whitespaces from mount_options
1449         if mount_options is not None:
1450             mount_options = "".join(mount_options.split())
1451
1452         path = mount(dev=dev, fstype=fstype, options=mount_options)
1453
1454         try:
1455             prepare_dir(
1456                 path=path,
1457                 journal=journal,
1458                 cluster_uuid=cluster_uuid,
1459                 osd_uuid=osd_uuid,
1460                 journal_uuid=journal_uuid,
1461                 journal_dmcrypt=journal_dmcrypt,
1462                 )
1463         finally:
1464             unmount(path)
1465     finally:
1466         if rawdev != dev:
1467             dmcrypt_unmap(osd_uuid)
1468
1469     if not is_partition(data):
1470         try:
1471             command_check_call(
1472                 [
1473                     'sgdisk',
1474                     '--typecode=1:%s' % ptype_osd,
1475                     '--',
1476                     data,
1477                 ],
1478             )
1479         except subprocess.CalledProcessError as e:
1480             raise Error(e)
1481
1482
1483 def main_prepare(args):
1484     journal_dm_keypath = None
1485     osd_dm_keypath = None
1486
1487     try:
1488         prepare_lock.acquire()  # noqa
1489         if not os.path.exists(args.data):
1490             if args.data_dev:
1491                 raise Error('data path for device does not exist', args.data)
1492             if args.data_dir:
1493                 raise Error('data path for directory does not exist', args.data)
1494             raise Error('data path does not exist', args.data)
1495
1496         # in use?
1497         dmode = os.stat(args.data).st_mode
1498         if stat.S_ISBLK(dmode):
1499             verify_not_in_use(args.data, True)
1500
1501         if args.journal and os.path.exists(args.journal):
1502             jmode = os.stat(args.journal).st_mode
1503             if stat.S_ISBLK(jmode):
1504                 verify_not_in_use(args.journal, False)
1505
1506         if args.zap_disk is not None:
1507             zap(args.data)
1508
1509         if args.cluster_uuid is None:
1510             args.cluster_uuid = get_fsid(cluster=args.cluster)
1511             if args.cluster_uuid is None:
1512                 raise Error(
1513                     'must have fsid in config or pass --cluster-uuid=',
1514                     )
1515
1516         if args.fs_type is None:
1517             args.fs_type = get_conf(
1518                 cluster=args.cluster,
1519                 variable='osd_mkfs_type',
1520                 )
1521             if args.fs_type is None:
1522                 args.fs_type = get_conf(
1523                     cluster=args.cluster,
1524                     variable='osd_fs_type',
1525                     )
1526             if args.fs_type is None:
1527                 args.fs_type = DEFAULT_FS_TYPE
1528
1529         mkfs_args = get_conf(
1530             cluster=args.cluster,
1531             variable='osd_mkfs_options_{fstype}'.format(
1532                 fstype=args.fs_type,
1533                 ),
1534             )
1535         if mkfs_args is None:
1536             mkfs_args = get_conf(
1537                 cluster=args.cluster,
1538                 variable='osd_fs_mkfs_options_{fstype}'.format(
1539                     fstype=args.fs_type,
1540                     ),
1541                 )
1542
1543         mount_options = get_conf(
1544             cluster=args.cluster,
1545             variable='osd_mount_options_{fstype}'.format(
1546                 fstype=args.fs_type,
1547                 ),
1548             )
1549         if mount_options is None:
1550             mount_options = get_conf(
1551                 cluster=args.cluster,
1552                 variable='osd_fs_mount_options_{fstype}'.format(
1553                     fstype=args.fs_type,
1554                     ),
1555                 )
1556
1557         journal_size = get_conf_with_default(
1558             cluster=args.cluster,
1559             variable='osd_journal_size',
1560             )
1561         journal_size = int(journal_size)
1562
1563         cryptsetup_parameters_str = get_conf(
1564             cluster=args.cluster,
1565             variable='osd_cryptsetup_parameters',
1566             )
1567         if cryptsetup_parameters_str is None:
1568             cryptsetup_parameters = []
1569         else:
1570             cryptsetup_parameters = shlex.split(cryptsetup_parameters_str)
1571
1572         dmcrypt_keysize_str = get_conf(
1573             cluster=args.cluster,
1574             variable='osd_dmcrypt_key_size',
1575             )
1576
1577         dmcrypt_type = get_conf(
1578             cluster=args.cluster,
1579             variable='osd_dmcrypt_type',
1580             )
1581
1582         if dmcrypt_type is None:
1583             dmcrypt_type = "luks"
1584
1585         if dmcrypt_type == "plain":
1586             if dmcrypt_keysize_str is None:
1587                 # This value is hard-coded in the udev script
1588                 dmcrypt_keysize = 256
1589             else:
1590                 dmcrypt_keysize = int(dmcrypt_keysize_str)
1591                 LOG.warning('''ensure the 95-ceph-osd.rules file has been copied to /etc/udev/rules.d
1592  and modified to call cryptsetup with --key-size=%s'''
1593                             % dmcrypt_keysize_str)
1594
1595             if len (cryptsetup_parameters) > 0:
1596                 LOG.warning('''ensure the 95-ceph-osd.rules file has been copied to /etc/udev/rules.d
1597  and modified to call cryptsetup with %s'''
1598                             % cryptsetup_parameters_str)
1599
1600             cryptsetup_parameters = ['--key-size', str(dmcrypt_keysize)] + cryptsetup_parameters
1601             luks = False
1602         elif dmcrypt_type == "luks":
1603             if dmcrypt_keysize_str is None:
1604                 # As LUKS will hash the 'passphrase' in .luks.key into a key, set a large default
1605                 # so if not updated for some time, it is still a
1606                 # reasonable value.
1607                 #
1608                 # We don't force this into the cryptsetup_parameters, as we want the cryptsetup defaults
1609                 # to prevail for the actual LUKS key lengths.
1610                 dmcrypt_keysize = 1024
1611             else:
1612                 dmcrypt_keysize = int(dmcrypt_keysize_str)
1613                 cryptsetup_parameters = ['--key-size', str(dmcrypt_keysize)] + cryptsetup_parameters
1614
1615             luks = True
1616         else:
1617             raise Error('invalid osd_dmcrypt_type parameter (must be luks or plain): ', dmcrypt_type)
1618
1619         # colocate journal with data?
1620         if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None:
1621             LOG.info('Will colocate journal with data on %s', args.data)
1622             args.journal = args.data
1623
1624         if args.journal_uuid is None:
1625             args.journal_uuid = str(uuid.uuid4())
1626         if args.osd_uuid is None:
1627             args.osd_uuid = str(uuid.uuid4())
1628
1629         # dm-crypt keys?
1630         if args.dmcrypt:
1631             journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir, dmcrypt_keysize, luks)
1632             osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir, dmcrypt_keysize, luks)
1633
1634         # prepare journal
1635         (journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal(
1636             data=args.data,
1637             journal=args.journal,
1638             journal_size=journal_size,
1639             journal_uuid=args.journal_uuid,
1640             force_file=args.journal_file,
1641             force_dev=args.journal_dev,
1642             journal_dm_keypath=journal_dm_keypath,
1643             cryptsetup_parameters=cryptsetup_parameters,
1644             luks=luks
1645             )
1646
1647         # prepare data
1648         if stat.S_ISDIR(dmode):
1649             if args.data_dev:
1650                 raise Error('data path is not a block device', args.data)
1651             prepare_dir(
1652                 path=args.data,
1653                 journal=journal_symlink,
1654                 cluster_uuid=args.cluster_uuid,
1655                 osd_uuid=args.osd_uuid,
1656                 journal_uuid=journal_uuid,
1657                 journal_dmcrypt=journal_dmcrypt,
1658                 )
1659         elif stat.S_ISBLK(dmode):
1660             if args.data_dir:
1661                 raise Error('data path is not a directory', args.data)
1662             prepare_dev(
1663                 data=args.data,
1664                 journal=journal_symlink,
1665                 fstype=args.fs_type,
1666                 mkfs_args=mkfs_args,
1667                 mount_options=mount_options,
1668                 cluster_uuid=args.cluster_uuid,
1669                 osd_uuid=args.osd_uuid,
1670                 journal_uuid=journal_uuid,
1671                 journal_dmcrypt=journal_dmcrypt,
1672                 osd_dm_keypath=osd_dm_keypath,
1673                 cryptsetup_parameters=cryptsetup_parameters,
1674                 luks=luks
1675                 )
1676         else:
1677             raise Error('not a dir or block device', args.data)
1678         prepare_lock.release()  # noqa
1679
1680         if stat.S_ISBLK(dmode):
1681             update_partition('-a', args.data, 'prepared')
1682
1683     except Error as e:
1684         if journal_dm_keypath:
1685             try:
1686                 os.unlink(journal_dm_keypath)
1687             except OSError as e2:
1688                 if e2.errno != errno.ENOENT: # errno.ENOENT = no such file or directory
1689                     raise # re-raise exception if a different error occured
1690         if osd_dm_keypath:
1691             try:
1692                 os.unlink(osd_dm_keypath)
1693             except OSError as e2:
1694                 if e2.errno != errno.ENOENT: # errno.ENOENT = no such file or directory
1695                     raise # re-raise exception if a different error occured
1696         prepare_lock.release()  # noqa
1697         raise e
1698
1699
1700 ###########################
1701
1702
1703 def mkfs(
1704     path,
1705     cluster,
1706     osd_id,
1707     fsid,
1708     keyring,
1709     ):
1710     monmap = os.path.join(path, 'activate.monmap')
1711     command_check_call(
1712         [
1713             'ceph',
1714             '--cluster', cluster,
1715             '--name', 'client.bootstrap-osd',
1716             '--keyring', keyring,
1717             'mon', 'getmap', '-o', monmap,
1718             ],
1719         )
1720
1721     command_check_call(
1722         [
1723             'ceph-osd',
1724             '--cluster', cluster,
1725             '--mkfs',
1726             '--mkkey',
1727             '-i', osd_id,
1728             '--monmap', monmap,
1729             '--osd-data', path,
1730             '--osd-journal', os.path.join(path, 'journal'),
1731             '--osd-uuid', fsid,
1732             '--keyring', os.path.join(path, 'keyring'),
1733             ],
1734         )
1735     # TODO ceph-osd --mkfs removes the monmap file?
1736     # os.unlink(monmap)
1737
1738
1739 def auth_key(
1740     path,
1741     cluster,
1742     osd_id,
1743     keyring,
1744     ):
1745     try:
1746         # try dumpling+ cap scheme
1747         command_check_call(
1748             [
1749                 'ceph',
1750                 '--cluster', cluster,
1751                 '--name', 'client.bootstrap-osd',
1752                 '--keyring', keyring,
1753                 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id),
1754                 '-i', os.path.join(path, 'keyring'),
1755                 'osd', 'allow *',
1756                 'mon', 'allow profile osd',
1757                 ],
1758             )
1759     except subprocess.CalledProcessError as err:
1760         if err.returncode == errno.EACCES:
1761             # try old cap scheme
1762             command_check_call(
1763                 [
1764                     'ceph',
1765                     '--cluster', cluster,
1766                     '--name', 'client.bootstrap-osd',
1767                     '--keyring', keyring,
1768                     'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id),
1769                     '-i', os.path.join(path, 'keyring'),
1770                     'osd', 'allow *',
1771                     'mon', 'allow rwx',
1772                 ],
1773                 )
1774         else:
1775             raise
1776
1777 def get_mount_point(cluster, osd_id):
1778     parent = STATEDIR + '/osd'
1779     return os.path.join(
1780         parent,
1781         '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id),
1782     )
1783
1784 def move_mount(
1785     dev,
1786     path,
1787     cluster,
1788     osd_id,
1789     fstype,
1790     mount_options,
1791     ):
1792     LOG.debug('Moving mount to final location...')
1793     osd_data = get_mount_point(cluster, osd_id)
1794     maybe_mkdir(osd_data)
1795
1796     # pick best-of-breed mount options based on fs type
1797     if mount_options is None:
1798         mount_options = MOUNT_OPTIONS.get(fstype, '')
1799
1800     # we really want to mount --move, but that is not supported when
1801     # the parent mount is shared, as it is by default on RH, Fedora,
1802     # and probably others.  Also, --bind doesn't properly manipulate
1803     # /etc/mtab, which *still* isn't a symlink to /proc/mounts despite
1804     # this being 2013.  Instead, mount the original device at the final
1805     # location.
1806     command_check_call(
1807         [
1808             '/bin/mount',
1809             '-o',
1810             mount_options,
1811             '--',
1812             dev,
1813             osd_data,
1814             ],
1815         )
1816     command_check_call(
1817         [
1818             '/bin/umount',
1819             '-l',   # lazy, in case someone else is peeking at the
1820                     # wrong moment
1821             '--',
1822             path,
1823             ],
1824         )
1825
1826
1827 def start_daemon(
1828     cluster,
1829     osd_id,
1830     ):
1831     LOG.debug('Starting %s osd.%s...', cluster, osd_id)
1832
1833     path = (STATEDIR + '/osd/{cluster}-{osd_id}').format(
1834         cluster=cluster, osd_id=osd_id)
1835
1836     # upstart?
1837     try:
1838         if os.path.exists(os.path.join(path,'upstart')):
1839             command_check_call(
1840                 [
1841                     '/sbin/initctl',
1842                     # use emit, not start, because start would fail if the
1843                     # instance was already running
1844                     'emit',
1845                     # since the daemon starting doesn't guarantee much about
1846                     # the service being operational anyway, don't bother
1847                     # waiting for it
1848                     '--no-wait',
1849                     '--',
1850                     'ceph-osd',
1851                     'cluster={cluster}'.format(cluster=cluster),
1852                     'id={osd_id}'.format(osd_id=osd_id),
1853                     ],
1854                 )
1855         elif os.path.exists(os.path.join(path, 'sysvinit')):
1856             if os.path.exists('/usr/sbin/service'):
1857                 svc = '/usr/sbin/service'
1858             else:
1859                 svc = '/sbin/service'
1860             command_check_call(
1861                 [
1862                     svc,
1863                     'ceph',
1864                     '--cluster',
1865                     '{cluster}'.format(cluster=cluster),
1866                     'start',
1867                     'osd.{osd_id}'.format(osd_id=osd_id),
1868                     ],
1869                 )
1870         elif os.path.exists(os.path.join(path, 'systemd')):
1871             command_check_call(
1872                 [
1873                     'systemctl',
1874                     'enable',
1875                     'ceph-osd@{osd_id}'.format(osd_id=osd_id),
1876                     ],
1877                 )
1878             command_check_call(
1879                 [
1880                     'systemctl',
1881                     'start',
1882                     'ceph-osd@{osd_id}'.format(osd_id=osd_id),
1883                     ],
1884                 )
1885         else:
1886             raise Error('{cluster} osd.{osd_id} is not tagged with an init system'.format(
1887                 cluster=cluster,
1888                 osd_id=osd_id,
1889             ))
1890     except subprocess.CalledProcessError as e:
1891         raise Error('ceph osd start failed', e)
1892
1893
1894 def detect_fstype(
1895     dev,
1896     ):
1897     fstype = _check_output(
1898         args=[
1899             '/sbin/blkid',
1900             # we don't want stale cached results
1901             '-p',
1902             '-s', 'TYPE',
1903             '-o' 'value',
1904             '--',
1905             dev,
1906             ],
1907         )
1908     fstype = must_be_one_line(fstype)
1909     return fstype
1910
1911
1912 def mount_activate(
1913     dev,
1914     activate_key_template,
1915     init,
1916     ):
1917
1918     try:
1919         fstype = detect_fstype(dev=dev)
1920     except (subprocess.CalledProcessError,
1921             TruncatedLineError,
1922             TooManyLinesError) as e:
1923         raise FilesystemTypeError(
1924             'device {dev}'.format(dev=dev),
1925             e,
1926             )
1927
1928     # TODO always using mount options from cluster=ceph for
1929     # now; see http://tracker.newdream.net/issues/3253
1930     mount_options = get_conf(
1931         cluster='ceph',
1932         variable='osd_mount_options_{fstype}'.format(
1933             fstype=fstype,
1934             ),
1935         )
1936
1937     if mount_options is None:
1938         mount_options = get_conf(
1939             cluster='ceph',
1940             variable='osd_fs_mount_options_{fstype}'.format(
1941                 fstype=fstype,
1942                 ),
1943             )
1944
1945     #remove whitespaces from mount_options
1946     if mount_options is not None:
1947         mount_options = "".join(mount_options.split())
1948
1949     path = mount(dev=dev, fstype=fstype, options=mount_options)
1950
1951     osd_id = None
1952     cluster = None
1953     try:
1954         (osd_id, cluster) = activate(path, activate_key_template, init)
1955
1956         # check if the disk is already active, or if something else is already
1957         # mounted there
1958         active = False
1959         other = False
1960         src_dev = os.stat(path).st_dev
1961         try:
1962             dst_dev = os.stat((STATEDIR + '/osd/{cluster}-{osd_id}').format(
1963                 cluster=cluster,
1964                 osd_id=osd_id)).st_dev
1965             if src_dev == dst_dev:
1966                 active = True
1967             else:
1968                 parent_dev = os.stat(STATEDIR + '/osd').st_dev
1969                 if dst_dev != parent_dev:
1970                     other = True
1971                 elif os.listdir(get_mount_point(cluster, osd_id)):
1972                     LOG.info(get_mount_point(cluster, osd_id) + " is not empty, won't override")
1973                     other = True
1974
1975         except OSError:
1976             pass
1977
1978         if active:
1979             LOG.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id))
1980             unmount(path)
1981         elif other:
1982             raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id))
1983         else:
1984             move_mount(
1985                 dev=dev,
1986                 path=path,
1987                 cluster=cluster,
1988                 osd_id=osd_id,
1989                 fstype=fstype,
1990                 mount_options=mount_options,
1991                 )
1992         return (cluster, osd_id)
1993
1994     except:
1995         LOG.error('Failed to activate')
1996         unmount(path)
1997         raise
1998     finally:
1999         # remove our temp dir
2000         if os.path.exists(path):
2001             os.rmdir(path)
2002
2003
2004 def activate_dir(
2005     path,
2006     activate_key_template,
2007     init,
2008     ):
2009
2010     if not os.path.exists(path):
2011         raise Error(
2012             'directory %s does not exist' % path
2013             )
2014
2015     (osd_id, cluster) = activate(path, activate_key_template, init)
2016
2017     if init not in (None, 'none' ):
2018         canonical = (STATEDIR + '/osd/{cluster}-{osd_id}').format(
2019             cluster=cluster,
2020             osd_id=osd_id)
2021         if path != canonical:
2022             # symlink it from the proper location
2023             create = True
2024             if os.path.lexists(canonical):
2025                 old = os.readlink(canonical)
2026                 if old != path:
2027                     LOG.debug('Removing old symlink %s -> %s', canonical, old)
2028                     try:
2029                         os.unlink(canonical)
2030                     except:
2031                         raise Error('unable to remove old symlink', canonical)
2032                 else:
2033                     create = False
2034             if create:
2035                 LOG.debug('Creating symlink %s -> %s', canonical, path)
2036                 try:
2037                     os.symlink(path, canonical)
2038                 except:
2039                     raise Error('unable to create symlink %s -> %s' % (canonical, path))
2040
2041     return (cluster, osd_id)
2042
2043
2044 def find_cluster_by_uuid(_uuid):
2045     """
2046     Find a cluster name by searching /etc/ceph/*.conf for a conf file
2047     with the right uuid.
2048     """
2049     _uuid = _uuid.lower()
2050     no_fsid = []
2051     if not os.path.exists(SYSCONFDIR):
2052         return None
2053     for conf_file in os.listdir(SYSCONFDIR):
2054         if not conf_file.endswith('.conf'):
2055             continue
2056         cluster = conf_file[:-5]
2057         try:
2058             fsid = get_fsid(cluster)
2059         except Error as e:
2060             if e.message != 'getting cluster uuid from configuration failed':
2061                 raise e
2062             no_fsid.append(cluster)
2063         else:
2064             if fsid == _uuid:
2065                 return cluster
2066     # be tolerant of /etc/ceph/ceph.conf without an fsid defined.
2067     if len(no_fsid) == 1 and no_fsid[0] == 'ceph':
2068         LOG.warning('No fsid defined in ' + SYSCONFDIR + '/ceph.conf; using anyway')
2069         return 'ceph'
2070     return None
2071
2072
2073 def activate(
2074     path,
2075     activate_key_template,
2076     init,
2077     ):
2078
2079     check_osd_magic(path)
2080
2081     ceph_fsid = read_one_line(path, 'ceph_fsid')
2082     if ceph_fsid is None:
2083         raise Error('No cluster uuid assigned.')
2084     LOG.debug('Cluster uuid is %s', ceph_fsid)
2085
2086     cluster = find_cluster_by_uuid(ceph_fsid)
2087     if cluster is None:
2088         raise Error('No cluster conf found in ' + SYSCONFDIR + ' with fsid %s' % ceph_fsid)
2089     LOG.debug('Cluster name is %s', cluster)
2090
2091     fsid = read_one_line(path, 'fsid')
2092     if fsid is None:
2093         raise Error('No OSD uuid assigned.')
2094     LOG.debug('OSD uuid is %s', fsid)
2095
2096     keyring = activate_key_template.format(cluster=cluster,
2097                                            statedir=STATEDIR)
2098
2099     osd_id = get_osd_id(path)
2100     if osd_id is None:
2101         osd_id = allocate_osd_id(
2102             cluster=cluster,
2103             fsid=fsid,
2104             keyring=keyring,
2105             )
2106         write_one_line(path, 'whoami', osd_id)
2107     LOG.debug('OSD id is %s', osd_id)
2108
2109     if not os.path.exists(os.path.join(path, 'ready')):
2110         LOG.debug('Initializing OSD...')
2111         # re-running mkfs is safe, so just run until it completes
2112         mkfs(
2113             path=path,
2114             cluster=cluster,
2115             osd_id=osd_id,
2116             fsid=fsid,
2117             keyring=keyring,
2118             )
2119
2120     if init not in (None, 'none' ):
2121         if init == 'auto':
2122             conf_val = get_conf(
2123                 cluster=cluster,
2124                 variable='init'
2125                 )
2126             if conf_val is not None:
2127                 init = conf_val
2128             else:
2129                 (distro, release, codename) = platform.dist()
2130                 if distro == 'Ubuntu':
2131                     init = 'upstart'
2132                 else:
2133                     init = 'sysvinit'
2134
2135         LOG.debug('Marking with init system %s', init)
2136         with file(os.path.join(path, init), 'w'):
2137             pass
2138
2139     # remove markers for others, just in case.
2140     for other in INIT_SYSTEMS:
2141         if other != init:
2142             try:
2143                 os.unlink(os.path.join(path, other))
2144             except OSError:
2145                 pass
2146
2147     if not os.path.exists(os.path.join(path, 'active')):
2148         LOG.debug('Authorizing OSD key...')
2149         auth_key(
2150             path=path,
2151             cluster=cluster,
2152             osd_id=osd_id,
2153             keyring=keyring,
2154             )
2155         write_one_line(path, 'active', 'ok')
2156     LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path)
2157     return (osd_id, cluster)
2158
2159
2160 def main_activate(args):
2161     cluster = None
2162     osd_id = None
2163
2164     if not os.path.exists(args.path):
2165         raise Error('%s does not exist' % args.path)
2166
2167     if is_suppressed(args.path):
2168         LOG.info('suppressed activate request on %s', args.path)
2169         return
2170
2171     activate_lock.acquire()  # noqa
2172     try:
2173         mode = os.stat(args.path).st_mode
2174         if stat.S_ISBLK(mode):
2175             (cluster, osd_id) = mount_activate(
2176                 dev=args.path,
2177                 activate_key_template=args.activate_key_template,
2178                 init=args.mark_init,
2179                 )
2180             osd_data = get_mount_point(cluster, osd_id)
2181
2182         elif stat.S_ISDIR(mode):
2183             (cluster, osd_id) = activate_dir(
2184                 path=args.path,
2185                 activate_key_template=args.activate_key_template,
2186                 init=args.mark_init,
2187                 )
2188             osd_data = args.path
2189
2190         else:
2191             raise Error('%s is not a directory or block device' % args.path)
2192
2193         if args.mark_init == 'none':
2194             command_check_call(
2195                 [
2196                     'ceph-osd',
2197                     '--cluster={cluster}'.format(cluster=cluster),
2198                     '--id={osd_id}'.format(osd_id=osd_id),
2199                     '--osd-data={path}'.format(path=osd_data),
2200                     '--osd-journal={path}/journal'.format(path=osd_data),
2201                 ],
2202             )
2203
2204         if args.mark_init not in (None, 'none' ):
2205
2206             start_daemon(
2207                 cluster=cluster,
2208                 osd_id=osd_id,
2209             )
2210
2211     finally:
2212         activate_lock.release()  # noqa
2213
2214
2215 ###########################
2216
2217 def get_journal_osd_uuid(path):
2218     if not os.path.exists(path):
2219         raise Error('%s does not exist' % path)
2220
2221     mode = os.stat(path).st_mode
2222     if not stat.S_ISBLK(mode):
2223         raise Error('%s is not a block device' % path)
2224
2225     try:
2226         out = _check_output(
2227             args=[
2228                 'ceph-osd',
2229                 '-i', '0',   # this is ignored
2230                 '--get-journal-uuid',
2231                 '--osd-journal',
2232                 path,
2233                 ],
2234             close_fds=True,
2235             )
2236     except subprocess.CalledProcessError as e:
2237         raise Error(
2238             'failed to get osd uuid/fsid from journal',
2239             e,
2240             )
2241     value = str(out).split('\n', 1)[0]
2242     LOG.debug('Journal %s has OSD UUID %s', path, value)
2243     return value
2244
2245
2246 def main_activate_journal(args):
2247     if not os.path.exists(args.dev):
2248         raise Error('%s does not exist' % args.dev)
2249
2250     cluster = None
2251     osd_id = None
2252     osd_uuid = None
2253     activate_lock.acquire()  # noqa
2254     try:
2255         osd_uuid = get_journal_osd_uuid(args.dev)
2256         path = os.path.join('/dev/disk/by-partuuid/', osd_uuid.lower())
2257
2258         (cluster, osd_id) = mount_activate(
2259             dev=path,
2260             activate_key_template=args.activate_key_template,
2261             init=args.mark_init,
2262             )
2263
2264         start_daemon(
2265             cluster=cluster,
2266             osd_id=osd_id,
2267             )
2268
2269     finally:
2270         activate_lock.release()  # noqa
2271
2272
2273 ###########################
2274
2275
2276 def main_activate_all(args):
2277     dir = '/dev/disk/by-parttypeuuid'
2278     LOG.debug('Scanning %s', dir)
2279     if not os.path.exists(dir):
2280         return
2281     err = False
2282     for name in os.listdir(dir):
2283         if name.find('.') < 0:
2284             continue
2285         (tag, uuid) = name.split('.')
2286
2287         if tag == OSD_UUID or tag == DMCRYPT_OSD_UUID or tag == DMCRYPT_LUKS_OSD_UUID:
2288
2289             if tag == DMCRYPT_OSD_UUID or tag == DMCRYPT_LUKS_OSD_UUID:
2290                 path = os.path.join('/dev/mapper', uuid)
2291             else:
2292                 path = os.path.join(dir, name)
2293
2294             LOG.info('Activating %s', path)
2295             activate_lock.acquire()  # noqa
2296             try:
2297                 (cluster, osd_id) = mount_activate(
2298                     dev=path,
2299                     activate_key_template=args.activate_key_template,
2300                     init=args.mark_init,
2301                     )
2302                 start_daemon(
2303                     cluster=cluster,
2304                     osd_id=osd_id,
2305                     )
2306
2307             except Exception as e:
2308                 print >> sys.stderr, '{prog}: {msg}'.format(
2309                     prog=args.prog,
2310                     msg=e,
2311                     )
2312                 err = True
2313
2314             finally:
2315                 activate_lock.release()  # noqa
2316     if err:
2317         raise Error('One or more partitions failed to activate')
2318
2319
2320 ###########################
2321
2322 def is_swap(dev):
2323     dev = os.path.realpath(dev)
2324     with file('/proc/swaps', 'rb') as proc_swaps:
2325         for line in proc_swaps.readlines()[1:]:
2326             fields = line.split()
2327             if len(fields) < 3:
2328                 continue
2329             swaps_dev = fields[0]
2330             if swaps_dev.startswith('/') and os.path.exists(swaps_dev):
2331                 swaps_dev = os.path.realpath(swaps_dev)
2332                 if swaps_dev == dev:
2333                     return True
2334     return False
2335
2336
2337 def get_oneliner(base, name):
2338     path = os.path.join(base, name)
2339     if os.path.isfile(path):
2340         with open(path, 'r') as _file:
2341             return _file.readline().rstrip()
2342     return None
2343
2344
2345 def get_dev_fs(dev):
2346     fscheck, _ = command(
2347         [
2348             'blkid',
2349             '-s',
2350             'TYPE',
2351             dev,
2352         ],
2353     )
2354     if 'TYPE' in fscheck:
2355         fstype = fscheck.split()[1].split('"')[1]
2356         return fstype
2357     else:
2358         return None
2359
2360
2361 def split_dev_base_partnum(dev):
2362     if 'loop' in dev or 'cciss' in dev or 'nvme' in dev:
2363         return re.match('(.*\d+)p(\d+)', dev).group(1, 2)
2364     else:
2365         return re.match('(\D+)(\d+)', dev).group(1, 2)
2366
2367
2368 def get_partition_type(part):
2369     """
2370     Get the GPT partition type UUID.  If we have an old blkid and can't
2371     get it that way, use sgdisk and use the description instead (and hope
2372     dmcrypt isn't being used).
2373     """
2374     blkid, _ = command(
2375         [
2376             'blkid',
2377             '-p',
2378             '-o', 'udev',
2379             part,
2380         ]
2381     )
2382     saw_part_entry = False
2383     for line in blkid.splitlines():
2384         (key, value) = line.split('=')
2385         if key == 'ID_PART_ENTRY_TYPE':
2386             return value
2387         if key == 'ID_PART_ENTRY_SCHEME':
2388             table_type = value
2389         if key.startswith('ID_PART_ENTRY_'):
2390             saw_part_entry = True
2391
2392     # hmm, is it in fact GPT?
2393     table_type = None
2394     base = get_partition_base(part)
2395     blkid, _ = command(
2396         [
2397             'blkid',
2398             '-p',
2399             '-o', 'udev',
2400             base
2401         ]
2402     )
2403     for line in blkid.splitlines():
2404         (key, value) = line.split('=')
2405         if key == 'ID_PART_TABLE_TYPE':
2406             table_type = value
2407     if table_type != 'gpt':
2408         return None    # not even GPT
2409
2410     if saw_part_entry:
2411         return None    # GPT, and blkid appears to be new, so we're done.
2412
2413     # bah, fall back to sgdisk.
2414     if 'blkid' not in warned_about:
2415         LOG.warning('Old blkid does not support ID_PART_ENTRY_* fields, trying sgdisk; may not correctly identify ceph volumes with dmcrypt')
2416         warned_about['blkid'] = True
2417     (base, partnum) = split_dev_base_partnum(part)
2418     sgdisk, _ = command(
2419         [
2420             'sgdisk',
2421             '-p',
2422             base,
2423         ]
2424     )
2425
2426     for line in sgdisk.splitlines():
2427         m = re.search('\s+(\d+)\s+\d+\s+\d+\s+\S+ \S+B\s+\S+\s+(.*)', line)
2428         if m is not None:
2429             num = m.group(1)
2430             if num != partnum:
2431                 continue
2432             desc = m.group(2)
2433             # assume unencrypted ... blkid has failed us :(
2434             if desc == 'ceph data':
2435                 return OSD_UUID
2436             if desc == 'ceph journal':
2437                 return JOURNAL_UUID
2438
2439     return None
2440
2441
2442 def get_partition_uuid(dev):
2443     #
2444     # blkid is prefered
2445     #
2446     what = 'ID_PART_ENTRY_UUID'
2447     out, _ = command(
2448         [
2449             'blkid',
2450             '-o',
2451             'udev',
2452             '-p',
2453             dev,
2454         ]
2455     )
2456     p = {}
2457     for line in out.splitlines():
2458         (key, value) = line.split('=')
2459         p[key] = value
2460     if what in p:
2461         return p[what]
2462     #
2463     # if blkid does not deliver, fallback to sgdisk
2464     #
2465     (base, partnum) = split_dev_base_partnum(dev)
2466     out, _ = command(['sgdisk', '-i', partnum, base])
2467     for line in out.splitlines():
2468         m = re.match('Partition unique GUID: (\S+)', line)
2469         if m:
2470             return m.group(1).lower()
2471     return None
2472
2473
2474 def more_osd_info(path, uuid_map):
2475     desc = []
2476     ceph_fsid = get_oneliner(path, 'ceph_fsid')
2477     if ceph_fsid:
2478         cluster = find_cluster_by_uuid(ceph_fsid)
2479         if cluster:
2480             desc.append('cluster ' + cluster)
2481         else:
2482             desc.append('unknown cluster ' + ceph_fsid)
2483
2484     who = get_oneliner(path, 'whoami')
2485     if who:
2486         desc.append('osd.%s' % who)
2487
2488     journal_uuid = get_oneliner(path, 'journal_uuid')
2489     if journal_uuid:
2490         journal_uuid = journal_uuid.lower()
2491         if journal_uuid in uuid_map:
2492             desc.append('journal %s' % uuid_map[journal_uuid])
2493
2494     return desc
2495
2496 def list_dev_osd(dev, uuid_map):
2497     path = is_mounted(dev)
2498     fs_type = get_dev_fs(dev)
2499     desc = []
2500     if path:
2501         desc.append('active')
2502         desc.extend(more_osd_info(path, uuid_map))
2503     elif fs_type:
2504         try:
2505             tpath = mount(dev=dev, fstype=fs_type, options='')
2506             if tpath:
2507                 try:
2508                     magic = get_oneliner(tpath, 'magic')
2509                     if magic is not None:
2510                         desc.append('prepared')
2511                         desc.extend(more_osd_info(tpath, uuid_map))
2512                 finally:
2513                     unmount(tpath)
2514         except MountError:
2515             pass
2516     return desc
2517
2518 def list_dev(dev, uuid_map, journal_map):
2519     ptype = 'unknown'
2520     prefix = ''
2521     if is_partition(dev):
2522         ptype = get_partition_type(dev)
2523         prefix = ' '
2524
2525     desc = []
2526     if ptype == OSD_UUID:
2527         desc = list_dev_osd(dev, uuid_map)
2528         if desc:
2529             desc = ['ceph data'] + desc
2530         else:
2531             desc = ['ceph data', 'unprepared']
2532     elif ptype == DMCRYPT_OSD_UUID:
2533         holders = is_held(dev)
2534         if not holders:
2535             desc = ['ceph data (dmcrypt plain)', 'not currently mapped']
2536         elif len(holders) == 1:
2537             holder = '/dev/' + holders[0]
2538             fs_desc = list_dev_osd(holder, uuid_map)
2539             desc = ['ceph data (dmcrypt plain %s)' % holder] + fs_desc
2540         else:
2541             desc = ['ceph data (dmcrypt plain)', 'holders: ' + ','.join(holders)]
2542     elif ptype == DMCRYPT_LUKS_OSD_UUID:
2543         holders = is_held(dev)
2544         if not holders:
2545             desc = ['ceph data (dmcrypt LUKS)', 'not currently mapped']
2546         elif len(holders) == 1:
2547             holder = '/dev/' + holders[0]
2548             fs_desc = list_dev_osd(holder, uuid_map)
2549             desc = ['ceph data (dmcrypt LUKS %s)' % holder] + fs_desc
2550         else:
2551             desc = ['ceph data (dmcrypt LUKS)', 'holders: ' + ','.join(holders)]
2552     elif ptype == JOURNAL_UUID:
2553         desc.append('ceph journal')
2554         part_uuid = get_partition_uuid(dev)
2555         if part_uuid and part_uuid in journal_map:
2556             desc.append('for %s' % journal_map[part_uuid])
2557     elif ptype == DMCRYPT_JOURNAL_UUID:
2558         holders = is_held(dev)
2559         if len(holders) == 1:
2560             desc = ['ceph journal (dmcrypt plain /dev/%s)' % holders[0]]
2561         else:
2562             desc = ['ceph journal (dmcrypt plain)']
2563         part_uuid = get_partition_uuid(dev)
2564         if part_uuid and part_uuid in journal_map:
2565             desc.append('for %s' % journal_map[part_uuid])
2566     elif ptype == DMCRYPT_LUKS_JOURNAL_UUID:
2567         holders = is_held(dev)
2568         if len(holders) == 1:
2569             desc = ['ceph journal (dmcrypt LUKS /dev/%s)' % holders[0]]
2570         else:
2571             desc = ['ceph journal (dmcrypt LUKS)']
2572         part_uuid = get_partition_uuid(dev)
2573         if part_uuid and part_uuid in journal_map:
2574             desc.append('for %s' % journal_map[part_uuid])
2575     else:
2576         path = is_mounted(dev)
2577         fs_type = get_dev_fs(dev)
2578         if is_swap(dev):
2579             desc.append('swap')
2580         else:
2581             desc.append('other')
2582         if fs_type:
2583             desc.append(fs_type)
2584         elif ptype:
2585             desc.append(ptype)
2586         if path:
2587             desc.append('mounted on %s' % path)
2588
2589     print '%s%s %s' % (prefix, dev, ', '.join(desc))
2590
2591
2592 def main_list(args):
2593     partmap = list_all_partitions()
2594
2595     uuid_map = {}
2596     journal_map = {}
2597     for base, parts in sorted(partmap.iteritems()):
2598         for p in parts:
2599             dev = get_dev_path(p)
2600             part_uuid = get_partition_uuid(dev)
2601             if part_uuid:
2602                 uuid_map[part_uuid] = dev
2603             ptype = get_partition_type(dev)
2604             if ptype == OSD_UUID:
2605                 fs_type = get_dev_fs(dev)
2606                 if fs_type is not None:
2607                     try:
2608                         tpath = mount(dev=dev, fstype=fs_type, options='')
2609                         try:
2610                             journal_uuid = get_oneliner(tpath, 'journal_uuid')
2611                             if journal_uuid:
2612                                 journal_map[journal_uuid.lower()] = dev
2613                         finally:
2614                             unmount(tpath)
2615                     except MountError:
2616                         pass
2617             if ptype == DMCRYPT_OSD_UUID or ptype == DMCRYPT_LUKS_OSD_UUID:
2618                 holders = is_held(dev)
2619                 if len(holders) == 1:
2620                     holder = '/dev/' + holders[0]
2621                     fs_type = get_dev_fs(holder)
2622                     if fs_type is not None:
2623                         try:
2624                             tpath = mount(dev=holder, fstype=fs_type, options='')
2625                             try:
2626                                 journal_uuid = get_oneliner(tpath, 'journal_uuid')
2627                                 if journal_uuid:
2628                                     journal_map[journal_uuid.lower()] = dev
2629                             finally:
2630                                 unmount(tpath)
2631                         except MountError:
2632                             pass
2633
2634     for base, parts in sorted(partmap.iteritems()):
2635         if parts:
2636             print '%s :' % get_dev_path(base)
2637             for p in sorted(parts):
2638                 list_dev(get_dev_path(p), uuid_map, journal_map)
2639         else:
2640             list_dev(get_dev_path(base), uuid_map, journal_map)
2641
2642
2643 ###########################
2644 #
2645 # Mark devices that we want to suppress activates on with a
2646 # file like
2647 #
2648 #  /var/lib/ceph/tmp/suppress-activate.sdb
2649 #
2650 # where the last bit is the sanitized device name (/dev/X without the
2651 # /dev/ prefix) and the is_suppress() check matches a prefix.  That
2652 # means suppressing sdb will stop activate on sdb1, sdb2, etc.
2653 #
2654
2655 def is_suppressed(path):
2656     disk = os.path.realpath(path)
2657     try:
2658         if not disk.startswith('/dev/') or not stat.S_ISBLK(os.lstat(path).st_mode):
2659             return False
2660         base = get_dev_name(disk)
2661         while len(base):
2662             if os.path.exists(SUPPRESS_PREFIX + base):  # noqa
2663                 return True
2664             base = base[:-1]
2665     except:
2666         return False
2667
2668
2669 def set_suppress(path):
2670     disk = os.path.realpath(path)
2671     if not os.path.exists(disk):
2672         raise Error('does not exist', path)
2673     if not stat.S_ISBLK(os.lstat(path).st_mode):
2674         raise Error('not a block device', path)
2675     base = get_dev_name(disk)
2676
2677     with file(SUPPRESS_PREFIX + base, 'w') as f:  # noqa
2678         pass
2679     LOG.info('set suppress flag on %s', base)
2680
2681
2682 def unset_suppress(path):
2683     disk = os.path.realpath(path)
2684     if not os.path.exists(disk):
2685         raise Error('does not exist', path)
2686     if not stat.S_ISBLK(os.lstat(path).st_mode):
2687         raise Error('not a block device', path)
2688     assert disk.startswith('/dev/')
2689     base = get_dev_name(disk)
2690
2691     fn = SUPPRESS_PREFIX + base  # noqa
2692     if not os.path.exists(fn):
2693         raise Error('not marked as suppressed', path)
2694
2695     try:
2696         os.unlink(fn)
2697         LOG.info('unset suppress flag on %s', base)
2698     except OSError as e:
2699         raise Error('failed to unsuppress', e)
2700
2701
2702 def main_suppress(args):
2703     set_suppress(args.path)
2704
2705
2706 def main_unsuppress(args):
2707     unset_suppress(args.path)
2708
2709
2710 def main_zap(args):
2711     for dev in args.dev:
2712         zap(dev)
2713
2714 ###########################
2715
2716
2717 def setup_statedir(dir):
2718     # XXX The following use of globals makes linting
2719     # really hard. Global state in Python is iffy and
2720     # should be avoided.
2721     global STATEDIR
2722     STATEDIR = dir
2723
2724     if not os.path.exists(STATEDIR):
2725         os.mkdir(STATEDIR)
2726     if not os.path.exists(STATEDIR + "/tmp"):
2727         os.mkdir(STATEDIR + "/tmp")
2728
2729     global prepare_lock
2730     prepare_lock = filelock(STATEDIR + '/tmp/ceph-disk.prepare.lock')
2731
2732     global activate_lock
2733     activate_lock = filelock(STATEDIR + '/tmp/ceph-disk.activate.lock')
2734
2735     global SUPPRESS_PREFIX
2736     SUPPRESS_PREFIX = STATEDIR + '/tmp/suppress-activate.'
2737
2738
2739 def setup_sysconfdir(dir):
2740     global SYSCONFDIR
2741     SYSCONFDIR = dir
2742
2743
2744 def parse_args():
2745     parser = argparse.ArgumentParser(
2746         'ceph-disk',
2747         )
2748     parser.add_argument(
2749         '-v', '--verbose',
2750         action='store_true', default=None,
2751         help='be more verbose',
2752         )
2753     parser.add_argument(
2754         '--prepend-to-path',
2755         metavar='PATH',
2756         default='/usr/bin',
2757         help='prepend PATH to $PATH for backward compatibility (default /usr/bin)',
2758         )
2759     parser.add_argument(
2760         '--statedir',
2761         metavar='PATH',
2762         default='/var/lib/ceph',
2763         help='directory in which ceph state is preserved (default /var/lib/ceph)',
2764         )
2765     parser.add_argument(
2766         '--sysconfdir',
2767         metavar='PATH',
2768         default='/etc/ceph',
2769         help='directory in which ceph configuration files are found (default /etc/ceph)',
2770         )
2771     parser.set_defaults(
2772         # we want to hold on to this, for later
2773         prog=parser.prog,
2774         cluster='ceph',
2775         )
2776
2777     subparsers = parser.add_subparsers(
2778         title='subcommands',
2779         description='valid subcommands',
2780         help='sub-command help',
2781         )
2782
2783     prepare_parser = subparsers.add_parser('prepare', help='Prepare a directory or disk for a Ceph OSD')
2784     prepare_parser.add_argument(
2785         '--cluster',
2786         metavar='NAME',
2787         help='cluster name to assign this disk to',
2788         )
2789     prepare_parser.add_argument(
2790         '--cluster-uuid',
2791         metavar='UUID',
2792         help='cluster uuid to assign this disk to',
2793         )
2794     prepare_parser.add_argument(
2795         '--osd-uuid',
2796         metavar='UUID',
2797         help='unique OSD uuid to assign this disk to',
2798         )
2799     prepare_parser.add_argument(
2800         '--journal-uuid',
2801         metavar='UUID',
2802         help='unique uuid to assign to the journal',
2803         )
2804     prepare_parser.add_argument(
2805         '--fs-type',
2806         help='file system type to use (e.g. "ext4")',
2807         )
2808     prepare_parser.add_argument(
2809         '--zap-disk',
2810         action='store_true', default=None,
2811         help='destroy the partition table (and content) of a disk',
2812         )
2813     prepare_parser.add_argument(
2814         '--data-dir',
2815         action='store_true', default=None,
2816         help='verify that DATA is a dir',
2817         )
2818     prepare_parser.add_argument(
2819         '--data-dev',
2820         action='store_true', default=None,
2821         help='verify that DATA is a block device',
2822         )
2823     prepare_parser.add_argument(
2824         '--journal-file',
2825         action='store_true', default=None,
2826         help='verify that JOURNAL is a file',
2827         )
2828     prepare_parser.add_argument(
2829         '--journal-dev',
2830         action='store_true', default=None,
2831         help='verify that JOURNAL is a block device',
2832         )
2833     prepare_parser.add_argument(
2834         '--dmcrypt',
2835         action='store_true', default=None,
2836         help='encrypt DATA and/or JOURNAL devices with dm-crypt',
2837         )
2838     prepare_parser.add_argument(
2839         '--dmcrypt-key-dir',
2840         metavar='KEYDIR',
2841         default='/etc/ceph/dmcrypt-keys',
2842         help='directory where dm-crypt keys are stored',
2843         )
2844     prepare_parser.add_argument(
2845         'data',
2846         metavar='DATA',
2847         help='path to OSD data (a disk block device or directory)',
2848         )
2849     prepare_parser.add_argument(
2850         'journal',
2851         metavar='JOURNAL',
2852         nargs='?',
2853         help=('path to OSD journal disk block device;'
2854               + ' leave out to store journal in file'),
2855         )
2856     prepare_parser.set_defaults(
2857         func=main_prepare,
2858         )
2859
2860     activate_parser = subparsers.add_parser('activate', help='Activate a Ceph OSD')
2861     activate_parser.add_argument(
2862         '--mount',
2863         action='store_true', default=None,
2864         help='mount a block device [deprecated, ignored]',
2865         )
2866     activate_parser.add_argument(
2867         '--activate-key',
2868         metavar='PATH',
2869         help='bootstrap-osd keyring path template (%(default)s)',
2870         dest='activate_key_template',
2871         )
2872     activate_parser.add_argument(
2873         '--mark-init',
2874         metavar='INITSYSTEM',
2875         help='init system to manage this dir',
2876         default='auto',
2877         choices=INIT_SYSTEMS,
2878         )
2879     activate_parser.add_argument(
2880         'path',
2881         metavar='PATH',
2882         nargs='?',
2883         help='path to block device or directory',
2884         )
2885     activate_parser.set_defaults(
2886         activate_key_template='{statedir}/bootstrap-osd/{cluster}.keyring',
2887         func=main_activate,
2888         )
2889
2890     activate_journal_parser = subparsers.add_parser('activate-journal', help='Activate an OSD via its journal device')
2891     activate_journal_parser.add_argument(
2892         'dev',
2893         metavar='DEV',
2894         help='path to journal block device',
2895         )
2896     activate_journal_parser.add_argument(
2897         '--activate-key',
2898         metavar='PATH',
2899         help='bootstrap-osd keyring path template (%(default)s)',
2900         dest='activate_key_template',
2901         )
2902     activate_journal_parser.add_argument(
2903         '--mark-init',
2904         metavar='INITSYSTEM',
2905         help='init system to manage this dir',
2906         default='auto',
2907         choices=INIT_SYSTEMS,
2908         )
2909     activate_journal_parser.set_defaults(
2910         activate_key_template='{statedir}/bootstrap-osd/{cluster}.keyring',
2911         func=main_activate_journal,
2912         )
2913
2914     activate_all_parser = subparsers.add_parser('activate-all', help='Activate all tagged OSD partitions')
2915     activate_all_parser.add_argument(
2916         '--activate-key',
2917         metavar='PATH',
2918         help='bootstrap-osd keyring path template (%(default)s)',
2919         dest='activate_key_template',
2920         )
2921     activate_all_parser.add_argument(
2922         '--mark-init',
2923         metavar='INITSYSTEM',
2924         help='init system to manage this dir',
2925         default='auto',
2926         choices=INIT_SYSTEMS,
2927         )
2928     activate_all_parser.set_defaults(
2929         activate_key_template='{statedir}/bootstrap-osd/{cluster}.keyring',
2930         func=main_activate_all,
2931         )
2932
2933     list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs')
2934     list_parser.set_defaults(
2935         func=main_list,
2936         )
2937
2938     suppress_parser = subparsers.add_parser('suppress-activate', help='Suppress activate on a device (prefix)')
2939     suppress_parser.add_argument(
2940         'path',
2941         metavar='PATH',
2942         nargs='?',
2943         help='path to block device or directory',
2944         )
2945     suppress_parser.set_defaults(
2946         func=main_suppress,
2947         )
2948
2949     unsuppress_parser = subparsers.add_parser('unsuppress-activate', help='Stop suppressing activate on a device (prefix)')
2950     unsuppress_parser.add_argument(
2951         'path',
2952         metavar='PATH',
2953         nargs='?',
2954         help='path to block device or directory',
2955         )
2956     unsuppress_parser.set_defaults(
2957         func=main_unsuppress,
2958         )
2959
2960     zap_parser = subparsers.add_parser('zap', help='Zap/erase/destroy a device\'s partition table (and contents)')
2961     zap_parser.add_argument(
2962         'dev',
2963         metavar='DEV',
2964         nargs='+',
2965         help='path to block device',
2966         )
2967     zap_parser.set_defaults(
2968         func=main_zap,
2969         )
2970
2971     args = parser.parse_args()
2972     return args
2973
2974
2975 def main():
2976     args = parse_args()
2977
2978     loglevel = logging.WARNING
2979     if args.verbose:
2980         loglevel = logging.DEBUG
2981
2982     logging.basicConfig(
2983         level=loglevel,
2984         )
2985
2986     if args.prepend_to_path != '':
2987         path = os.environ.get('PATH', os.defpath)
2988         os.environ['PATH'] = args.prepend_to_path + ":" + path
2989
2990     setup_statedir(args.statedir)
2991     setup_sysconfdir(args.sysconfdir)
2992
2993     try:
2994         args.func(args)
2995
2996     except Error as e:
2997         raise SystemExit(
2998             '{prog}: {msg}'.format(
2999                 prog=args.prog,
3000                 msg=e,
3001             )
3002         )
3003
3004     except CephDiskException as error:
3005         exc_name = error.__class__.__name__
3006         raise SystemExit(
3007             '{prog} {exc_name}: {msg}'.format(
3008                 prog=args.prog,
3009                 exc_name=exc_name,
3010                 msg=error,
3011             )
3012         )
3013
3014
3015 if __name__ == '__main__':
3016     main()
3017     warned_about = {}