--- /dev/null
+"""
+Perform rados IO along with checksum
+this task creates objects and maintains checksum.
+This task also performs read, write, deletes and
+checks checksum ar random intervals.
+"""
+import logging
+import hashlib
+import random
+from StringIO import StringIO
+import json
+
+from teuthology import misc as teuthology
+from util.rados import rados
+
+log = logging.getLogger(__name__)
+
+
+class IO(object):
+    '''
+    Base class for the different types of IO:
+    Object IO: using rados put
+    FS IO: using cephFS
+    RGW IO
+    '''
+ def __init__(self, io_type='object', pool='default', until='migrate'):
+ self.csum = 0
+ self.pool = pool
+        # special variable to decide when to stop IO:
+        # when this task is used during a filestore to bluestore
+        # migration we may want to stop only after all the osds
+        # have been converted to bluestore.
+        # until="migrate": run until the migration completes
+        # until="50": run that many iterations of the IO loop
+ self.until = until
+ self.timeout = 0
+        try:
+            self.timeout = int(until)
+        except ValueError:
+            pass
+ self.io_type = io_type
+
+ def read(self):
+ ''' implement in child class'''
+ pass
+
+ def write(self):
+ '''implement in child class '''
+ pass
+
+    def calc_csum(self):
+ '''implement in child class'''
+ pass
+
+ def rm_obj(self):
+ ''' implement in child class'''
+ pass
+
+
+class RadosObj(IO):
+    '''
+    IO specific to rados objects,
+    using the rados put, get and rm operations.
+    '''
+ def __init__(self, ctx, mon, manager=None, pool='default12', maxobj=1000):
+ self.nobj = 0
+ self.maxobj = maxobj
+ IO.__init__(self, pool=pool)
+ self.total = 0 # Total objects of this type in the cluster
+ self.obj_prefix = "robj_"
+ self.ctx = ctx
+ self.mon = mon
+ self.manager = manager
+ self.ref_obj = "/tmp/reffile"
+        self.prev_max = 1  # used to pick a different object range each time
+ self.size = 1024 * 1024
+ self.poolcreate()
+
+ def poolcreate(self):
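+        # create a small pool dedicated to this IO task
+        # (the trailing '1', '1' arguments are pg_num and pgp_num)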
+ cmd = [
+ 'sudo',
+ 'ceph',
+ 'osd',
+ 'pool',
+ 'create',
+ self.pool,
+ '1',
+ '1',
+ ]
+ proc = self.mon.run(
+ args=cmd,
+ stdout=StringIO(),
+ )
+ out = proc.stdout.getvalue()
+ log.info(out)
+
+
+    def find_range(self):
+        '''
+        Return a (start, stop) range of object indexes for ops
+        like read and calc_csum.
+        '''
+        nobj = random.randint(1, self.total)
+        if (self.prev_max + nobj) < self.total:
+            start = self.prev_max
+            self.prev_max = self.prev_max + nobj
+        else:
+            self.prev_max = nobj
+            start = 1
+
+        if (start + nobj) > self.total:
+            stop = self.total
+        else:
+            stop = start + nobj
+        return (start, stop)
+
+    def read(self):
+        '''
+        Read a random range of objects; the number of objects to
+        read is chosen randomly per iteration.
+        '''
+        if self.total == 0:
+            log.info("Nothing to read as no objects in the cluster")
+            return
+ (start, stop) = self.find_range()
+
+ for i in range(start, stop):
+ cmd = [
+ 'sudo',
+ 'rados',
+ '-p',
+ self.pool,
+ 'get',
+ self.obj_prefix + str(i),
+ '/tmp/null',
+ ]
+ proc = self.mon.run(
+ args=cmd,
+ stdout=StringIO(),
+ )
+ out = proc.stdout.getvalue()
+ log.info(out)
+
+    def calc_csum(self):
+        '''
+        Calculate the checksum of a random range of objects.
+        Every object in the cluster is a copy of the same reference
+        object, so each one is expected to match the reference
+        checksum.
+        '''
+        if self.total == 0:
+            log.info("Nothing to read as no objects in the cluster")
+            return
+
+ rfile = "/tmp/tmpobj"
+
+ (start, stop) = self.find_range()
+ for i in range(start, stop):
+ cmd = [
+ 'sudo',
+ 'rados',
+ '-p',
+ self.pool,
+ 'get',
+ self.obj_prefix + str(i),
+ rfile,
+ ]
+ proc = self.mon.run(
+ args=cmd,
+ stdout=StringIO(),
+ )
+ out = proc.stdout.getvalue()
+ log.info(out)
+
+ data = teuthology.get_file(self.mon, rfile)
+ t_csum = hashlib.sha1(data).hexdigest()
+ if self.csum != t_csum:
+ log.error("csum mismatch for {}".format(
+ self.obj_prefix + str(i)))
+ assert False
+ else:
+ log.info("csum matched !!!")
+
+    def write(self):
+        '''
+        Create a random number of objects; the content of each new
+        object is identical to the reference object.
+        '''
+ nobj = random.randint(1, self.total+10)
+ start = self.total + 1
+ stop = start + nobj
+
+ for i in range(start, stop+1):
+ cmd = [
+ 'sudo',
+ 'rados',
+ '-p',
+ self.pool,
+ 'put',
+ self.obj_prefix + str(i),
+ self.ref_obj,
+ ]
+
+ try:
+ proc = self.mon.run(
+ args=cmd,
+ stdout=StringIO(),
+ )
+ out = proc.stdout.getvalue()
+ log.info(out)
+ self.total = self.total + 1
+            except Exception:
+ log.error("Failed to put {}".format(self.obj_prefix + str(i)))
+ assert False
+
+ def rm_obj(self):
+ '''
+ Remove random number of objects from the pool
+ '''
+        if self.total == 0:
+            log.info("Nothing to remove as no objects in the cluster")
+            return
+ (start, stop) = self.find_range()
+ for i in range(start, stop):
+ try:
+ rados(self.ctx, self.mon, ['-p', self.pool, 'rm',
+ self.obj_prefix + str(i)])
+ self.total = self.total - 1
+            except Exception:
+ log.error("Unable to rm {}".format(self.obj_prefix + str(i)))
+ assert False
+
+    def check_all_bluestore(self):
+        '''
+        Return True if all osds are on bluestore,
+        else False.
+        '''
+ osd_cnt = teuthology.num_instances_of_type(self.ctx.cluster, 'osd')
+ log.info("Total osds = {}".format(osd_cnt))
+ fcount = 0
+
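+        # "ceph osd count-metadata osd_objectstore" is expected to return a
+        # JSON map of objectstore type to osd count,
+        # e.g. {"filestore": 2, "bluestore": 1}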
+ store_cnt = [
+ 'sudo',
+ 'ceph',
+ 'osd',
+ 'count-metadata',
+ 'osd_objectstore',
+ ]
+
+ proc = self.mon.run(
+ args=store_cnt,
+ stdout=StringIO(),
+ )
+ out = proc.stdout.getvalue()
+ log.info(out)
+
+ dout = json.loads(out)
+ log.info(dout)
+ if 'bluestore' in dout:
+ fcount = dout['bluestore']
+
+        if fcount != osd_cnt:
+            log.info("Not all osds are on bluestore yet")
+            return False
+
+ return True
+
+    def check_timeout(self):
+        '''Countdown of remaining IO iterations when "until" is numeric.'''
+        if self.timeout == 0:
+            return True
+        else:
+            self.timeout = self.timeout - 1
+            return False
+
+    def time_to_stop(self):
+        '''
+        Decide whether to stop the IOs or not.
+        '''
+        if self.until == 'migrate':
+            return self.check_all_bluestore()
+        try:
+            int(self.until)
+            return self.check_timeout()
+        except ValueError:
+            return False
+
+
+ def create_ref_data(self):
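+        '''
+        Build a local reference buffer, record its sha1 as the expected
+        checksum and push it to the mon node as the reference object.
+        '''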
+ refpath = "/tmp/reffile"
+ refbuf = "HELLOCEPH" * (self.size / len("HELLOCEPH"))
+ self.csum = hashlib.sha1(refbuf).hexdigest()
+ with open(refpath, "w") as ref:
+ ref.write(refbuf)
+ self.mon.put_file(refpath, self.ref_obj)
+
+    def runner(self):
+        '''
+        Randomly call the read, write and calc_csum functions and
+        check the 'until' condition to determine when to terminate
+        the IOs.
+        '''
+        log.info("Inside runner")
+        self.create_ref_data()
+        op = {1: self.write, 2: self.read, 3: self.calc_csum, 4: self.rm_obj}
+        while not self.time_to_stop():
+            # note: rm_obj (key 4) is currently not selected; removals
+            # would leave gaps in the sequential object numbering
+            rand_op = op[random.randint(1, 3)]
+            rand_op()
+
+
+def task(ctx, config):
+ """
+ Task for handling IO with checksum
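+
+    A hypothetical suite snippet (the task name depends on the file name
+    this module is installed under, and the config is currently ignored):
+
+        tasks:
+        - ceph:
+        - <this_io_task>: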
+ """
+ log.info("Inside io task")
+ if config is None:
+ config = {}
+ assert isinstance(config, dict), \
+ 'IO with csum task accepts a dict for config'
+
+ # manager = ctx.managers['ceph']
+ first_mon = teuthology.get_first_mon(ctx, config)
+ (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+    # TODO: instantiate the right IO subclass and read 'until' / 'io_type'
+    # from the config once new cases are added; hard coded for now.
+    rados_io = RadosObj(ctx, mon)
+    rados_io.runner()
--- /dev/null
+"""
+Migrate from filestore to bluestore
+"""
+import logging
+import json
+import time
+from StringIO import StringIO
+
+from teuthology import misc as teuthology
+# from util.rados import rados
+
+LOG = logging.getLogger(__name__)
+
+
+def is_active_osd_systemd(remote, osdid):
+ ''' check status through systemd'''
+ is_active = [
+ 'sudo',
+ 'systemctl',
+ 'is-active',
+ 'ceph-osd@{}'.format(osdid),
+ ]
+ proc = remote.run(
+ args=is_active,
+ stdout=StringIO(),
+ stderr=StringIO(),
+ check_status=False,
+ )
+ if proc.stdout is not None:
+ status = proc.stdout.getvalue()
+ else:
+ status = proc.stderr.getvalue()
+ LOG.info(status)
+ return "active" == status.strip('\n')
+
+
+def is_active_osd(remote, osdid):
+ return is_active_osd_systemd(remote, osdid)
+
+
+def start_osd_systemd(remote, osdid, timeout=600):
+ '''start an osd using systemd command'''
+ start = time.time()
+ status = is_active_osd(remote, osdid)
+ if status:
+ return
+
+ start_cmd = [
+ 'sudo',
+ 'systemctl',
+ 'start',
+ 'ceph-osd@{}'.format(osdid),
+ ]
+ remote.run(
+ args=start_cmd,
+ wait=False,
+ )
+
+    while timeout > (time.time() - start):
+        if is_active_osd(remote, osdid):
+            return
+        LOG.info("osd status is not UP, looping again")
+        time.sleep(5)
+ LOG.error("Failed to start osd.{}".format(osdid))
+ assert False
+
+
+def start_osd(remote, osd):
+ start_osd_systemd(remote, osd)
+
+
+def stop_osd_systemd(remote, osdid):
+ ''' stop osd through systemd command '''
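+    # returns a falsy value on success; returns 1 if the osd was not
+    # running, or True if it is still active after the stop attempt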
+ active = is_active_osd(remote, osdid)
+ if active:
+ stop_cmd = [
+ 'sudo',
+ 'systemctl',
+ 'stop',
+ 'ceph-osd@{}'.format(osdid),
+ ]
+ remote.run(
+ args=stop_cmd,
+ wait=False,
+ )
+ else:
+ LOG.info("osd.{} is not running".format(osdid))
+ return 1
+
+ time.sleep(5)
+ active = is_active_osd(remote, osdid)
+ return active
+
+
+def stop_osd(remote, osdid):
+ ''' stop osd : may be daemon-helper or systemd'''
+ return stop_osd_systemd(remote, osdid)
+
+
+def unmount_osd(remote, disk):
+    '''
+    TODO: handle different journal partitions, encrypted disks, etc.;
+    as of now just unmount the data partition.
+    '''
+ if 'encrypted' in disk:
+ '''TODO: handle if disk is encrypted'''
+ pass
+ if 'lvm' in disk:
+ '''TODO: may need to handle this'''
+ pass
+ target = disk['data']['path']
+ umnt = [
+ 'sudo',
+ 'umount',
+ '{}'.format(target),
+ ]
+ proc = remote.run(
+ args=umnt,
+ check_status=False,
+ )
+ time.sleep(5)
+ proc.wait()
+ return proc.exitstatus == 0
+
+
+def prepare_to_migrate(remote, osdid, disk):
+    ''' Stop the osd and unmount its disk '''
+ ret = 0
+
+ ret = stop_osd(remote, osdid)
+ if ret:
+        LOG.error("Failed to stop osd.{} or it was not running".format(osdid))
+ assert False
+ LOG.info("Stopped osd.{}".format(osdid))
+
+ ret = unmount_osd(remote, disk)
+ if not ret:
+ LOG.error("Couldn't unmount disk {} for osd {}".format(disk, osdid))
+ assert False
+
+
+def is_worth_migrating(osd, mon):
+    ''' If it is not a filestore osd then migration is not worth attempting '''
+ osd_meta = [
+ 'sudo',
+ 'ceph',
+ 'osd',
+ 'metadata',
+ '{}'.format(osd),
+ ]
+ proc = mon.run(
+ args=osd_meta,
+ stdout=StringIO(),
+ stderr=StringIO(),
+ )
+ if proc.stdout is not None:
+ status = proc.stdout.getvalue()
+ else:
+ status = proc.stderr.getvalue()
+ LOG.info(status)
+ dstatus = json.loads(status)
+ return dstatus['osd_objectstore'] == 'filestore'
+
+
+def check_objectstore(osd, mon, store):
+ ''' check what objectstore it has '''
+ osd_meta = [
+ 'sudo',
+ 'ceph',
+ 'osd',
+ 'metadata',
+ '{}'.format(osd),
+ ]
+ proc = mon.run(
+ args=osd_meta,
+ stdout=StringIO(),
+ stderr=StringIO(),
+ )
+ if proc.stdout is not None:
+ status = proc.stdout.getvalue()
+ else:
+ status = proc.stderr.getvalue()
+ LOG.info(status)
+
+ dstatus = json.loads(status)
+ return dstatus['osd_objectstore'] == store
+
+
+def check_safe_to_destroy(mon, osdid, timeout=600):
+ ''' check whether its safe to destroy osd'''
+ start = time.time()
+ is_safe = [
+ 'sudo',
+ 'ceph',
+ 'osd',
+ 'safe-to-destroy',
+ str(osdid),
+ ]
+
+ while timeout > (time.time() - start):
+ proc = mon.run(
+ args=is_safe,
+ stdout=StringIO(),
+ stderr=StringIO(),
+ check_status=False,
+ )
+ out = proc.stdout.getvalue()
+ err = proc.stderr.getvalue()
+ LOG.info(out)
+ LOG.info(err)
+
+        if "EBUSY" in err:
+            # recovery is probably still in progress; wait and retry
+            time.sleep(10)
+            continue
+ if "safe to destroy" in out or "safe to destroy" in err:
+ return
+ LOG.error("OSD never reached safe-to-destroy condition")
+ assert False
+
+
+def zap_volume(remote, disk):
+ ''' zap the disk '''
+ if 'encrypted' in disk:
+ '''TODO: handle this'''
+ pass
+ if 'lvm' in disk:
+ '''TODO: handle this'''
+ pass
+ target = disk['data']['path']
+ zap_cmd = [
+ 'sudo',
+ 'ceph-volume',
+ 'lvm',
+ 'zap',
+ target,
+ ]
+ proc = remote.run(
+ args=zap_cmd,
+ stdout=StringIO(),
+ stderr=StringIO(),
+ )
+ if proc.stdout is not None:
+ status = proc.stdout.getvalue()
+ else:
+ status = proc.stderr.getvalue()
+ LOG.info(status)
+ if 'success' not in status:
+ LOG.error("Failed to zap disk {}".format(target))
+ assert False
+
+
+def destroy_osd(mon, osd_num):
+ ''' destroy the osd'''
+ d_cmd = [
+ 'sudo',
+ 'ceph',
+ 'osd',
+ 'destroy',
+ str(osd_num),
+ '--yes-i-really-mean-it',
+ ]
+ mon.run(
+ args=d_cmd,
+ wait=False
+ )
+
+
+def create_bluestore(remote, disk, osd_num, mon):
+ ''' create bluestore on the disk '''
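+    # give the preceding (async) destroy a moment to settle, then dump the
+    # osd tree purely for debugging context before recreating the osd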
+ time.sleep(10)
+ osd_tree = [
+ 'sudo',
+ 'ceph',
+ 'osd',
+ 'tree',
+ ]
+ proc = mon.run(
+ args=osd_tree,
+ stdout=StringIO(),
+ stderr=StringIO(),
+ )
+    if proc.stdout is not None:
+        status = proc.stdout.getvalue()
+    else:
+        status = proc.stderr.getvalue()
+
+ LOG.info(status)
+
+ target = disk['data']['path']
+ create_blue = [
+ 'sudo',
+ 'ceph-volume',
+ 'lvm',
+ 'create',
+ '--bluestore',
+ '--data',
+ target,
+ '--osd-id',
+ str(osd_num),
+ ]
+ proc = remote.run(
+ args=create_blue,
+ stdout=StringIO(),
+ stderr=StringIO(),
+ )
+ if proc.stdout is not None:
+ status = proc.stdout.getvalue()
+ else:
+ status = proc.stderr.getvalue()
+ LOG.info(status)
+
+ if "lvm create successful" not in status:
+ LOG.error("lvm create failed for osd.{}".format(osd_num))
+ assert False
+ LOG.info("lvm creation successful for osd.{}".format(osd_num))
+
+
+def build_remote_osds(osd_disk_info):
+    '''
+    osd_disk_info is a dict keyed by osd number:
+        {osd: (disks, remote)}
+    For ease of iteration build a new dict keyed by remote, so that the
+    migration loop can walk all the osds located on a single remote:
+        {remote: [{osd1: disks1}, {osd2: disks2}]}
+    '''
+ remotes_osds_disks = dict()
+ for osd, disks_remote in osd_disk_info.iteritems():
+ if disks_remote[1] not in remotes_osds_disks:
+ remotes_osds_disks[disks_remote[1]] = []
+ remotes_osds_disks[disks_remote[1]].append({osd: disks_remote[0]})
+ return remotes_osds_disks
+
+
+def migrate(manager, osd_disk_info, mon):
+ ''' Prepare and migrate to bluestore'''
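+    # per-osd sequence: mark out -> wait for clean -> safe-to-destroy ->
+    # stop osd and unmount -> zap -> destroy -> ceph-volume lvm create
+    # (bluestore, reusing the osd id) -> start osd -> mark in -> wait for clean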
+ remotes_osds_disks = build_remote_osds(osd_disk_info)
+ for remote, osd_disks in remotes_osds_disks.iteritems():
+ for ele in osd_disks:
+ for osd_id, disk in ele.iteritems():
+ if not is_worth_migrating(osd_id, mon):
+                    LOG.warning("Skipping migration of osd %s" % osd_id)
+ else:
+ manager.mark_out_osd(osd_id)
+ manager.wait_for_clean()
+ check_safe_to_destroy(mon, osd_id)
+ prepare_to_migrate(remote, osd_id, disk)
+ zap_volume(remote, disk)
+ destroy_osd(mon, osd_id)
+ create_bluestore(remote, disk, osd_id, mon)
+ start_osd(remote, osd_id)
+ manager.mark_in_osd(osd_id)
+ manager.wait_for_clean()
+ # check whether it has bluestore now
+ if not check_objectstore(osd_id, mon, "bluestore"):
+ LOG.error("Failed conversion from fs to bs in osd.{}".
+ format(osd_id))
+ assert False
+ LOG.info("Successfully migrated from fs to bs for osd.{}".
+ format(osd_id))
+
+
+def check_all_filestore(mon, num_osds):
+ ''' Check whether all the osds are currently filestore or not '''
+ fcount = 0
+ objstore = [
+ 'sudo',
+ 'ceph',
+ 'osd',
+ 'count-metadata',
+ 'osd_objectstore',
+ ]
+
+ proc = mon.run(
+ args=objstore,
+ stdout=StringIO(),
+ )
+ if proc.stdout is not None:
+ out = proc.stdout.getvalue()
+ else:
+ out = proc.stderr.getvalue()
+ LOG.info(out)
+
+    dout = json.loads(out)
+    fcount = dout.get('filestore', 0)
+
+    if fcount != num_osds:
+        LOG.warning("Looks like not all osds are on filestore "
+                    "before migration")
+ if 'bluestore' in out:
+ LOG.warning("some of the osds are already on Bluestore")
+
+
+def task(ctx, config):
+ """
+    Task for migrating osds from filestore to bluestore
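+
+    Expects a prior task to have populated ctx.osd_disk_info as
+    {osd_id: (disks, remote)}.  A hypothetical suite snippet (the task
+    name depends on the module file name; config is currently unused):
+
+        tasks:
+        - ceph:
+        - <this_migrate_task>: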
+ """
+ if config is None:
+ config = {}
+ assert isinstance(config, dict), \
+ 'Migrate task accepts only dict'
+ manager = ctx.managers['ceph']
+ first_mon = teuthology.get_first_mon(ctx, config)
+ (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+
+ num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
+ LOG.info('number of osds = {}'.format(num_osds))
+
+ check_all_filestore(mon, num_osds)
+ migrate(manager, ctx.osd_disk_info, mon)