return r;
}
+
+int RGWAsyncRemoveObj::_send_request()
+{
+ RGWObjectCtx obj_ctx(store);
+
+ rgw_obj obj(bucket_info.bucket, key.name);
+ obj.set_instance(key.instance);
+
+ ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;
+
+ obj_ctx.set_atomic(obj);
+
+ RGWObjState *state;
+
+ int ret = store->get_obj_state(&obj_ctx, obj, &state);
+ if (ret < 0) {
+ ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ /* has there been any racing object write? */
+ if (del_if_older && (state->mtime > timestamp)) {
+ ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
+ return 0;
+ }
+
+ RGWAccessControlPolicy policy;
+
+ /* decode policy */
+ map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
+ if (iter != state->attrset.end()) {
+ bufferlist::iterator bliter = iter->second.begin();
+ try {
+ policy.decode(bliter);
+ } catch (buffer::error& err) {
+ ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
+ return -EIO;
+ }
+ }
+
+ RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
+ RGWRados::Object::Delete del_op(&del_target);
+
+ del_op.params.bucket_owner = bucket_info.owner;
+ del_op.params.obj_owner = policy.get_owner();
+ if (del_if_older) {
+ del_op.params.unmod_since = timestamp;
+ }
+
+ ret = del_op.delete_obj();
+ if (ret < 0) {
+ ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
+ }
+ return ret;
+}
}
};
+class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ rgw_obj_key key;
+ uint64_t versioned_epoch;
+
+ bool del_if_older;
+ utime_t timestamp;
+
+protected:
+ int _send_request();
+public:
+ RGWAsyncRemoveObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const rgw_obj_key& _key,
+ uint64_t _versioned_epoch,
+ bool _if_older,
+ utime_t& _timestamp) : RGWAsyncRadosRequest(cn), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ key(_key),
+ versioned_epoch(_versioned_epoch),
+ del_if_older(_if_older),
+ timestamp(_timestamp) {}
+};
+
+class RGWRemoveObjCR : public RGWSimpleCoroutine {
+ CephContext *cct;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ rgw_obj_key key;
+ uint64_t versioned_epoch;
+
+ bool del_if_older;
+ utime_t timestamp;
+
+ RGWAsyncRemoveObj *req;
+
+public:
+ RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const rgw_obj_key& _key,
+ uint64_t _versioned_epoch,
+ utime_t *_timestamp) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ key(_key),
+ versioned_epoch(_versioned_epoch) {
+ del_if_older = (_timestamp != NULL);
+ if (_timestamp) {
+ timestamp = *_timestamp;
+ }
+ }
+
+ ~RGWRemoveObjCR() {
+ delete req;
+ }
+
+ int send_request() {
+ req = new RGWAsyncRemoveObj(stack->create_completion_notifier(), store, source_zone, bucket_info,
+ key, versioned_epoch, del_if_older, timestamp);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() {
+ return req->get_ret_status();
+ }
+};
+
#endif
rgw_obj_key key;
uint64_t versioned_epoch;
+ utime_t timestamp;
+ RGWModifyOp op;
T entry_marker;
RGWSyncShardMarkerTrack<T> *marker_tracker;
RGWBucketSyncSingleEntryCR(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
const string& _source_zone, RGWBucketInfo *_bucket_info, int _shard_id,
const rgw_obj_key& _key, uint64_t _versioned_epoch,
+ utime_t& _timestamp,
+ RGWModifyOp _op,
const T& _entry_marker, RGWSyncShardMarkerTrack<T> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
async_rados(_async_rados),
source_zone(_source_zone),
bucket_info(_bucket_info), shard_id(_shard_id),
- key(_key),
+ key(_key), versioned_epoch(_versioned_epoch),
+ timestamp(_timestamp), op(_op),
entry_marker(_entry_marker),
marker_tracker(_marker_tracker),
sync_status(0) {
int operate() {
reenter(this) {
yield {
- int r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
- key, versioned_epoch,
- true));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl;
- return r;
+ int r;
+ if (op == CLS_RGW_OP_ADD) {
+ r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
+ key, versioned_epoch,
+ true));
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl;
+ return r;
+ }
+ } else if (op == CLS_RGW_OP_DEL) {
+ r = call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, ×tamp));
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to call RGWRemoveObjCR()" << dendl;
+ return r;
+ }
}
}
if (retcode < 0 && retcode != -ENOENT) {
marker_tracker->start(entry.key);
list_marker = entry.key;
spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
- entry.key, entry.versioned_epoch, entry.key, marker_tracker), false);
+ entry.key, entry.versioned_epoch, entry.mtime, CLS_RGW_OP_ADD, entry.key, marker_tracker), false);
}
while ((int)num_spawned() > spawn_window) {
yield wait_for_child();
versioned_epoch = entry.ver.epoch;
}
spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
- key, versioned_epoch, entry.id, marker_tracker), false);
+ key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false);
}
while ((int)num_spawned() > spawn_window) {
yield wait_for_child();