net = [n for n in r.keys()
if ipaddress.ip_address(ip) in ipaddress.ip_network(n)]
if net and iface in r[net[0]]:
- assert(iface)
+ assert iface
r[net[0]][iface].add(ip)
return r
for rank in range(num_ranks):
if rank in by_rank:
d = by_rank[rank]
- assert(d.ports)
+ assert d.ports
servers.append({
'name': f"{spec.backend_service}.{rank}",
'ip': d.ip or resolve_ip(self.mgr.inventory.get_addr(str(d.hostname))),
def self_test(self) -> None:
r = self.get('io_rate')
- assert('pg_stats_delta' in r)
- assert('stamp_delta' in r['pg_stats_delta'])
- assert('stat_sum' in r['pg_stats_delta'])
- assert('num_read_kb' in r['pg_stats_delta']['stat_sum'])
- assert('num_write_kb' in r['pg_stats_delta']['stat_sum'])
- assert('num_write' in r['pg_stats_delta']['stat_sum'])
- assert('num_read' in r['pg_stats_delta']['stat_sum'])
+ assert 'pg_stats_delta' in r
+ assert 'stamp_delta' in r['pg_stats_delta']
+ assert 'stat_sum' in r['pg_stats_delta']
+ assert 'num_read_kb' in r['pg_stats_delta']['stat_sum']
+ assert 'num_write_kb' in r['pg_stats_delta']['stat_sum']
+ assert 'num_write' in r['pg_stats_delta']['stat_sum']
+ assert 'num_read' in r['pg_stats_delta']['stat_sum']
@CLIReadCommand('iostat', poll=True)
def iostat(self, width: int = 80, print_header: bool = False) -> HandleCommandResult:
chunk_start = 0
chunk_size = 0x20000 # 131072 bytes, default max ssl buffer size
while chunk_start + chunk_size < file_size:
- yield(chunk_start, chunk_size)
+ yield chunk_start, chunk_size
chunk_start += chunk_size
final_chunk_size = file_size - chunk_start
- yield(chunk_start, final_chunk_size)
+ yield chunk_start, final_chunk_size
def to_bytes(param):