]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rgw, rgw: pending_log can hold multiple entries per epoch
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 19 Nov 2014 01:29:30 +0000 (17:29 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 19 Jan 2015 23:57:54 +0000 (15:57 -0800)
This is needed, so that we can later replay operations, using the epoch
for idempotency.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_ops.cc
src/cls/rgw/cls_rgw_ops.h
src/cls/rgw/cls_rgw_types.h
src/common/ceph_json.h
src/rgw/rgw_admin.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index a974dff1c4d2bf5f4e079543dd672db153e68118..9444e6b44fa1e705f89592b9ca48625ac0f4d3b7 100644 (file)
@@ -967,12 +967,14 @@ static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, struct r
 static void update_olh_log(struct rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, const string& op_tag,
                            cls_rgw_obj_key& key, bool delete_marker)
 {
-  rgw_bucket_olh_log_entry& log_entry = olh_data_entry.pending_log[olh_data_entry.epoch];
+  vector<rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log[olh_data_entry.epoch];
+  rgw_bucket_olh_log_entry log_entry;
   log_entry.epoch = olh_data_entry.epoch;
   log_entry.op = op;
   log_entry.op_tag = op_tag;
   log_entry.key = key;
   log_entry.delete_marker = delete_marker;
+  log.push_back(log_entry);
 }
 
 static string escape_str(const string& s)
@@ -1558,13 +1560,13 @@ static int rgw_bucket_read_olh_log(cls_method_context_t hctx, bufferlist *in, bu
   rgw_cls_read_olh_log_ret op_ret;
 
 #define MAX_OLH_LOG_ENTRIES 1000
-  map<uint64_t, rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log;
+  map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log = olh_data_entry.pending_log;
 
   if (log.begin()->first > op.ver_marker && log.size() <= MAX_OLH_LOG_ENTRIES) {
     op_ret.log = log;
     op_ret.is_truncated = false;
   } else {
-    map<uint64_t, rgw_bucket_olh_log_entry>::iterator iter = log.upper_bound(op.ver_marker);
+    map<uint64_t, vector<rgw_bucket_olh_log_entry> >::iterator iter = log.upper_bound(op.ver_marker);
 
     for (int i = 0; i < MAX_OLH_LOG_ENTRIES && iter != log.end(); ++i, ++iter) {
       op_ret.log[iter->first] = iter->second;
@@ -1610,10 +1612,10 @@ static int rgw_bucket_trim_olh_log(cls_method_context_t hctx, bufferlist *in, bu
   }
 
   /* remove all versions up to and including ver from the pending map */
-  map<uint64_t, rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log;
-  map<uint64_t, rgw_bucket_olh_log_entry>::iterator liter = log.begin();
+  map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log = olh_data_entry.pending_log;
+  map<uint64_t, vector<rgw_bucket_olh_log_entry> >::iterator liter = log.begin();
   while (liter != log.end() && liter->first <= op.ver) {
-    map<uint64_t, rgw_bucket_olh_log_entry>::iterator rm_iter = liter;
+    map<uint64_t, vector<rgw_bucket_olh_log_entry> >::iterator rm_iter = liter;
     ++liter;
     log.erase(rm_iter);
   }
index aef2a85711a7e4026502b215328129b11562ac07..db7ce4a9185088543389ad6ef8f216a6e96fa39a 100644 (file)
@@ -207,7 +207,7 @@ int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid,
 
 int cls_rgw_get_olh_log(IoCtx& io_ctx, string& oid, librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker,
                         const string& olh_tag,
-                        map<uint64_t, struct rgw_bucket_olh_log_entry> *log, bool *is_truncated)
+                        map<uint64_t, vector<struct rgw_bucket_olh_log_entry> > *log, bool *is_truncated)
 {
   bufferlist in, out;
   struct rgw_cls_read_olh_log_op call;
index d1ac36579f055c793d78db83a8b32ec87f3c4125..ccbcc54aac0e9448f1536fc58ae8ced90ccd05df 100644 (file)
@@ -47,7 +47,7 @@ int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, const cl
 int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, const string& op_tag);
 int cls_rgw_get_olh_log(librados::IoCtx& io_ctx, string& oid, librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker,
                         const string& olh_tag,
-                        map<uint64_t, struct rgw_bucket_olh_log_entry> *log, bool *is_truncated);
+                        map<uint64_t, vector<struct rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
 void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, string& oid, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag);
 
 int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
index e6bbbf2957ce1ab0c3096fb362e26ab6f0c1950b..54947e19ca5646d64125be48f5a33c6c8610779f 100644 (file)
@@ -211,7 +211,7 @@ void rgw_cls_read_olh_log_ret::generate_test_instances(list<rgw_cls_read_olh_log
   list<rgw_bucket_olh_log_entry *> l;
   rgw_bucket_olh_log_entry::generate_test_instances(l);
   list<rgw_bucket_olh_log_entry *>::iterator iter = l.begin();
-  r->log[1] = *(*iter);
+  r->log[1].push_back(*(*iter));
 
   o.push_back(r);
 
index c11a943de615449e77a23910e4add834938ec7f4..37d8fdeb42ee2b2b6b9139dc991f0147fe0332a1 100644 (file)
@@ -234,7 +234,7 @@ WRITE_CLASS_ENCODER(rgw_cls_read_olh_log_op)
 
 struct rgw_cls_read_olh_log_ret
 {
-  map<uint64_t, struct rgw_bucket_olh_log_entry> log;
+  map<uint64_t, vector<struct rgw_bucket_olh_log_entry> > log;
   bool is_truncated;
 
   rgw_cls_read_olh_log_ret() : is_truncated(false) {}
index 5884b41fe37f0c948c7e53aefda092c309279d4a..7562c02d646cdca3874c7ebbe2cb603218649e3e 100644 (file)
@@ -416,7 +416,7 @@ struct rgw_bucket_olh_entry {
   cls_rgw_obj_key key;
   bool delete_marker;
   uint64_t epoch;
-  map<uint64_t, struct rgw_bucket_olh_log_entry> pending_log;
+  map<uint64_t, vector<struct rgw_bucket_olh_log_entry> > pending_log;
   string tag;
   bool exists;
 
index 5f8642dfa461e909743765050b666b2c372bdf59..f27ee02891053a537d6789b52101ec0dd1e1acb9 100644 (file)
@@ -153,6 +153,21 @@ void decode_json_obj(list<T>& l, JSONObj *obj)
   }
 }
 
+template<class T>
+void decode_json_obj(vector<T>& l, JSONObj *obj)
+{
+  l.clear();
+
+  JSONObjIter iter = obj->find_first();
+
+  for (; !iter.end(); ++iter) {
+    T val;
+    JSONObj *o = *iter;
+    decode_json_obj(val, o);
+    l.push_back(val);
+  }
+}
+
 template<class K, class V>
 void decode_json_obj(map<K, V>& m, JSONObj *obj)
 {
@@ -297,6 +312,16 @@ static void encode_json(const char *name, const std::list<T>& l, ceph::Formatter
   f->close_section();
 }
 
+template<class T>
+static void encode_json(const char *name, const std::vector<T>& l, ceph::Formatter *f)
+{
+  f->open_array_section(name);
+  for (typename std::vector<T>::const_iterator iter = l.begin(); iter != l.end(); ++iter) {
+    encode_json("obj", *iter, f);
+  }
+  f->close_section();
+}
+
 template<class K, class V>
 void encode_json_map(const char *name, const map<K, V>& m, ceph::Formatter *f)
 {
index f4a4ffcda95aa0f2ab97f770c22c38a7c9fdd7b9..689dd659a67e26b05fc5af2e7ab3627de21e7cd9 100644 (file)
@@ -1972,7 +1972,7 @@ next:
   }
 
   if (opt_cmd == OPT_OLH_READLOG) {
-    map<uint64_t, rgw_bucket_olh_log_entry> log;
+    map<uint64_t, vector<rgw_bucket_olh_log_entry> > log;
     bool is_truncated;
 
     RGWObjectCtx rctx(store);
index 725368b2930d16da269eb974b560bccf3488c907..3fc0bc1e34c306b43e109af4c8d591cfce8ea934 100644 (file)
@@ -5591,7 +5591,7 @@ int RGWRados::bucket_index_unlink_instance(rgw_obj& obj_instance, const string&
 }
 
 int RGWRados::bucket_index_read_olh_log(RGWObjState& state, rgw_obj& obj_instance, uint64_t ver_marker,
-                                        map<uint64_t, rgw_bucket_olh_log_entry> *log,
+                                        map<uint64_t, vector<rgw_bucket_olh_log_entry> > *log,
                                         bool *is_truncated)
 {
   rgw_rados_ref ref;
@@ -5655,7 +5655,7 @@ int RGWRados::bucket_index_trim_olh_log(RGWObjState& state, rgw_obj& obj_instanc
 }
 
 int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const string& bucket_owner, rgw_obj& obj,
-                            bufferlist& olh_tag, map<uint64_t, rgw_bucket_olh_log_entry>& log,
+                            bufferlist& olh_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
                             uint64_t *plast_ver)
 {
   if (log.empty()) {
@@ -5667,7 +5667,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const str
   uint64_t last_ver = log.rbegin()->first;
   *plast_ver = last_ver;
 
-  map<uint64_t, rgw_bucket_olh_log_entry>::iterator iter = log.begin();
+  map<uint64_t, vector<rgw_bucket_olh_log_entry> >::iterator iter = log.begin();
 
   op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, olh_tag);
   op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_GT, last_ver);
@@ -5678,32 +5678,35 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const str
   list<cls_rgw_obj_key> remove_instances;
 
   for (iter = log.begin(); iter != log.end(); ++iter) {
-    rgw_bucket_olh_log_entry& entry = iter->second;
-    ldout(cct, 20) << "olh_log_entry: op=" << (int)entry.op
-                   << " key=" << entry.key.name << "[" << entry.key.instance << "] "
-                   << (entry.delete_marker ? "(delete)" : "") << dendl;
-    switch (entry.op) {
-    case CLS_RGW_OLH_OP_REMOVE_INSTANCE:
-      remove_instances.push_back(entry.key);
-      break;
-    case CLS_RGW_OLH_OP_LINK_OLH:
-      need_to_link = true;
-      key = entry.key;
-      delete_marker = entry.delete_marker;
-      break;
-    case CLS_RGW_OLH_OP_UNLINK_OLH:
-      /* treat this as linking into a delete marker */
-      need_to_link = true;
-      key = entry.key;
-      delete_marker = true;
-      break;
-    default:
-      ldout(cct, 0) << "ERROR: apply_olh_log: invalid op: " << (int)entry.op << dendl;
-      return -EIO;
+    vector<rgw_bucket_olh_log_entry>::iterator viter = iter->second.begin();
+    for (; viter != iter->second.end(); ++viter) {
+      rgw_bucket_olh_log_entry& entry = *viter;
+      ldout(cct, 20) << "olh_log_entry: op=" << (int)entry.op
+                     << " key=" << entry.key.name << "[" << entry.key.instance << "] "
+                     << (entry.delete_marker ? "(delete)" : "") << dendl;
+      switch (entry.op) {
+      case CLS_RGW_OLH_OP_REMOVE_INSTANCE:
+        remove_instances.push_back(entry.key);
+        break;
+      case CLS_RGW_OLH_OP_LINK_OLH:
+        need_to_link = true;
+        key = entry.key;
+        delete_marker = entry.delete_marker;
+        break;
+      case CLS_RGW_OLH_OP_UNLINK_OLH:
+        /* treat this as linking into a delete marker */
+        need_to_link = true;
+        key = entry.key;
+        delete_marker = true;
+        break;
+      default:
+        ldout(cct, 0) << "ERROR: apply_olh_log: invalid op: " << (int)entry.op << dendl;
+        return -EIO;
+      }
+      string attr_name = RGW_ATTR_OLH_PENDING_PREFIX;
+      attr_name.append(entry.op_tag);
+      op.rmxattr(attr_name.c_str());
     }
-    string attr_name = RGW_ATTR_OLH_PENDING_PREFIX;
-    attr_name.append(entry.op_tag);
-    op.rmxattr(attr_name.c_str());
   }
 
   rgw_rados_ref ref;
@@ -5756,7 +5759,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const str
  */
 int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj)
 {
-  map<uint64_t, rgw_bucket_olh_log_entry> log;
+  map<uint64_t, vector<rgw_bucket_olh_log_entry> > log;
   bool is_truncated;
   uint64_t ver_marker = 0;
 
index 2aca4b1c525071a7cee42878ea43b3b432b201f3..279a4bb03c58ac71a3c543ef51e06160afebf700 100644 (file)
@@ -1720,10 +1720,10 @@ public:
                             const string& op_tag, struct rgw_bucket_dir_entry_meta *meta);
   int bucket_index_unlink_instance(rgw_obj& obj_instance, const string& op_tag);
   int bucket_index_read_olh_log(RGWObjState& state, rgw_obj& obj_instance, uint64_t ver_marker,
-                                map<uint64_t, rgw_bucket_olh_log_entry> *log, bool *is_truncated);
+                                map<uint64_t, vector<rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
   int bucket_index_trim_olh_log(RGWObjState& obj_state, rgw_obj& obj_instance, uint64_t ver);
   int apply_olh_log(RGWObjectCtx& ctx, RGWObjState& obj_state, const string& bucket_owner, rgw_obj& obj,
-                    bufferlist& obj_tag, map<uint64_t, rgw_bucket_olh_log_entry>& log,
+                    bufferlist& obj_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
                     uint64_t *plast_ver);
   int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj);
   int set_olh(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta);