int RGWReplicaLogger::update_bound(const string& oid, const string& pool,
const string& daemon_id,
const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries)
+ const list<RGWReplicaItemMarker> *entries,
+ bool need_to_exist)
{
cls_replica_log_progress_marker progress;
progress.entity_id = daemon_id;
}
librados::ObjectWriteOperation opw;
+ if (need_to_exist) {
+ opw.assert_exists();
+ }
cls_replica_log_update_bound(opw, progress);
return ioctx.operate(oid, &opw);
}
-int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
- const string& daemon_id, bool purge_all)
+int RGWReplicaLogger::write_bounds(const string& oid, const string& pool,
+ RGWReplicaBounds& bounds)
{
librados::IoCtx ioctx;
int r = open_ioctx(ioctx, pool);
return r;
}
- if (purge_all) {
- return ioctx.remove(oid);
+ librados::ObjectWriteOperation opw;
+ list<RGWReplicaProgressMarker>::iterator iter = bounds.markers.begin();
+ for (; iter != bounds.markers.end(); ++iter) {
+ RGWReplicaProgressMarker& progress = *iter;
+ cls_replica_log_update_bound(opw, progress);
+ }
+
+ r = ioctx.operate(oid, &opw);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
+ const string& daemon_id, bool purge_all,
+ bool need_to_exist)
+{
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
}
librados::ObjectWriteOperation opw;
- cls_replica_log_delete_bound(opw, daemon_id);
+ if (need_to_exist) {
+ opw.assert_exists();
+ }
+ if (purge_all) {
+ opw.remove();
+ } else {
+ cls_replica_log_delete_bound(opw, daemon_id);
+ }
return ioctx.operate(oid, &opw);
}
int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries)
+ const list<RGWReplicaItemMarker> *entries,
+ bool index_by_instance)
{
if (shard_id >= 0 ||
!BucketIndexShardsManager::is_shards_marker(marker)) {
return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id, true), pool,
- daemon_id, marker, time, entries);
+ daemon_id, marker, time, entries,
+ false);
}
BucketIndexShardsManager sm;
ret = 0;
+ bool no_shards = (vals.size() == 1);
+
map<int, string>::iterator iter;
for (iter = vals.begin(); iter != vals.end(); ++iter) {
+ bool need_to_exist = index_by_instance && no_shards; /*
+ * don't need to exist if not indexing by instance,
+ * also, we only care about non-sharded
+ * buckets, as these are the ones that
+ * might not be indexed by instance id
+ */
ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
- daemon_id, iter->second, time, entries);
+ daemon_id, iter->second, time, entries,
+ need_to_exist);
+
+ if (r == -ENOENT && need_to_exist) {
+ RGWReplicaBounds bounds;
+ r = convert_old_bounds(bucket, -1, bounds);
+ if (r < 0 && r != -ENOENT) {
+ return r;
+ }
+ r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
+ daemon_id, marker, time, entries, false);
+ }
if (r < 0) {
ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
ret = r;
return ret;
}
+
+int RGWReplicaBucketLogger::delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool index_by_instance, bool purge_all)
+{
+ bool need_to_exist = index_by_instance; /* don't need to exist if not indexing by instance */
+ int r = RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, index_by_instance), pool, daemon_id, purge_all, need_to_exist);
+ if (r != -ENOENT) {
+ return r;
+ }
+ /*
+ * can only get here if need_to_exist == true,
+ * entry is not found, let's convert old entry if exists
+ */
+ RGWReplicaBounds bounds;
+ r = convert_old_bounds(bucket, shard_id, bounds);
+ if (r < 0 && r != -ENOENT) {
+ return r;
+ }
+ return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, index_by_instance), pool, daemon_id, purge_all, false);
+}
+
+int RGWReplicaBucketLogger::get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds, bool index_by_instance) {
+ int r = RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, index_by_instance), pool, bounds);
+ if (r != -ENOENT || !index_by_instance) {
+ return r;
+ }
+
+ r = convert_old_bounds(bucket, shard_id, bounds);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWReplicaBucketLogger::convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
+ string old_key = obj_name(bucket, shard_id, false);
+ string new_key = obj_name(bucket, shard_id, true);
+
+ /* couldn't find when indexed by instance, retry with old key by bucket name only */
+ int r = RGWReplicaLogger::get_bounds(old_key, pool, bounds);
+ if (r < 0) {
+ return r;
+ }
+ /* convert to new keys */
+ r = RGWReplicaLogger::write_bounds(new_key, pool, bounds);
+ if (r < 0) {
+ return r;
+ }
+
+ string daemon_id;
+ r = RGWReplicaLogger::delete_bound(old_key, pool, daemon_id, true, false); /* purge all */
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
int update_bound(const string& oid, const string& pool,
const string& daemon_id, const string& marker,
const utime_t& time,
- const list<RGWReplicaItemMarker> *entries);
+ const list<RGWReplicaItemMarker> *entries,
+ bool need_to_exist);
+ int write_bounds(const string& oid, const string& pool,
+ RGWReplicaBounds& bounds);
int delete_bound(const string& oid, const string& pool,
- const string& daemon_id, bool purge_all);
+ const string& daemon_id, bool purge_all,
+ bool need_to_exist);
int get_bounds(const string& oid, const string& pool,
RGWReplicaBounds& bounds);
};
string oid;
get_shard_oid(shard, oid);
return RGWReplicaLogger::update_bound(oid, pool,
- daemon_id, marker, time, entries);
+ daemon_id, marker, time, entries, false);
}
int delete_bound(int shard, const string& daemon_id, bool purge_all) {
string oid;
get_shard_oid(shard, oid);
return RGWReplicaLogger::delete_bound(oid, pool,
- daemon_id, purge_all);
+ daemon_id, purge_all, false);
}
int get_bounds(int shard, RGWReplicaBounds& bounds) {
string oid;
RGWReplicaBucketLogger(RGWRados *_store);
int update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries);
- int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool index_by_instance, bool purge_all) {
- return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, index_by_instance), pool,
- daemon_id, purge_all);
- }
- int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds, bool index_by_instance) {
- return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, index_by_instance), pool,
- bounds);
- }
+ const list<RGWReplicaItemMarker> *entries,
+ bool index_by_instance);
+ int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool index_by_instance, bool purge_all);
+ int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds, bool index_by_instance);
+ int convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds);
};
#endif /* RGW_REPLICA_LOG_H_ */