import logging
import os
import uuid
+from itertools import repeat
from math import floor
from ceph_volume import process, util
from ceph_volume.exceptions import (
report[type_uuid] = self.tags['ceph.{}'.format(type_uuid)]
return report
- def clear_tags(self):
+ def _format_tag_args(self, op, tags):
+ tag_args = ['{}={}'.format(k, v) for k, v in tags.items()]
+ # weird but efficient way of ziping two lists and getting a flat list
+ return list(sum(zip(repeat(op), tag_args), ()))
+
+ def clear_tags(self, keys=None):
"""
- Removes all tags from the Logical Volume.
+ Removes all or passed tags from the Logical Volume.
"""
- for k in list(self.tags):
- self.clear_tag(k)
+ if not keys:
+ keys = self.tags.keys()
+
+ del_tags = {k: self.tags[k] for k in keys if k in self.tags}
+ if not del_tags:
+ # nothing to clear
+ return
+ del_tag_args = self._format_tag_args('--deltag', del_tags)
+ # --deltag returns successful even if the to be deleted tag is not set
+ process.call(['lvchange'] + del_tag_args + [self.lv_path])
+ for k in del_tags.keys():
+ del self.tags[k]
def set_tags(self, tags):
At the end of all modifications, the tags are refreshed to reflect
LVM's most current view.
"""
+ self.clear_tags(tags.keys())
+ add_tag_args = self._format_tag_args('--addtag', tags)
+ process.call(['lvchange'] + add_tag_args + [self.lv_path])
for k, v in tags.items():
- self.set_tag(k, v)
+ self.tags[k] = v
def clear_tag(self, key):
assert self.foo_volume.tags == tags
expected = [
- ['lvchange', '--deltag', 'ceph.foo0=bar0', '/path'],
- ['lvchange', '--addtag', 'ceph.foo0=bar0', '/path'],
- ['lvchange', '--deltag', 'ceph.foo1=bar1', '/path'],
- ['lvchange', '--addtag', 'ceph.foo1=baz1', '/path'],
- ['lvchange', '--deltag', 'ceph.foo2=bar2', '/path'],
- ['lvchange', '--addtag', 'ceph.foo2=baz2', '/path'],
- ['lvchange', '--deltag', 'ceph.foo1=baz1', '/path'],
- ['lvchange', '--addtag', 'ceph.foo1=other1', '/path'],
+ sorted(['lvchange', '--deltag', 'ceph.foo0=bar0', '--deltag',
+ 'ceph.foo1=bar1', '--deltag', 'ceph.foo2=bar2', '/path']),
+ sorted(['lvchange', '--deltag', 'ceph.foo1=baz1', '/path']),
+ sorted(['lvchange', '--addtag', 'ceph.foo0=bar0', '--addtag',
+ 'ceph.foo1=baz1', '--addtag', 'ceph.foo2=baz2', '/path']),
+ sorted(['lvchange', '--addtag', 'ceph.foo1=other1', '/path']),
]
# The order isn't guaranted
for call in capture.calls:
- assert call['args'][0] in expected
+ assert sorted(call['args'][0]) in expected
assert len(capture.calls) == len(expected)
def test_clear_tags(self, monkeypatch, capture):
assert self.foo_volume_clean.tags == {}
expected = [
- ['lvchange', '--addtag', 'ceph.foo0=bar0', '/pathclean'],
- ['lvchange', '--addtag', 'ceph.foo1=bar1', '/pathclean'],
- ['lvchange', '--addtag', 'ceph.foo2=bar2', '/pathclean'],
- ['lvchange', '--deltag', 'ceph.foo0=bar0', '/pathclean'],
- ['lvchange', '--deltag', 'ceph.foo1=bar1', '/pathclean'],
- ['lvchange', '--deltag', 'ceph.foo2=bar2', '/pathclean'],
+ sorted(['lvchange', '--addtag', 'ceph.foo0=bar0', '--addtag',
+ 'ceph.foo1=bar1', '--addtag', 'ceph.foo2=bar2',
+ '/pathclean']),
+ sorted(['lvchange', '--deltag', 'ceph.foo0=bar0', '--deltag',
+ 'ceph.foo1=bar1', '--deltag', 'ceph.foo2=bar2',
+ '/pathclean']),
]
# The order isn't guaranted
for call in capture.calls:
- assert call['args'][0] in expected
+ assert sorted(call['args'][0]) in expected
assert len(capture.calls) == len(expected)