import sys
import time
+from typing import Union, Literal, Optional
+
import teuthology.lock.query
import teuthology.lock.util
from teuthology.config import config
libvirt = None
log = logging.getLogger(__name__)
+PowerOnOffState = Union[Literal["on"], Literal["off"]]
class RemoteConsole():
return
raise ConsoleError("Did not get a login prompt from %s!" % self.name)
- def check_power(self, state, timeout=None):
+ def check_power(self, state: Literal["on","off"]):
+ c = self._pexpect_spawn_ipmi('power status')
+ r = c.expect(['Chassis Power is {s}'.format(
+ s=state), pexpect.EOF, pexpect.TIMEOUT], timeout=1)
+ self.log.debug('check power output: %s', c.logfile_read.getvalue().strip())
+ return r == 0
+
+ def set_power(self, state: PowerOnOffState, timeout: Optional[int]):
+ self.log.info(f"Power {state}")
+ timeout = timeout or self.timeout
+ sleep_time = 4
+ reissue_after_failures = 5
+ failures = 0
+ issued = False
+ succeeded = False
+ with safe_while(
+ sleep=sleep_time,
+ tries=int(timeout / sleep_time),
+ _raise=False,
+ action='wait for power on') as proceed:
+ while proceed():
+ if not issued:
+ child = self._pexpect_spawn_ipmi(f"power {state}")
+ rc = child.expect(
+ [
+ "Up/On" if state.lower() == "on" else "Down/Off",
+ pexpect.EOF
+ ],
+ timeout=self.timeout
+ )
+ self.log.debug(
+ f"power {state} output: {child.logfile_read.getvalue().strip()}"
+ )
+ if rc == 0:
+ issued = True
+ continue
+
+ if not succeeded:
+ child = self._pexpect_spawn_ipmi('power status')
+ rc = child.expect(
+ [
+ f"Chassis Power is {state}",
+ pexpect.EOF,
+ pexpect.TIMEOUT
+ ],
+ timeout=1
+ )
+ self.log.debug(
+ f"check power output: {child.logfile_read.getvalue().strip()}"
+ )
+ if rc == 0:
+ succeeded = True
+ break
+ failures += 1
+ if failures == reissue_after_failures:
+ issued = False
+
+ if issued and succeeded:
+ self.log.info(f"Power {state} completed")
+ return True
+ raise RuntimeError(
+ f"Failed to power {state} {self.shortname} in {self.timeout}s"
+ )
+ return False
+
+ def check_power_retries(self, state, timeout=None):
"""
Check power. Retry if EOF encountered on power check read.
"""
"""
Physical power on. Loop checking cmd return.
"""
- self.log.info('Power on')
- start = time.time()
- while time.time() - start < self.timeout:
- child = self._pexpect_spawn_ipmi('power on')
- r = child.expect(['Chassis Power Control: Up/On', pexpect.EOF],
- timeout=self.timeout)
- self.log.debug('power on output: %s', child.logfile_read.getvalue().strip())
- if r == 0:
- break
- if self.check_power('on'):
- self.log.info('Power on completed')
- else:
- err_msg = 'Failed to power on {s}'.format(s=self.shortname)
- raise RuntimeError(err_msg)
+ return self.set_power("on", timeout=None)
def power_off(self):
"""
Physical power off. Loop checking cmd return.
"""
- self.log.info('Power off')
- start = time.time()
- while time.time() - start < self.timeout:
- child = self._pexpect_spawn_ipmi('power off')
- r = child.expect(['Chassis Power Control: Down/Off', pexpect.EOF],
- timeout=self.timeout)
- self.log.debug('power off output: %s', child.logfile_read.getvalue().strip())
- if r == 0:
- break
- if self.check_power('off', 60):
- self.log.info('Power off completed')
- else:
- self.log.error('Failed to power off')
+ try:
+ return self.set_power("off", timeout=None)
+ except Exception:
+ pass
def power_off_for_interval(self, interval=30):
"""