]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: handle object removal in incremental data sync
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 8 Oct 2015 22:01:56 +0000 (15:01 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:58 +0000 (16:12 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc

index e0695b5fe28344022f0a48fc03d7af2b1d31173d..a0107950f5cb3162061c2205f743e506ed94ff87 100644 (file)
@@ -442,3 +442,58 @@ int RGWAsyncFetchRemoteObj::_send_request()
   return r;
 }
 
+
+int RGWAsyncRemoveObj::_send_request()
+{
+  RGWObjectCtx obj_ctx(store);
+
+  rgw_obj obj(bucket_info.bucket, key.name);
+  obj.set_instance(key.instance);
+
+  ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;
+
+  obj_ctx.set_atomic(obj);
+
+  RGWObjState *state;
+
+  int ret = store->get_obj_state(&obj_ctx, obj, &state);
+  if (ret < 0) {
+    ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
+    return ret;
+  }
+
+  /* has there been any racing object write? */
+  if (del_if_older && (state->mtime > timestamp)) {
+    ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
+    return 0;
+  }
+
+  RGWAccessControlPolicy policy;
+
+  /* decode policy */
+  map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
+  if (iter != state->attrset.end()) {
+    bufferlist::iterator bliter = iter->second.begin();
+    try {
+      policy.decode(bliter);
+    } catch (buffer::error& err) {
+      ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
+      return -EIO;
+    }
+  }
+
+  RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
+  RGWRados::Object::Delete del_op(&del_target);
+
+  del_op.params.bucket_owner = bucket_info.owner;
+  del_op.params.obj_owner = policy.get_owner();
+  if (del_if_older) {
+    del_op.params.unmod_since = timestamp;
+  }
+
+  ret = del_op.delete_obj();
+  if (ret < 0) {
+    ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
+  }
+  return ret;
+}
index f73263c4d42894df4e243bf38ee0dfbce22154d3..c61ef1b928e1a7c83db971df1c08a09466cfd196 100644 (file)
@@ -652,4 +652,84 @@ public:
   }
 };
 
+class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  rgw_obj_key key;
+  uint64_t versioned_epoch;
+
+  bool del_if_older;
+  utime_t timestamp;
+
+protected:
+  int _send_request();
+public:
+  RGWAsyncRemoveObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                         const string& _source_zone,
+                         RGWBucketInfo& _bucket_info,
+                         const rgw_obj_key& _key,
+                         uint64_t _versioned_epoch,
+                         bool _if_older,
+                         utime_t& _timestamp) : RGWAsyncRadosRequest(cn), store(_store),
+                                                      source_zone(_source_zone),
+                                                      bucket_info(_bucket_info),
+                                                      key(_key),
+                                                      versioned_epoch(_versioned_epoch),
+                                                      del_if_older(_if_older),
+                                                      timestamp(_timestamp) {}
+};
+
+class RGWRemoveObjCR : public RGWSimpleCoroutine {
+  CephContext *cct;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  rgw_obj_key key;
+  uint64_t versioned_epoch;
+
+  bool del_if_older;
+  utime_t timestamp;
+
+  RGWAsyncRemoveObj *req;
+
+public:
+  RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                      const string& _source_zone,
+                      RGWBucketInfo& _bucket_info,
+                      const rgw_obj_key& _key,
+                      uint64_t _versioned_epoch,
+                      utime_t *_timestamp) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+                                       async_rados(_async_rados), store(_store),
+                                       source_zone(_source_zone),
+                                       bucket_info(_bucket_info),
+                                       key(_key),
+                                       versioned_epoch(_versioned_epoch) {
+    del_if_older = (_timestamp != NULL);
+    if (_timestamp) {
+      timestamp = *_timestamp;
+    }
+  }
+
+  ~RGWRemoveObjCR() {
+    delete req;
+  }
+
+  int send_request() {
+    req = new RGWAsyncRemoveObj(stack->create_completion_notifier(), store, source_zone, bucket_info,
+                                key, versioned_epoch, del_if_older, timestamp);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() {
+    return req->get_ret_status();
+  }
+};
+
 #endif
index 35117d905e03fca8c0add2ac7ba8bcecf72920a8..983a7c9c7bd60f47502ad9b26898552c2c42f017 100644 (file)
@@ -1735,6 +1735,8 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
 
   rgw_obj_key key;
   uint64_t versioned_epoch;
+  utime_t timestamp;
+  RGWModifyOp op;
 
   T entry_marker;
   RGWSyncShardMarkerTrack<T> *marker_tracker;
@@ -1746,11 +1748,14 @@ public:
   RGWBucketSyncSingleEntryCR(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
                              const string& _source_zone, RGWBucketInfo *_bucket_info, int _shard_id,
                              const rgw_obj_key& _key, uint64_t _versioned_epoch,
+                             utime_t& _timestamp,
+                             RGWModifyOp _op,
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
                                                      async_rados(_async_rados),
                                                      source_zone(_source_zone),
                                                       bucket_info(_bucket_info), shard_id(_shard_id),
-                                                      key(_key),
+                                                      key(_key), versioned_epoch(_versioned_epoch),
+                                                      timestamp(_timestamp), op(_op),
                                                       entry_marker(_entry_marker),
                                                       marker_tracker(_marker_tracker),
                                                       sync_status(0) {
@@ -1760,12 +1765,21 @@ public:
   int operate() {
     reenter(this) {
       yield {
-        int r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
-                                             key, versioned_epoch,
-                                             true));
-        if (r < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl;
-          return r;
+        int r;
+        if (op == CLS_RGW_OP_ADD) {
+          r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
+                                           key, versioned_epoch,
+                                           true));
+          if (r < 0) {
+            ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl;
+            return r;
+          }
+        } else if (op == CLS_RGW_OP_DEL) {
+          r = call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, &timestamp));
+          if (r < 0) {
+            ldout(store->ctx(), 0) << "ERROR: failed to call RGWRemoveObjCR()" << dendl;
+            return r;
+          }
         }
       }
       if (retcode < 0 && retcode != -ENOENT) {
@@ -1862,7 +1876,7 @@ int RGWBucketShardFullSyncCR::operate()
           marker_tracker->start(entry.key);
           list_marker = entry.key;
           spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
-                                               entry.key, entry.versioned_epoch, entry.key, marker_tracker), false);
+                                               entry.key, entry.versioned_epoch, entry.mtime, CLS_RGW_OP_ADD, entry.key, marker_tracker), false);
         }
         while ((int)num_spawned() > spawn_window) {
           yield wait_for_child();
@@ -1974,7 +1988,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
             versioned_epoch = entry.ver.epoch;
           }
           spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
-                                               key, versioned_epoch, entry.id, marker_tracker), false);
+                                               key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false);
         }
         while ((int)num_spawned() > spawn_window) {
           yield wait_for_child();