]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: make rgwlc entries extensible and extend
authorMatt Benjamin <mbenjamin@redhat.com>
Wed, 1 Apr 2020 23:21:27 +0000 (19:21 -0400)
committerNathan Cutler <ncutler@suse.com>
Sun, 9 Aug 2020 20:49:11 +0000 (22:49 +0200)
Add generation/run tracking to LC entries.  Define the entry as
a versioned structure, adapt accordingly.

N.B., has extra debug prints from later commit.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit 394750597656d4f3ab7b8220af7046753117d39b)

Conflicts:
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_ops.h
- adapt for Adam Emerson post-Octopus refactoring

src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_ops.h
src/cls/rgw/cls_rgw_types.h
src/rgw/rgw_admin.cc
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index bdc3f78433246eae219a412123b64e4cf216699b..bd08ed0c72d8cb58c16c67c82e4daff639435592 100644 (file)
@@ -3660,7 +3660,7 @@ static int rgw_cls_lc_get_entry(cls_method_context_t hctx, bufferlist *in, buffe
     return -EINVAL;
   }
 
-  rgw_lc_entry_t lc_entry;
+  cls_rgw_lc_entry lc_entry;
   int ret = read_omap_entry(hctx, op.marker, &lc_entry);
   if (ret < 0)
     return ret;
@@ -3686,7 +3686,7 @@ static int rgw_cls_lc_set_entry(cls_method_context_t hctx, bufferlist *in, buffe
   bufferlist bl;
   encode(op.entry, bl);
 
-  int ret = cls_cxx_map_set_val(hctx, op.entry.first, &bl);
+  int ret = cls_cxx_map_set_val(hctx, op.entry.bucket, &bl);
   return ret;
 }
 
@@ -3702,7 +3702,7 @@ static int rgw_cls_lc_rm_entry(cls_method_context_t hctx, bufferlist *in, buffer
     return -EINVAL;
   }
 
-  int ret = cls_cxx_map_remove_key(hctx, op.entry.first);
+  int ret = cls_cxx_map_remove_key(hctx, op.entry.bucket);
   return ret;
 }
 
@@ -3725,7 +3725,7 @@ static int rgw_cls_lc_get_next_entry(cls_method_context_t hctx, bufferlist *in,
   if (ret < 0)
     return ret;
   map<string, bufferlist>::iterator it;
-  pair<string, int> entry;
+  cls_rgw_lc_entry entry;
   if (!vals.empty()) {
     it=vals.begin();
     in_iter = it->second.begin();
@@ -3741,7 +3741,8 @@ static int rgw_cls_lc_get_next_entry(cls_method_context_t hctx, bufferlist *in,
   return 0;
 }
 
-static int rgw_cls_lc_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static int rgw_cls_lc_list_entries(cls_method_context_t hctx, bufferlist *in,
+                                  bufferlist *out)
 {
   cls_rgw_lc_list_entries_op op;
   auto in_iter = in->cbegin();
@@ -3752,24 +3753,33 @@ static int rgw_cls_lc_list_entries(cls_method_context_t hctx, bufferlist *in, bu
     return -EINVAL;
   }
 
-  cls_rgw_lc_list_entries_ret op_ret;
+  cls_rgw_lc_list_entries_ret op_ret(op.compat_v);
   bufferlist::const_iterator iter;
   map<string, bufferlist> vals;
   string filter_prefix;
-  int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, op.max_entries, &vals, &op_ret.is_truncated);
+  int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, op.max_entries,
+                                &vals, &op_ret.is_truncated);
   if (ret < 0)
     return ret;
   map<string, bufferlist>::iterator it;
-  pair<string, int> entry;
-  for (it = vals.begin(); it != vals.end(); ++it) {
+  for (auto it = vals.begin(); it != vals.end(); ++it) {
+    cls_rgw_lc_entry entry;
     iter = it->second.cbegin();
     try {
-    decode(entry, iter);
+      decode(entry, iter);
     } catch (buffer::error& err) {
-    CLS_LOG(1, "ERROR: rgw_cls_lc_list_entries(): failed to decode entry\n");
-    return -EIO;
-   }
-   op_ret.entries.insert(entry);
+      /* try backward compat */
+      pair<string, int> oe;
+      try {
+       decode(oe, iter);
+       entry = {oe.first, 0 /* start */, uint32_t(oe.second)};
+      } catch(buffer::error& err) {
+       CLS_LOG(
+         1, "ERROR: rgw_cls_lc_list_entries(): failed to decode entry\n");
+      }
+      return -EIO;
+    }
+   op_ret.entries.push_back(entry);
   }
   encode(op_ret, *out);
   return 0;
index 65e59047ebada1547a95de8a50061080f6af42e0..270f9aa2e2bdb1b93b657dcbd6cd3b3d57381481 100644 (file)
@@ -839,7 +839,8 @@ int cls_rgw_lc_put_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& h
   return r;
 }
 
