import random
import requests
import urllib
+import string
from urlparse import urlparse
from boto.connection import AWSAuthConnection
special_first_param='work_bound',
)
-def del_worker_bound(connection, type_, daemon_id, id_):
+def del_worker_bound(connection, type_, daemon_id, id_, index_by_instance = True):
key = _id_name(type_)
+ p ={
+ 'type': type_,
+ key: id_,
+ 'daemon_id': daemon_id,
+ }
+ if not index_by_instance:
+ p['index-by-instance'] = 'false'
+
return request(
connection, 'delete', 'admin/replica_log',
- params={
- 'type': type_,
- key: id_,
- 'daemon_id': daemon_id,
- },
+ params=p,
special_first_param='work_bound',
expect_json=False,
)
def get_worker_bound(connection, type_, id_):
key = _id_name(type_)
- out = request(
- connection, 'get', 'admin/replica_log',
- params={
- 'type': type_,
- key: id_,
- },
- special_first_param='bounds',
- )
- boto.log.debug('get_worker_bound returned: %r', out)
+ try:
+ out = request(
+ connection, 'get', 'admin/replica_log',
+ params={
+ 'type': type_,
+ key: id_,
+ },
+ special_first_param='bounds',
+ )
+ boto.log.debug('get_worker_bound returned: %r', out)
+ except NotFound:
+ if type_ == 'bucket-index':
+ boto.log.debug('get_worker_bound returned NotFound, retrying without bucket instance indexing (old format)')
+ out = request(
+ connection, 'get', 'admin/replica_log',
+ params={
+ 'type': type_,
+ key: id_,
+ 'index-by-instance': 'false',
+ },
+ special_first_param='bounds',
+ )
+ boto.log.debug('get_worker_bound returned: %r', out)
+ entities = set()
+ for item in out['markers']:
+ boto.log.debug('converting item: %r', item)
+ set_worker_bound(connection, type_, item['position_marker'], item['position_time'],
+ item['entity'], id_, json.dumps(item['items_in_progress']))
+
+ entity_names = [item['entity'] for item in out['markers']]
+ entities = entities.union(entity_names)
+
+ boto.log.debug('entities: %r', entities)
+
+ for e in entities:
+ boto.log.debug('removing entity: %r', e)
+ del_worker_bound(connection, type_, e, id_, index_by_instance = False)
+
+ raise
+
+ else:
+ raise
retries = set()
for item in out['markers']:
names = [retry['name'] for retry in item['items_in_progress']]