import time
import errno
try:
- from typing import Dict, List, Tuple, Optional, Union
+ from typing import Dict, List, Tuple, Optional, Union, Any
except ImportError:
pass
import uuid
else:
from ConfigParser import SafeConfigParser
-container_path = None
+container_path = ''
class Error(Exception):
pass
##################################
# Popen wrappers, lifted from ceph-volume
-def call(command,
- desc=None,
- verbose=False,
- verbose_on_failure=True,
- timeout=DEFAULT_TIMEOUT,
+def call(command, # type: List[str]
+ desc=None, # type: Optional[str]
+ verbose=False, # type: bool
+ verbose_on_failure=True, # type: bool
+ timeout=DEFAULT_TIMEOUT, # type: Optional[int]
**kwargs):
"""
Wrap subprocess.Popen to
)
for fd in reads:
try:
- message = os.read(fd, 1024)
- if not isinstance(message, str):
- message = message.decode('utf-8')
+ message_b = os.read(fd, 1024)
+ if isinstance(message_b, bytes):
+ message = message_b.decode('utf-8')
+ if isinstance(message_b, str):
+ message = message_b
if fd == process.stdout.fileno():
out += message
message = out_buffer + message
def call_throws(command, **kwargs):
+ # type: (List[str], Any) -> Tuple[str, str, int]
out, err, ret = call(command, **kwargs)
if ret:
raise RuntimeError('Failed command: %s' % ' '.join(command))