try:
# have mypy ignore this line. We use the attribute error to detect if we
# have fromisoformat or not
- parse_timestamp = datetime.fromisoformat # type: ignore
+ ts_parser = datetime.fromisoformat # type: ignore
log.debug('found datetime.fromisoformat')
except AttributeError:
log.info(('Couldn\'t find datetime.fromisoformat, falling back to '
f'static timestamp parsing ({SNAP_DB_TS_FORMAT}'))
- def parse_timestamp(ts):
+ def ts_parser(ts):
try:
date = datetime.strptime(ts, SNAP_DB_TS_FORMAT)
return date
raise ValueError(msg)
+def parse_timestamp(ts):
+ date = ts_parser(ts)
+ # normalize any non utc timezone to utc. If no tzinfo is supplied, assume
+ # its already utc
+ # import pdb; pdb.set_trace()
+ if date.tzinfo is not timezone.utc and date.tzinfo is not None:
+ date = date.astimezone(timezone.utc)
+ return date
+
+
def parse_retention(retention):
ret = {}
log.debug(f'parse_retention({retention})')
now.day,
tzinfo=now.tzinfo)
else:
- self.start = parse_timestamp(start).astimezone(timezone.utc)
+ self.start = parse_timestamp(start)
if created is None:
self.created = datetime.now(timezone.utc)
else:
pass
db.execute(self.INSERT_SCHEDULE_META,
(sched_id,
- self.start.isoformat(),
- self.created.isoformat(),
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.created.strftime(SNAP_DB_TS_FORMAT),
self.repeat,
self.schedule,
1))
return self.report_json()
def report_json(self):
- return json.dumps(dict(self.__dict__), default=lambda o: o.isoformat())
+ return json.dumps(dict(self.__dict__),
+ default=lambda o: o.strftime(SNAP_DB_TS_FORMAT))
@property
def repeat(self):
def update_last(self, time, db):
with db:
- db.execute(self.UPDATE_LAST, (time.isoformat(),
- time.isoformat(),
+ db.execute(self.UPDATE_LAST, (time.strftime(SNAP_DB_TS_FORMAT),
+ time.strftime(SNAP_DB_TS_FORMAT),
self.path,
- self.start.isoformat(),
+ self.start.strftime(SNAP_DB_TS_FORMAT),
self.repeat))
self.created_count += 1
self.last = time
with db:
log.debug(f'Deactivating schedule ({self.repeat}, {self.start}) on path {self.path}')
db.execute(self.UPDATE_INACTIVE, (self.path,
- self.start.isoformat(),
+ self.start.strftime(SNAP_DB_TS_FORMAT),
self.repeat))
self.active = False
with db:
log.debug(f'Activating schedule ({self.repeat}, {self.start}) on path {self.path}')
db.execute(self.UPDATE_ACTIVE, (self.path,
- self.start.isoformat(),
+ self.start.strftime(SNAP_DB_TS_FORMAT),
self.repeat))
self.active = True
def update_pruned(self, time, db, pruned):
with db:
- db.execute(self.UPDATE_PRUNED, (time.isoformat(), pruned,
+ db.execute(self.UPDATE_PRUNED, (time.strftime(SNAP_DB_TS_FORMAT), pruned,
self.path,
- self.start.isoformat(),
+ self.start.strftime(SNAP_DB_TS_FORMAT),
self.repeat))
self.pruned_count += pruned
self.last_pruned = time
for var in vars(new):
if var in update_expected:
- assert getattr(new, var) == update_expected.get(var), f'new did not update value for {var}'
+ expected_val = update_expected.get(var)
+ new_val = getattr(new, var)
+ if isinstance(expected_val, datetime.datetime):
+ assert new_val.year == expected_val.year
+ assert new_val.month == expected_val.month
+ assert new_val.day == expected_val.day
+ assert new_val.hour == expected_val.hour
+ assert new_val.minute == expected_val.minute
+ assert new_val.second == expected_val.second
+ else:
+ assert new_val == expected_val, f'new did not update value for {var}'
else:
- assert getattr(new, var) == getattr(old, var), f'new changed unexpectedly in value for {var}'
+ expected_val = getattr(old, var)
+ new_val = getattr(new, var)
+ if isinstance(expected_val, datetime.datetime):
+ assert new_val.year == expected_val.year
+ assert new_val.month == expected_val.month
+ assert new_val.day == expected_val.day
+ assert new_val.hour == expected_val.hour
+ assert new_val.minute == expected_val.minute
+ assert new_val.second == expected_val.second
+ else:
+ assert new_val == expected_val, f'new changed unexpectedly in value for {var}'
class TestSchedule(object):