They may have some values in quotes, so a safe way to compare is to
extract the key.
"""
- with file(file_path, 'r') as f:
+ with open(file_path) as f:
for line in f:
content = line.strip()
if len(content) == 0:
return False
keyring_name_local = keytype_path_to(args, keytype)
keyring_path_local = os.path.join(dest_dir, keyring_name_local)
- with file(keyring_path_local, 'w') as f:
+ with open(keyring_path_local, 'wb') as f:
for line in out:
- f.write(line + '\n')
+ f.write(line + b'\n')
return True
return False
mon_name_local = keytype_path_to(args, "mon")
mon_path_local = os.path.join(dest_dir, mon_name_local)
- with file(mon_path_local, 'w') as f:
+ with open(mon_path_local, 'wb') as f:
f.write(mon_key)
rlogger = logging.getLogger(host)
path_asok = ceph_deploy.util.paths.mon.asok(args.cluster, remote_hostname)
rlogger.debug(line)
return False
try:
- mon_status = json.loads("".join(out))
+ mon_status = json.loads(b''.join(out).decode('utf-8'))
except ValueError:
rlogger.error('"ceph mon_status %s" output was not json', host)
for line in out:
0, # le32 created: nanoseconds,
len(key), # le16: len(key)
)
- return base64.b64encode(header + key)
+ return base64.b64encode(header + key).decode('utf-8')
def ssh_copy_keys(hostname, username=None):
def get_key_static(keytype, key_path):
- with file(key_path, 'w') as f:
+ with open(key_path, 'w') as f:
f.write("[%s]\n" % (gatherkeys.keytype_identity(keytype)))
f.write("key=fred\n")
def get_key_dynamic(keytype, key_path):
- with open(key_path, 'w', 0600) as f:
+ with open(key_path, 'w', 0o600) as f:
f.write("[%s]\n" % (gatherkeys.keytype_identity(keytype)))
f.write("key='%s'" % (new.generate_auth_key()))
def mock_remoto_process_check_success(conn, args):
secret = new.generate_auth_key()
out = '[mon.]\nkey = %s\ncaps mon = allow *\n' % secret
- return out.split('\n'), "", 0
+ return out.encode('utf-8').split(b'\n'), [], 0
def mock_remoto_process_check_rc_error(conn, args):
- return [""], ["this failed\n"], 1
+ return [b""], [b"this failed\n"], 1
class TestGatherKeysMissing(object):
def mock_hosts_get_file_key_content(host, **kwargs):
output = mock_distro()
mon_keyring = '[mon.]\nkey = %s\ncaps mon = allow *\n' % new.generate_auth_key()
- output.conn.remote_module.get_file_result = mon_keyring
+ output.conn.remote_module.get_file_result = mon_keyring.encode('utf-8')
output.conn.remote_module.longhostname = host
return output
def mock_remoto_process_check_success(conn, args):
out = json.dumps(remoto_process_check_success_output,sort_keys=True, indent=4)
- return out.split('\n'), "", 0
+ return out.encode('utf-8').split(b'\n'), [], 0
def mock_remoto_process_check_rc_error(conn, args):
- return [""], ["this failed\n"], 1
+ return [b""], [b"this failed\n"], 1
def mock_remoto_process_check_out_not_json(conn, args):
- return ["}bad output{"], [""], 0
+ return [b"}bad output{"], [b""], 0
def mock_remoto_process_check_out_missing_quorum(conn, args):
outdata = copy.deepcopy(remoto_process_check_success_output)
del outdata["quorum"]
out = json.dumps(outdata,sort_keys=True, indent=4)
- return out.split('\n'), "", 0
+ return out.encode('utf-8').split(b'\n'), [], 0
def mock_remoto_process_check_out_missing_quorum_1(conn, args):
outdata = copy.deepcopy(remoto_process_check_success_output)
del outdata["quorum"][1]
out = json.dumps(outdata,sort_keys=True, indent=4)
- return out.split('\n'), "", 0
+ return out.encode('utf-8').split(b'\n'), [], 0
def mock_remoto_process_check_out_missing_monmap(conn, args):
outdata = copy.deepcopy(remoto_process_check_success_output)
del outdata["monmap"]
out = json.dumps(outdata,sort_keys=True, indent=4)
- return out.split('\n'), "", 0
+ return out.encode('utf-8').split(b'\n'), [], 0
def mock_remoto_process_check_out_missing_mons(conn, args):
outdata = copy.deepcopy(remoto_process_check_success_output)
del outdata["monmap"]["mons"]
out = json.dumps(outdata,sort_keys=True, indent=4)
- return out.split('\n'), "", 0
+ return out.encode('utf-8').split(b'\n'), [], 0
def mock_remoto_process_check_out_missing_monmap_host1(conn, args):
outdata = copy.deepcopy(remoto_process_check_success_output)
del outdata["monmap"]["mons"][1]
out = json.dumps(outdata,sort_keys=True, indent=4)
- return out.split('\n'), "", 0
+ return out.encode('utf-8').split(b'\n'), [], 0
class TestGatherKeysWithMon(object):
def write_key_mon_with_caps(path, secret):
mon_keyring = '[mon.]\nkey = %s\ncaps mon = allow *\n' % secret
- with open(path, 'w', 0600) as f:
+ with open(path, 'w', 0o600) as f:
f.write(mon_keyring)
def write_key_mon_with_caps_with_tab(path, secret):
mon_keyring = '[mon.]\n\tkey = %s\n\tcaps mon = allow *\n' % secret
- with open(path, 'w', 0600) as f:
+ with open(path, 'w', 0o600) as f:
f.write(mon_keyring)
def write_key_mon_with_caps_with_tab_quote(path, secret):
mon_keyring = '[mon.]\n\tkey = %s\n\tcaps mon = "allow *"\n' % secret
- with open(path, 'w', 0600) as f:
+ with open(path, 'w', 0o600) as f:
f.write(mon_keyring)
def write_key_mon_without_caps(path, secret):
mon_keyring = '[mon.]\nkey = %s\n' % secret
- with open(path, 'w', 0600) as f:
+ with open(path, 'w', 0o600) as f:
f.write(mon_keyring)