--- /dev/null
+[mypy]
+ignore_missing_imports = True
\ No newline at end of file
try:
from subprocess import DEVNULL # py3k
except ImportError:
- DEVNULL = open(os.devnull, 'r+')
+ DEVNULL = open(os.devnull, 'r+') # type: ignore
DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
)
+def log_exc(func):
+ @wraps(func)
+ def wrapper(self):
+ try:
+ return func(self)
+ except:
+ self.log(traceback.format_exc())
+ raise
+ return wrapper
+
+
class PoolType:
REPLICATED = 1
ERASURE_CODED = 3
# Allow successful completion so gevent doesn't see an exception.
# The DaemonWatchdog will observe the error and tear down the test.
- def log_exc(func):
- @wraps(func)
- def wrapper(self):
- try:
- return func(self)
- except:
- self.log(traceback.format_exc())
- raise
- return wrapper
-
@log_exc
def do_sighup(self):
"""
ret[status] += 1
return ret
- @wait_for_pg_stats
+ @wait_for_pg_stats # type: ignore
def with_pg_state(self, pool, pgnum, check):
pgstr = self.get_pgid(pool, pgnum)
stats = self.get_single_pg_stats(pgstr)
assert(check(stats['state']))
- @wait_for_pg_stats
+ @wait_for_pg_stats # type: ignore
def with_pg(self, pool, pgnum, check):
pgstr = self.get_pgid(pool, pgnum)
stats = self.get_single_pg_stats(pgstr)
# requires REQUIRE_FILESYSTEM = True
REQUIRE_RECOVERY_FILESYSTEM = False
- LOAD_SETTINGS = []
+ LOAD_SETTINGS = [] # type: ignore
def setUp(self):
super(CephFSTestCase, self).setUp()
-
-
import json
import logging
import os
from textwrap import dedent
import time
+try:
+ from typing import Optional
+except ImportError:
+ # make it work for python2
+ pass
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
data_only = False
# Subclasses define how many bytes should be written to achieve fullness
- pool_capacity = None
+ pool_capacity = None # type: Optional[int]
fill_mb = None
# Subclasses define what fullness means to them
"""
Test per-pool fullness, which indicates quota limits exceeded
"""
- pool_capacity = 1024 * 1024 * 32 # arbitrary low-ish limit
- fill_mb = pool_capacity / (1024 * 1024)
+ pool_capacity = 1024 * 1024 * 32 # arbitrary low-ish limit
+    fill_mb = pool_capacity // (1024 * 1024)  # type: ignore
# We are only testing quota handling on the data pool, not the metadata
# pool.
[tox]
-envlist = flake8-py2, flake8-py3
+envlist = flake8-py2, flake8-py3, mypy
skipsdist = True
[testenv:flake8-py2]
flake8
commands=flake8 --select=F,E9 --exclude=venv,.tox
+[testenv:mypy]
+basepython = python3
+deps = mypy
+commands = mypy {posargs:.}
import errno
try:
- from itertools import izip_longest as zip_longest
+ from itertools import izip_longest as zip_longest # type: ignore
except ImportError:
from itertools import zip_longest
from itertools import combinations
from nose.tools import eq_ as eq
try:
- from itertools import izip_longest as zip_longest
+ from itertools import izip_longest as zip_longest # type: ignore
except ImportError:
from itertools import zip_longest
from nose.tools import eq_ as eq
try:
- from itertools import izip_longest as zip_longest
+ from itertools import izip_longest as zip_longest # type: ignore
except ImportError:
from itertools import zip_longest
from boto.s3.deletemarker import DeleteMarker
try:
- from itertools import izip_longest as zip_longest
+ from itertools import izip_longest as zip_longest # type: ignore
except ImportError:
from itertools import zip_longest