import re
from radosgw_agent import worker, client
-from radosgw_agent.exceptions import HttpError, NotFound, BucketEmpty
+from radosgw_agent.exceptions import HttpError, NotFound, BucketEmpty, SyncTimedOut
class TestSyncObject(object):
exc_message = exc.exconly()
assert 'state not found' in exc_message
+ def test_wait_for_object_state_is_empty_sync_timesout(self):
+ self.client.get_op_state = lambda *a: []
+ with patch('radosgw_agent.worker.client', self.client):
+ w = worker.DataWorker(None, None, None, self.src, None, daemon_id=1)
+ with py.test.raises(SyncTimedOut) as exc:
+ w.wait_for_object(None, None, time.time() + 1, None)
+
def test_wait_for_object_timeout(self):
msg = 'should not have called get_op_state'
self.client.get_op_state = Mock(side_effect=AssertionError(msg))