-int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, string& marker, pair<string, int>& entry)
+int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, string& marker,
+                             cls_rgw_lc_entry& entry)
 {
   bufferlist in, out;
   cls_rgw_lc_get_next_entry_op call;
@@ -861,7 +862,8 @@ int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, string& marker,
  return r;
 }
 
-int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid, const pair<string, int>& entry)
+int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid,
+                       const cls_rgw_lc_entry& entry)
 {
   bufferlist in, out;
   cls_rgw_lc_rm_entry_op call;
@@ -871,7 +873,8 @@ int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid, const pair<string, int
  return r;
 }
 
-int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid, const pair<string, int>& entry)
+int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid,
+                        const cls_rgw_lc_entry& entry)
 {
   bufferlist in, out;
   cls_rgw_lc_set_entry_op call;
@@ -881,7 +884,8 @@ int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid, const pair<string, in
   return r;
 }
 
-int cls_rgw_lc_get_entry(IoCtx& io_ctx, const string& oid, const std::string& marker, rgw_lc_entry_t& entry)
+int cls_rgw_lc_get_entry(IoCtx& io_ctx, const string& oid,
+                        const std::string& marker, cls_rgw_lc_entry& entry)
 {
   bufferlist in, out;
   cls_rgw_lc_get_entry_op call{marker};;
@@ -907,7 +911,7 @@ int cls_rgw_lc_get_entry(IoCtx& io_ctx, const string& oid, const std::string& ma
 int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid,
                     const string& marker,
                     uint32_t max_entries,
-                    map<string, int>& entries)
+                    vector<cls_rgw_lc_entry>& entries)
 {
   bufferlist in, out;
   cls_rgw_lc_list_entries_op op;
@@ -930,8 +934,11 @@ int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid,
   } catch (buffer::error& err) {
     return -EIO;
   }
-  entries.insert(ret.entries.begin(),ret.entries.end());
 
+  std::sort(std::begin(ret.entries), std::end(ret.entries),
+           [](const cls_rgw_lc_entry& a, const cls_rgw_lc_entry& b)
+             { return a.bucket < b.bucket; });
+  entries = std::move(ret.entries);
  return r;
 }
 
index bd13710e86686b29484ae64d4e55533f279d38df..c68703c6958f7c60091d542ddf1b5ec0926d0b8a 100644 (file)
@@ -607,14 +607,14 @@ int cls_rgw_gc_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32
 #ifndef CLS_CLIENT_HIDE_IOCTX
 int cls_rgw_lc_get_head(librados::IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head);
 int cls_rgw_lc_put_head(librados::IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head);
-int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, const string& oid, string& marker, pair<string, int>& entry);
-int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, const string& oid, const pair<string, int>& entry);
-int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, const string& oid, const pair<string, int>& entry);
-int cls_rgw_lc_get_entry(librados::IoCtx& io_ctx, const string& oid, const std::string& marker, rgw_lc_entry_t& entry);
+int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, const string& oid, string& marker, cls_rgw_lc_entry& entry);
+int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_lc_entry& entry);
+int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_lc_entry& entry);
+int cls_rgw_lc_get_entry(librados::IoCtx& io_ctx, const string& oid, const std::string& marker, cls_rgw_lc_entry& entry);
 int cls_rgw_lc_list(librados::IoCtx& io_ctx, const string& oid,
                     const string& marker,
                     uint32_t max_entries,
-                    map<string, int>& entries);
+                    vector<cls_rgw_lc_entry>& entries);
 #endif
 
 /* resharding */
index d752118b2fb19d0cd7b2f130298356d1d3bf8082..785c862dd109dda995f37880fd3dcd99ca8472ba 100644 (file)
@@ -1030,21 +1030,26 @@ struct cls_rgw_lc_get_next_entry_op {
 };
 WRITE_CLASS_ENCODER(cls_rgw_lc_get_next_entry_op)
 
