]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: apply olh log functionality
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Sep 2014 23:39:10 +0000 (16:39 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 16 Jan 2015 22:33:51 +0000 (14:33 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index b4907cbc7ccfc0f33fef71457fcf63250a1a918e..ed37b0dca1c22f09d632a7822827afb5bb295dd8 100644 (file)
@@ -5313,6 +5313,115 @@ int RGWRados::bucket_index_read_olh_log(rgw_obj& obj_instance, uint64_t ver_mark
   return 0;
 }
 
+static void op_setxattr(librados::ObjectWriteOperation& op, const char *name, const string& val)
+{
+  bufferlist bl;
+  bl.append(val);
+  op.setxattr(name, bl);
+}
+
+int RGWRados::apply_olh_log(void *ctx, const string& bucket_owner, rgw_obj& obj,
+                            const string& obj_tag, map<uint64_t, rgw_bucket_olh_log_entry>& log)
+{
+  if (log.empty()) {
+    return 0;
+  }
+
+  librados::ObjectWriteOperation op;
+
+  uint64_t last_ver = log.rbegin()->first;
+
+  map<uint64_t, rgw_bucket_olh_log_entry>::iterator iter = log.begin();
+
+  bufferlist bl;
+  bl.append(obj_tag);
+  op.cmpxattr(RGW_ATTR_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, bl);
+  op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver);
+
+  bool need_to_link = false;
+  cls_rgw_obj_key key;
+  bool delete_marker = false;
+  list<cls_rgw_obj_key> remove_instances;
+
+  for (iter = log.begin(); iter != log.end(); ++iter) {
+    rgw_bucket_olh_log_entry& entry = iter->second;
+    switch (entry.op) {
+    case CLS_RGW_OLH_OP_REMOVE_INSTANCE:
+      remove_instances.push_back(entry.key);
+      break;
+    case CLS_RGW_OLH_OP_LINK_OLH:
+      need_to_link = true;
+      key = entry.key;
+      delete_marker = entry.delete_marker;
+      break;
+    default:
+      ldout(cct, 0) << "ERROR: apply_olh_log: invalid op: " << (int)entry.op << dendl;
+      return -EIO;
+    }
+    string attr_name = RGW_ATTR_OLH_PENDING_PREFIX;
+    attr_name.append(entry.op_tag);
+    op.rmxattr(attr_name.c_str());
+  }
+
+  rgw_rados_ref ref;
+  rgw_bucket bucket;
+  int r = get_obj_ref(obj, &ref, &bucket);
+  if (r < 0) {
+    return r;
+  }
+
+  if (need_to_link) {
+    rgw_obj target(bucket, key.name);
+    target.set_instance(key.instance);
+    RGWOLHInfo info;
+    info.target = target;
+    info.removed = delete_marker;
+    bufferlist bl;
+    ::encode(info, bl);
+    op.setxattr(RGW_ATTR_OLH_INFO, bl);
+  }
+
+  /* first remove object instances */
+  for (list<cls_rgw_obj_key>::iterator liter = remove_instances.begin();
+       liter != remove_instances.end(); ++liter) {
+    cls_rgw_obj_key& key = *liter;
+    rgw_obj obj_instance(bucket, key.name);
+    obj_instance.set_instance(key.instance);
+    int ret = delete_obj(ctx, bucket_owner, obj_instance);
+    if (ret < 0 && ret != -ENOENT) {
+      ldout(cct, 0) << "ERROR: delete_obj() returned " << ret << " obj_instance=" << obj_instance << dendl;
+      return ret;
+    }
+  }
+
+  /* update olh object */
+  r = ref.ioctx.operate(ref.oid, &op);
+  return r;
+}
+
+/*
+ * read olh log and apply it
+ */
+int RGWRados::update_olh(void *ctx, const string& bucket_owner, rgw_obj& obj, const string& obj_tag)
+{
+  map<uint64_t, rgw_bucket_olh_log_entry> log;
+  bool is_truncated;
+  uint64_t ver_marker = 0;
+
+  do {
+    int ret = bucket_index_read_olh_log(obj, ver_marker, &log, &is_truncated);
+    if (ret < 0) {
+      return ret;
+    }
+    ret = apply_olh_log(ctx, bucket_owner, obj, obj_tag, log, &ver_marker);
+    if (ret < 0) {
+      return ret;
+    }
+  } while (is_truncated);
+
+  return 0;
+}
+
 static void filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
                            map<string, bufferlist> *attrset)
 {
index 210c9801c160a848eb57342b28fac2ac89b16828..1af9240d29202f4c8e1113b669ccaa4f80b5989b 100644 (file)
@@ -1753,6 +1753,8 @@ public:
   int bucket_index_link_olh(rgw_obj& obj_instance, bool delete_marker, const string& op_tag);
   int bucket_index_read_olh_log(rgw_obj& obj_instance, uint64_t ver_marker,
                                 map<uint64_t, rgw_bucket_olh_log_entry> *log, bool *is_truncated);
+  int apply_olh_log(void *ctx, const string& bucket_owner, rgw_obj& obj,
+                    const string& obj_tag, map<uint64_t, rgw_bucket_olh_log_entry>& log);
 
   int follow_olh(map<string, bufferlist>& attrset, rgw_obj& target);
   int get_olh(rgw_obj& obj, RGWOLHInfo *olh);