dir_contents = os.listdir(archive_dir)
def is_job_dir(parent, subdir):
- if (os.path.isdir(os.path.join(parent, subdir)) and re.match('\d+$',
+ if (os.path.isdir(os.path.join(parent, subdir)) and re.match(r'\d+$',
subdir)):
return True
return False
def canonicalize_hostname(hostname, user='ubuntu'):
hostname_expr = hostname_expr_templ.format(
- lab_domain=config.lab_domain.replace('.', '\.'))
+ lab_domain=config.lab_domain.replace('.', r'\.'))
match = re.match(hostname_expr, hostname)
if _is_ipv4(hostname) or _is_ipv6(hostname):
return "%s@%s" % (user, hostname)
def decanonicalize_hostname(hostname):
lab_domain = ''
if config.lab_domain:
- lab_domain='\.' + config.lab_domain.replace('.', '\.')
+ lab_domain=r'\.' + config.lab_domain.replace('.', r'\.')
hostname_expr = hostname_expr_templ.format(lab_domain=lab_domain)
match = re.match(hostname_expr, hostname)
if match:
log.info('Unmount any osd tmpfs dirs...')
ctx.cluster.run(
args=[
- 'egrep', 'tmpfs\s+/mnt', '/etc/mtab', run.Raw('|'),
+ 'egrep', r'tmpfs\s+/mnt', '/etc/mtab', run.Raw('|'),
'awk', '{print $2}', run.Raw('|'),
'xargs', '-r',
'sudo', 'umount', run.Raw(';'),
if remote.os.package_type != 'rpm':
continue
remote.run(
- args="sudo find /var/cache/yum -name 'timedhosts' -exec rm {} \;",
+ args=r"sudo find /var/cache/yum -name 'timedhosts' -exec rm {} \;",
check_status=False, timeout=180
)
with safe_while(sleep=2, tries=30,
action="get ip " + self['id']) as proceed:
while proceed():
- found = re.match('.*\d+', self['addresses'])
+ found = re.match(r'.*\d+', self['addresses'])
if found:
return self['addresses']
self.set_info()
self.private_ip = self.get_ip_neutron()
except Exception as e:
log.debug("ignoring get_ip_neutron exception " + str(e))
- self.private_ip = re.findall(network + '=([\d.]+)',
+ self.private_ip = re.findall(network + r'=([\d.]+)',
self.get_addresses())[0]
return self.private_ip
self.status_cmd + " | grep 'Main.*code=exited'",
)
line = out.strip().split('\n')[-1]
- exit_code = int(re.match('.*status=(\d+).*', line).groups()[0])
+ exit_code = int(re.match(r'.*status=(\d+).*', line).groups()[0])
if exit_code:
self.remote.run(
args=self.output_cmd
"""
return the instance name suffixed with the IP address.
"""
- digits = map(int, re.findall('(\d+)\.(\d+)\.(\d+)\.(\d+)', ip)[0])
+ digits = map(int, re.findall(r'(\d+)\.(\d+)\.(\d+)\.(\d+)', ip)[0])
return prefix + "%03d%03d%03d%03d" % tuple(digits)
def create(self, num, os_type, os_version, arch, resources_hint):
teuthology.provision.reimage(ctx, 'f.q.d.n.org', 'not-defined-type')
e_str = str(e_info)
print("Caught exception: " + e_str)
- assert e_str.find("configured\sprovisioners") == -1
+ assert e_str.find(r"configured\sprovisioners") == -1
with raises(Exception) as e_info:
teuthology.provision.reimage(ctx, 'f.q.d.n.org', 'common_type')
e_str = str(e_info)
print("Caught exception: " + e_str)
- assert e_str.find("used\swith\sone\sprovisioner\sonly") == -1
+ assert e_str.find(r"used\swith\sone\sprovisioner\sonly") == -1
teuthology.provision.reimage(ctx, 'f.q.d.n.org', 'ptype1')
e_str = str(e_info)
print("Caught exception: " + e_str)
- assert e_str.find("Name\sor\sservice\snot\sknown") == -1
+ assert e_str.find(r"Name\sor\sservice\snot\sknown") == -1
base = config.get_ceph_git_url()
else:
base = 'https://github.com/{project_owner}/{project}'
- url_templ = re.sub('\.git$', '', base)
+ url_templ = re.sub(r'\.git$', '', base)
return url_templ.format(project_owner=project_owner, project=project)
return {}
jobs = {}
for item in os.listdir(archive_dir):
- if not re.match('\d+$', item):
+ if not re.match(r'\d+$', item):
continue
job_id = item
job_dir = os.path.join(archive_dir, job_id)
else:
if "Test failure:" in self.failure_reason:
return self.failure_reason == job.get_failure_reason()
- elif re.search("workunit test (.*)\) on ", self.failure_reason):
- workunit_name = re.search("workunit test (.*)\) on ", self.failure_reason).group(1)
- other_match = re.search("workunit test (.*)\) on ", job.get_failure_reason())
+ elif re.search(r"workunit test (.*)\) on ", self.failure_reason):
+ workunit_name = re.search(r"workunit test (.*)\) on ", self.failure_reason).group(1)
+ other_match = re.search(r"workunit test (.*)\) on ", job.get_failure_reason())
return other_match is not None and workunit_name == other_match.group(1)
else:
reason_ratio = difflib.SequenceMatcher(None, self.failure_reason, job.get_failure_reason()).ratio()
for line in grep(tlog_path, "command crashed with signal"):
log.debug("Found a crash indication: {0}".format(line))
# tasks.ceph.osd.1.plana82.stderr
- match = re.search("tasks.ceph.([^\.]+).([^\.]+).([^\.]+).stderr", line)
+ match = re.search(r"tasks.ceph.([^\.]+).([^\.]+).([^\.]+).stderr", line)
if not match:
log.warning("Not-understood crash indication {0}".format(line))
continue
}
},
'flavor': Placeholder('flavor'),
- 'log-ignorelist': ['\(MDS_ALL_DOWN\)',
- '\(MDS_UP_LESS_THAN_MAX\)'],
+ 'log-ignorelist': [r'\(MDS_ALL_DOWN\)',
+ r'\(MDS_UP_LESS_THAN_MAX\)'],
'sha1': Placeholder('ceph_hash'),
},
'ceph-deploy': {
@staticmethod
def _repo_name(url):
- return re.sub('\.git$', '', url.split('/')[-1])
+ return re.sub(r'\.git$', '', url.split('/')[-1])
def choose_suite_branch(self):
suite_repo_name = self.suite_repo_name
run.Raw('|'),
# Any package that is unpacked or half-installed and also requires
# reinstallation
- 'grep', '^.\(U\|H\)R',
+ 'grep', r'^.\(U\|H\)R',
run.Raw('|'),
'awk', '{print $2}',
run.Raw('|'),
if re.match('^title', line):
titleline = line
titlelinenum = linenum
- if re.match('(^\s+)root', line):
+ if re.match(r'(^\s+)root', line):
rootline = line
- if re.match('(^\s+)kernel', line):
+ if re.match(r'(^\s+)kernel', line):
kernelline = line
for word in line.split(' '):
if 'vmlinuz' in word:
kernelversion = word.split('vmlinuz-')[-1]
- if re.match('(^\s+)initrd', line):
+ if re.match(r'(^\s+)initrd', line):
initline = line
if (kernelline != '') and (initline != ''):
break
se_allowlist = self.config.get('allowlist', [])
if se_allowlist:
known_denials.extend(se_allowlist)
- ignore_known_denials = '\'\(' + str.join('\|', known_denials) + '\)\''
+ ignore_known_denials = r'\'\(' + str.join(r'\|', known_denials) + r'\)\''
for remote in self.cluster.remotes.keys():
proc = remote.run(
args=['sudo', 'grep', '-a', 'avc: .*denied',