-using rgw_lc_entry_t = std::pair<std::string, int>;
-
 struct cls_rgw_lc_get_next_entry_ret {
-  rgw_lc_entry_t entry;
+  cls_rgw_lc_entry entry;
+
   cls_rgw_lc_get_next_entry_ret() {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 2, bl);
     encode(entry, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(entry, bl);
+    DECODE_START(2, bl);
+    if (struct_v < 1) {
+      std::pair<std::string, int> oe;
+      decode(oe, bl);
+      entry = {oe.first, 0 /* start */, uint32_t(oe.second)};
+    } else {
+      decode(entry, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -1071,9 +1076,11 @@ struct cls_rgw_lc_get_entry_op {
 WRITE_CLASS_ENCODER(cls_rgw_lc_get_entry_op)
 
 struct cls_rgw_lc_get_entry_ret {
-  rgw_lc_entry_t entry;
+  cls_rgw_lc_entry entry;
+
   cls_rgw_lc_get_entry_ret() {}
-  cls_rgw_lc_get_entry_ret(rgw_lc_entry_t&& _entry) : entry(std::move(_entry)) {}
+  cls_rgw_lc_get_entry_ret(cls_rgw_lc_entry&& _entry)
+    : entry(std::move(_entry)) {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
@@ -1090,38 +1097,49 @@ struct cls_rgw_lc_get_entry_ret {
 };
 WRITE_CLASS_ENCODER(cls_rgw_lc_get_entry_ret)
 
-
 struct cls_rgw_lc_rm_entry_op {
-  rgw_lc_entry_t entry;
+  cls_rgw_lc_entry entry;
   cls_rgw_lc_rm_entry_op() {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 2, bl);
     encode(entry, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(entry, bl);
+    DECODE_START(2, bl);
+    if (struct_v < 1) {
+      std::pair<std::string, int> oe;
+      decode(oe, bl);
+      entry = {oe.first, 0 /* start */, uint32_t(oe.second)};
+    } else {
+      decode(entry, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
 WRITE_CLASS_ENCODER(cls_rgw_lc_rm_entry_op)
 
 struct cls_rgw_lc_set_entry_op {
-  rgw_lc_entry_t entry;
+  cls_rgw_lc_entry entry;
   cls_rgw_lc_set_entry_op() {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 2, bl);
     encode(entry, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(entry, bl);
+    DECODE_START(2, bl);
+    if (struct_v < 1) {
+      std::pair<std::string, int> oe;
+      decode(oe, bl);
+      entry = {oe.first, 0 /* start */, uint32_t(oe.second)};
+    } else {
+      decode(entry, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
@@ -1171,18 +1189,20 @@ WRITE_CLASS_ENCODER(cls_rgw_lc_get_head_ret)
 struct cls_rgw_lc_list_entries_op {
   string marker;
   uint32_t max_entries = 0;
+  uint8_t compat_v{0};
 
   cls_rgw_lc_list_entries_op() {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(marker, bl);
     encode(max_entries, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
+    compat_v = struct_v;
     decode(marker, bl);
     decode(max_entries, bl);
     DECODE_FINISH(bl);
@@ -1192,27 +1212,46 @@ struct cls_rgw_lc_list_entries_op {
 WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_op)
 
 struct cls_rgw_lc_list_entries_ret {
-  map<string, int> entries;
+  vector<cls_rgw_lc_entry> entries;
   bool is_truncated{false};
+  uint8_t compat_v;
 
-  cls_rgw_lc_list_entries_ret() {}
+cls_rgw_lc_list_entries_ret(uint8_t compat_v = 3)
+  : compat_v(compat_v) {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
-    encode(entries, bl);
+    ENCODE_START(compat_v, 1, bl);
+    if (compat_v <= 2) {
+      map<string, int> oes;
+      std::for_each(entries.begin(), entries.end(),
+                   [&oes](const cls_rgw_lc_entry& elt)
+                     {oes.insert({elt.bucket, elt.status});});
+      encode(oes, bl);
+    } else {
+      encode(entries, bl);
+    }
     encode(is_truncated, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
-    decode(entries, bl);
+    DECODE_START(3, bl);
+    compat_v = struct_v;
+    if (struct_v <= 2) {
+      map<string, int> oes;
+      decode(oes, bl);
+      std::for_each(oes.begin(), oes.end(),
+                   [this](const std::pair<string, int>& oe)
+                     {entries.push_back({oe.first, 0 /* start */,
+                                         uint32_t(oe.second)});});
+    } else {
+      decode(entries, bl);
+    }
     if (struct_v >= 2) {
       decode(is_truncated, bl);
     }
     DECODE_FINISH(bl);
   }
-
 };
 WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_ret)
 
index 0bd197ae856ad61c0d6c4c832e19a6e7dc400292..620811dbc4a25925a12561f531826749c2d4b0f7 100644 (file)
@@ -1214,6 +1214,37 @@ struct cls_rgw_lc_obj_head
 };
 WRITE_CLASS_ENCODER(cls_rgw_lc_obj_head)
 
+struct cls_rgw_lc_entry {
+  std::string bucket;
+  uint64_t start_time; // if in_progress
+  uint32_t status;
+
+  cls_rgw_lc_entry()
+    : start_time(0), status(0) {}
+
+  cls_rgw_lc_entry(const cls_rgw_lc_entry& rhs) = default;
+
+  cls_rgw_lc_entry(const std::string& b, uint64_t t, uint32_t s)
+    : bucket(b), start_time(t), status(s) {};
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(bucket, bl);
+    encode(start_time, bl);
+    encode(status, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(bucket, bl);
+    decode(start_time, bl);
+    decode(status, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_entry);
+
 struct cls_rgw_reshard_entry
 {
   ceph::real_time time;
index d4f3d73442da9a047912dc6b2fa7560c270c6b43..84239157c7cfbdbd34da6a97acbc055e25abd446 100644 (file)
@@ -7224,27 +7224,35 @@ next:
 
   if (opt_cmd == OPT::LC_LIST) {
     formatter->open_array_section("lifecycle_list");
-    map<string, int> bucket_lc_map;
+    vector<cls_rgw_lc_entry> bucket_lc_map;
     string marker;
 #define MAX_LC_LIST_ENTRIES 100
     if (max_entries < 0) {
       max_entries = MAX_LC_LIST_ENTRIES;
     }
     do {
-      int ret = store->getRados()->list_lc_progress(marker, max_entries, &bucket_lc_map);
+      int ret = store->getRados()->list_lc_progress(marker, max_entries,
+                                                   bucket_lc_map);
       if (ret < 0) {
-        cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl;
+        cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret)
+            << std::endl;
         return 1;
       }
-      map<string, int>::iterator iter;
-      for (iter = bucket_lc_map.begin(); iter != bucket_lc_map.end(); ++iter) {
+      for (const auto& entry : bucket_lc_map) {
         formatter->open_object_section("bucket_lc_info");
-        formatter->dump_string("bucket", iter->first);
-        string lc_status = LC_STATUS[iter->second];
+        formatter->dump_string("bucket", entry.bucket);
+       char exp_buf[100];
+       time_t t{time_t(entry.start_time)};
+       if (std::strftime(
+             exp_buf, sizeof(exp_buf),
+             "%a, %d %b %Y %T %Z", std::gmtime(&t))) {
+         formatter->dump_string("started", exp_buf);
+       }
+        string lc_status = LC_STATUS[entry.status];
         formatter->dump_string("status", lc_status);
         formatter->close_section(); // objs
         formatter->flush(cout);
-        marker = iter->first;
+        marker = entry.bucket;
       }
     } while (!bucket_lc_map.empty());
 
index dea19cbe33a7c6be4a029f25a4a0e3a653c462e2..92840ab31e1991915653ced5d3b01301ed82a0f5 100644 (file)
@@ -26,6 +26,7 @@
 #include "rgw_zone.h"
 #include "rgw_string.h"
 #include "rgw_multi.h"
+#include "rgw_sal.h"
 
 // this seems safe to use, at least for now--arguably, we should
 // prefer header-only fmt, in general
@@ -288,8 +289,7 @@ bool RGWLC::if_already_run_today(time_t& start_date)
 
 int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
 {
-  map<string, int > entries;
-
+  vector<cls_rgw_lc_entry> entries;
   string marker;
 
 #define MAX_LC_LIST_ENTRIES 100
@@ -298,20 +298,22 @@ int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
                              marker, MAX_LC_LIST_ENTRIES, entries);
     if (ret < 0)
       return ret;
-    map<string, int>::iterator iter;
-    for (iter = entries.begin(); iter != entries.end(); ++iter) {
-      pair<string, int > entry(iter->first, lc_uninitial);
+
+    for (auto& entry : entries) {
+      entry.start_time = ceph_clock_now();
+      entry.status = lc_uninitial; // lc_uninitial? really?
       ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
-                                obj_names[index],  entry);
+                                obj_names[index], entry);
       if (ret < 0) {
-        ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
-            << obj_names[index] << dendl;
+        ldpp_dout(this, 0)
+         << "RGWLC::bucket_lc_prepare() failed to set entry on "
+         << obj_names[index] << dendl;
         return ret;
       }
     }
 
-    if (!entries.empty()) {
-      marker = std::move(entries.rbegin()->first);
+    if (! entries.empty()) {
+      marker = std::move(entries.back().bucket);
     }
   } while (!entries.empty());
 
@@ -1334,7 +1336,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
 }
 
 int RGWLC::bucket_lc_post(int index, int max_lock_sec,
-                         pair<string, int>& entry, int& result,
+                         cls_rgw_lc_entry& entry, int& result,
                          LCWorker* worker)
 {
   utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
@@ -1343,6 +1345,10 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
   l.set_cookie(cookie);
   l.set_duration(lock_duration);
 
+  dout(5) << "RGWLC::bucket_lc_post(): POST " << entry
+         << " index: " << index << " worker ix: " << worker->ix
+         << dendl;
+
   do {
     int ret = l.lock_exclusive(
       &store->getRados()->lc_pool_ctx, obj_names[index]);
@@ -1364,9 +1370,9 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
       }
       goto clean;
     } else if (result < 0) {
-      entry.second = lc_failed;
+      entry.status = lc_failed;
     } else {
-      entry.second = lc_complete;
+      entry.status = lc_complete;
     }
 
     ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
@@ -1383,15 +1389,13 @@ clean:
 }
 
 int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries,
-                           map<string, int>* progress_map)
+                           vector<cls_rgw_lc_entry>& progress_map)
 {
   int index = 0;
-  progress_map->clear();
   for(; index <max_objs; index++) {
-    map<string, int > entries;
     int ret =
       cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker,
-                     max_entries, entries);
+                     max_entries, progress_map);
     if (ret < 0) {
       if (ret == -ENOENT) {
         ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
@@ -1401,10 +1405,6 @@ int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries,
         return ret;
       }
     }
-    map<string, int>::iterator iter;
-    for (iter = entries.begin(); iter != entries.end(); ++iter) {
-      progress_map->insert(*iter);
-    }
   }
   return 0;
 }
@@ -1441,7 +1441,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
   rados::cls::lock::Lock l(lc_index_lock_name);
   do {
     utime_t now = ceph_clock_now();
-    pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
+    //string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
+    cls_rgw_lc_entry entry;
     if (max_lock_secs <= 0)
       return -EAGAIN;
 
@@ -1468,6 +1469,18 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
       goto exit;
     }
 
+    if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
+      ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx,
+                                obj_names[index], head.marker, entry);
+      if ((entry.status == lc_processing) &&
+         (true /* XXXX expired epoch! */)) {
+       dout(5) << "RGWLC::process(): ACTIVE entry: " << entry
+               << " index: " << index << " worker ix: " << worker->ix
+               << dendl;
+       goto exit;
+      }
+    }
+
     if(!if_already_run_today(head.start_date)) {
       head.start_date = now;
       head.marker.clear();
@@ -1490,32 +1503,38 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
     }
 
     /* termination condition (eof) */
-    if (entry.first.empty())
+    if (entry.bucket.empty())
       goto exit;
 
-    entry.second = lc_processing;
+    ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
+           << " index: " << index << " worker ix: " << worker->ix
+           << dendl;
+
+    entry.status = lc_processing;
     ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
-                              obj_names[index],  entry);
+                              obj_names[index], entry);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
-                        << obj_names[index]
-                        << " (" << entry.first << ","
-                        << entry.second << ")"
-                        << dendl;
+             << obj_names[index] << entry.bucket << entry.status << dendl;
       goto exit;
     }
 
-    head.marker = entry.first;
-    ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index],
-                             head);
+    head.marker = entry.bucket;
+    ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx,
+                             obj_names[index],  head);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
                         << obj_names[index]
-                        << dendl;
+             << dendl;
       goto exit;
     }
+
+    ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
+           << " index: " << index << " worker ix: " << worker->ix
+           << dendl;
+
     l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
-    ret = bucket_lc_process(entry.first, worker);
+    ret = bucket_lc_process(entry.bucket, worker);
     bucket_lc_post(index, max_lock_secs, entry, ret, worker);
   } while(1);
 
@@ -1655,9 +1674,9 @@ static std::string get_lc_shard_name(const rgw_bucket& bucket){
 }
 
 template<typename F>
-static int guard_lc_modify(
-  rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket,
-  const string& cookie, const F& f) {
+static int guard_lc_modify(rgw::sal::RGWRadosStore* store,
+                          const rgw_bucket& bucket, const string& cookie,
+                          const F& f) {
   CephContext *cct = store->ctx();
 
   string shard_id = get_lc_shard_name(bucket);
@@ -1665,7 +1684,10 @@ static int guard_lc_modify(
   string oid; 
   get_lc_oid(cct, shard_id, &oid);
 
-  pair<string, int> entry(shard_id, lc_uninitial);
+  /* XXX it makes sense to take shard_id for a bucket_id? */
+  cls_rgw_lc_entry entry;
+  entry.bucket = shard_id;
+  entry.status = lc_uninitial;
   int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
 
   rados::cls::lock::Lock l(lc_index_lock_name); 
@@ -1718,9 +1740,10 @@ int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info,
 
   rgw_bucket& bucket = bucket_info.bucket;
 
+
   ret = guard_lc_modify(store, bucket, cookie,
                        [&](librados::IoCtx *ctx, const string& oid,
-                           const pair<string, int>& entry) {
+                           const cls_rgw_lc_entry& entry) {
     return cls_rgw_lc_set_entry(*ctx, oid, entry);
   });
 
@@ -1747,7 +1770,7 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
 
   ret = guard_lc_modify(store, bucket, cookie,
                        [&](librados::IoCtx *ctx, const string& oid,
-                           const pair<string, int>& entry) {
+                           const cls_rgw_lc_entry& entry) {
     return cls_rgw_lc_rm_entry(*ctx, oid, entry);
   });
 
@@ -1775,7 +1798,7 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
   std::string lc_oid;
   get_lc_oid(store->ctx(), shard_name, &lc_oid);
 
-  rgw_lc_entry_t entry;
+  cls_rgw_lc_entry entry;
   // There are multiple cases we need to encounter here
   // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
   // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
@@ -1799,8 +1822,9 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
 
     ret = guard_lc_modify(
       store, bucket_info.bucket, cookie,
-      [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
-                             const pair<string, int>& entry) {
+      [&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx,
+                             const string& oid,
+                             const cls_rgw_lc_entry& entry) {
        return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry);
       });
 
index 57f02a63e81f61f5046b2ddf015d812a93e1628b..793e6f90b7b87e4ccc6202fbf3cda7e672c8e9d8 100644 (file)
@@ -467,6 +467,7 @@ public:
     const DoutPrefixProvider *dpp;
     CephContext *cct;
     RGWLC *lc;
+    int ix;
     ceph::mutex lock = ceph::make_mutex("LCWorker");
     ceph::condition_variable cond;
     WorkPool* workpool{nullptr};
@@ -497,10 +498,12 @@ public:
   int process(LCWorker* worker);
   int process(int index, int max_secs, LCWorker* worker);
   bool if_already_run_today(time_t& start_date);
-  int list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map);
+  int list_lc_progress(const string& marker, uint32_t max_entries,
+                      vector<cls_rgw_lc_entry>&);
   int bucket_lc_prepare(int index, LCWorker* worker);
   int bucket_lc_process(string& shard_id, LCWorker* worker);
-  int bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result, LCWorker* worker);
+  int bucket_lc_post(int index, int max_lock_sec,
+                    cls_rgw_lc_entry& entry, int& result, LCWorker* worker);
   bool going_down();
   void start_processor();
   void stop_processor();
index 44527c9e0fa35125e967f1c84c10698229402091..1623c12f9efc41f254db405f90a44b1fbd934690 100644 (file)
@@ -8035,7 +8035,8 @@ int RGWRados::process_gc(bool expired_only)
   return gc->process(expired_only);
 }
 
-int RGWRados::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
+int RGWRados::list_lc_progress(const string& marker, uint32_t max_entries,
+                              vector<cls_rgw_lc_entry>& progress_map)
 {
   return lc->list_lc_progress(marker, max_entries, progress_map);
 }
index e19b8d44811e63265c31c9ca79f7c85bbae9f57c..6ac58fc305792f95b08bdd10d1e4f7a42e04f746 100644 (file)
@@ -1444,7 +1444,8 @@ public:
   int defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y);
 
   int process_lc();
-  int list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map);
+  int list_lc_progress(const string& marker, uint32_t max_entries,
+                      vector<cls_rgw_lc_entry>& progress_map);
   
   int bucket_check_index(RGWBucketInfo& bucket_info,
                          map<RGWObjCategory, RGWStorageStats> *existing_stats,