import random
import time
import gevent
+import base64
import json
import threading
import os
self.all_up()
+class ObjectStoreTool:
+
+ def __init__(self, manager, pool, **kwargs):
+ self.manager = manager
+ self.pool = pool
+ self.osd = kwargs.get('osd', None)
+ self.object_name = kwargs.get('object_name', None)
+ if self.osd and self.pool and self.object_name:
+ if self.osd == "primary":
+ self.osd = self.manager.get_object_primary(self.pool,
+ self.object_name)
+ assert self.osd
+ if self.object_name:
+ self.pgid = self.manager.get_object_pg_with_shard(self.pool,
+ self.object_name,
+ self.osd)
+ self.remote = self.manager.ctx.\
+ cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
+ path = self.manager.get_filepath().format(id=self.osd)
+ self.paths = ("--data-path {path} --journal-path {path}/journal".
+ format(path=path))
+
+ def build_cmd(self, options, args, stdin):
+ lines = []
+ if self.object_name:
+ lines.append("object=$(sudo ceph-objectstore-tool "
+ "{paths} --pgid {pgid} --op list |"
+ "grep '\"oid\":\"{name}\"')".
+ format(paths=self.paths,
+ pgid=self.pgid,
+ name=self.object_name))
+ args = '"$object" ' + args
+ options += " --pgid {pgid}".format(pgid=self.pgid)
+ cmd = ("sudo ceph-objectstore-tool {paths} {options} {args}".
+ format(paths=self.paths,
+ args=args,
+ options=options))
+ if stdin:
+ cmd = ("echo {payload} | base64 --decode | {cmd}".
+ format(payload=base64.encode(kwargs['stdin']),
+ cmd=cmd))
+ lines.append(cmd)
+ return "\n".join(lines)
+
+ def run(self, options, args, stdin=None):
+ self.manager.kill_osd(self.osd)
+ cmd = self.build_cmd(options, args, stdin)
+ self.manager.log(cmd)
+ try:
+ proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
+ check_status=False,
+ stdout=StringIO(),
+ stderr=StringIO())
+ proc.wait()
+ if proc.exitstatus != 0:
+ self.manager.log("failed with " + str(proc.exitstatus))
+ error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
+ raise Exception(error)
+ finally:
+ self.manager.revive_osd(self.osd)
+
+
class CephManager:
"""
Ceph manager object.
check_status=check_status
)
+ def objectstore_tool(self, pool, options, args, **kwargs):
+ return ObjectStoreTool(self, pool, **kwargs).run(options, args)
+
def get_pgid(self, pool, pgnum):
"""
:param pool: pool name