]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: changed data log renew thread
authorYehuda Sadeh <yehuda@inktank.com>
Thu, 2 May 2013 04:34:38 +0000 (21:34 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Wed, 8 May 2013 18:22:08 +0000 (11:22 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/cls/log/cls_log.cc
src/cls/log/cls_log_client.cc
src/cls/log/cls_log_client.h
src/cls/log/cls_log_ops.h
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index c2d50fb6a34f37682548b5ece06cfcc4b61dbde8..ac5efc4f0b5cf76035c9cdf8f4d3537f04b7c671 100644 (file)
@@ -70,17 +70,20 @@ static int cls_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *ou
     return -EINVAL;
   }
 
-  cls_log_entry& entry = op.entry;
+  for (list<cls_log_entry>::iterator iter = op.entries.begin();
+       iter != op.entries.end(); ++iter) {
+    cls_log_entry& entry = *iter;
 
-  string index;
+    string index;
 
-  get_index(hctx, entry.timestamp, index);
+    get_index(hctx, entry.timestamp, index);
 
-  CLS_LOG(0, "storing entry at %s", index.c_str());
+    CLS_LOG(0, "storing entry at %s", index.c_str());
 
-  int ret = write_log_entry(hctx, index, entry);
-  if (ret < 0)
-    return ret;
+    int ret = write_log_entry(hctx, index, entry);
+    if (ret < 0)
+      return ret;
+  }
   
   return 0;
 }
index d1c199ba26384bd329871045ae221888177bd1f1..c551f40735876c74569878d663c2d645c1d35292 100644 (file)
@@ -9,25 +9,39 @@ using namespace librados;
 
 
 
+void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entries)
+{
+  bufferlist in;
+  cls_log_add_op call;
+  call.entries = entries;
+  ::encode(call, in);
+  op.exec("log", "add", in);
+}
+
 void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry)
 {
   bufferlist in;
   cls_log_add_op call;
-  call.entry = entry;
+  call.entries.push_back(entry);
   ::encode(call, in);
   op.exec("log", "add", in);
 }
 
-void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp,
+void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp,
                  const string& section, const string& name, bufferlist& bl)
 {
-  cls_log_entry entry;
-
   entry.timestamp = timestamp;
   entry.section = section;
   entry.name = name;
   entry.data = bl;
+}
+
+void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp,
+                 const string& section, const string& name, bufferlist& bl)
+{
+  cls_log_entry entry;
 
+  cls_log_add_prepare_entry(entry, timestamp, section, name, bl);
   cls_log_add(op, entry);
 }
 
index 6c0046b26f29f34b8dea9f50e5706151b0430e47..4171adbda113f311091f4a65055f0ec4edbbfb69 100644 (file)
@@ -9,6 +9,10 @@
  * log objclass
  */
 
+void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp,
+                 const string& section, const string& name, bufferlist& bl);
+
+void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entry);
 void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry);
 void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp,
                  const string& section, const string& name, bufferlist& bl);
index 6dc457ed5fd3acfa05bb2d38a31e70b50d86e1d6..6a5366f5a6607701cef6885004cda87d580b9ad5 100644 (file)
@@ -8,19 +8,19 @@
 #include "cls_log_types.h"
 
 struct cls_log_add_op {
-  cls_log_entry entry;
+  list<cls_log_entry> entries;
 
   cls_log_add_op() {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
-    ::encode(entry, bl);
+    ::encode(entries, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::iterator& bl) {
     DECODE_START(1, bl);
-    ::decode(entry, bl);
+    ::decode(entries, bl);
     DECODE_FINISH(bl);
   }
 };
index b256ceb93f46c0bdf349929f0e5d4d8869127150..df4ef93aa3c438639dac2e60d67edeb5c7f7cefc 100644 (file)
@@ -934,105 +934,6 @@ int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
 }
 
 
-#if 0
-
-class CompletionMap {
-  map<rgw_bucket, RefCountedCond *> entries;
-  Mutex lock;
-
-public:
-
-  void add(string& s) {
-    Mutex::Locker l(lock);
-
-    entries[s] = new RefCountedObject;
-  }
-
-
-  bool wait(string& s) {
-    map<string, RefCountedCond *>::iterator iter;
-    l.Lock();
-    iter = entries.find(s);
-    if (iter == entries.end()) {
-      l.Unlock();
-      return false;
-    }
-
-    RefCountedCond *rcc = iter->second;
-    rcc->get();
-    l.Unlock();
-
-    rcc->wait();
-    rcc->put();
-
-    return true;
-
-  }
-
-  void complete(string& s) {
-    lock.Lock();
-
-    map<string, RefCountedCond *>::iterator iter = entries.find(s);
-    if (iter == entries.end()) {
-      lock.Unlock();
-      return;
-    }
-
-    RefCountedCond *rcc = iter->second;
-
-    entries.erase(iter);
-
-    lock.Unlock();
-
-    rcc->complete();
-    rcc->put();
-  }
-
-};
-
-
-class RGWChangedBucketsTracker {
-  CephContext *cct;
-  RGWRados *store;
-
-  map<rgw_bucket, utime_t> last_reported;
-
-  struct PendingInfo : public RefCountedCond {
-    PendingInfo() {}
-  };
-
-  CompletionMap pending;
-
-  Mutex lock;
-public:
-  RGWChangedBucketsTracker(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWChanedBucketsTracker") {}
-
-  int report_bucket_changed(rgw_bucket& bucket) {
-    lock.Lock();
-
-    map<rgw_bucket, utime_t>::iteartor iter = last_reported.find(bucket);
-
-    bool exists = (iter != iter.end());
-    if (exists) {
-      utime_t& t = iter->second;
-      utime_t now = ceph_clock_now(cct);
-
-      if (now > t + get_resolution_sec())
-        exists = false;
-    }
-
-    lock.Unlock();
-
-    if (exists)
-      return true;
-  }
-
-  uint32_t get_resolution_sec();
-};
-
-
-#endif
-
 void rgw_data_change::dump(Formatter *f) const
 {
   string type;
@@ -1055,6 +956,44 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
     return (int)r;
 }
 
+int RGWDataChangesLog::renew_entries()
+{
+  map<int, list<cls_log_entry> > m;
+
+  map<string, rgw_bucket>::iterator iter;
+  string section;
+  utime_t ut = ceph_clock_now(cct);
+  for (iter = cur_cycle.begin(); iter != cur_cycle.end(); ++iter) {
+    rgw_bucket& bucket = iter->second;
+    int index = choose_oid(bucket);
+
+    cls_log_entry entry;
+
+    rgw_data_change change;
+    bufferlist bl;
+    change.entity_type = ENTITY_TYPE_BUCKET;
+    change.key = bucket.name;
+    ::encode(change, bl);
+
+    store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
+
+    m[index].push_back(entry);
+  }
+
+  map<int, list<cls_log_entry> >::iterator miter;
+  for (miter = m.begin(); miter != m.end(); ++miter) {
+    list<cls_log_entry>& entries = miter->second;
+
+    int ret = store->time_log_add(oids[miter->first], entries);
+    if (ret < 0) {
+      lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
+      return ret;
+    }
+  }
+
+  return 0;
+}
+
 int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
   lock.Lock();
 
@@ -1064,6 +1003,8 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
     changes.add(bucket.name, status);
   }
 
+  cur_cycle[bucket.name] = bucket;
+
   lock.Unlock();
 
   utime_t now = ceph_clock_now(cct);
@@ -1179,3 +1120,41 @@ int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int
 
   return 0;
 }
+
+bool RGWDataChangesLog::going_down()
+{
+  return (down_flag.read() != 0);
+}
+
+RGWDataChangesLog::~RGWDataChangesLog() {
+  down_flag.set(1);
+  renew_thread->stop();
+  renew_thread->join();
+  delete[] oids;
+}
+
+void *RGWDataChangesLog::ChangesRenewThread::entry() {
+  do {
+    dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
+    int r = log->renew_entries();
+    if (r < 0) {
+      dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
+    }
+
+    if (log->going_down())
+      break;
+
+    lock.Lock();
+    cond.WaitInterval(cct, lock, utime_t(20, 0));
+    lock.Unlock();
+  } while (!log->going_down());
+
+  return NULL;
+}
+
+void RGWDataChangesLog::ChangesRenewThread::stop()
+{
+  Mutex::Locker l(lock);
+  cond.Signal();
+}
+
index ff4992a84554e806a617418ef5f2b4ecf343d436..ecc4cb693a59aa9aac0ad9908ea8b872f812e721 100644 (file)
@@ -260,6 +260,8 @@ class RGWDataChangesLog {
 
   Mutex lock;
 
+  atomic_t down_flag;
+
   struct ChangeStatus {
     utime_t cur_expiration;
     utime_t cur_sent;
@@ -280,6 +282,22 @@ class RGWDataChangesLog {
 
   lru_map<string, ChangeStatusPtr> changes;
 
+  map<string, rgw_bucket> cur_cycle;
+
+  class ChangesRenewThread : public Thread {
+    CephContext *cct;
+    RGWDataChangesLog *log;
+    Mutex lock;
+    Cond cond;
+
+  public:
+    ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread") {}
+    void *entry();
+    void stop();
+  };
+
+  ChangesRenewThread *renew_thread;
+
 public:
 
   RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog"),
@@ -294,14 +312,16 @@ public:
       snprintf(buf, sizeof(buf), "%s.%d", prefix, i);
       oids[i] = buf;
     }
-  }
 
-  ~RGWDataChangesLog() {
-    delete[] oids;
+    renew_thread = new ChangesRenewThread(cct, this);
+    renew_thread->create();
   }
 
+  ~RGWDataChangesLog();
+
   int choose_oid(rgw_bucket& bucket);
   int add_entry(rgw_bucket& bucket);
+  int renew_entries();
   int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
                list<rgw_data_change>& entries, string& marker, bool *truncated);
 
@@ -313,6 +333,8 @@ public:
   };
   int list_entries(utime_t& start_time, utime_t& end_time, int max_entries,
                list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated);
+
+  bool going_down();
 };
 
 
index 419b1fbac648b0d208d8eb3d7f040d9b3fd505cf..0b818cacd5c1bbc7a2b3c5b7c9c93c7a3d6ea9b4 100644 (file)
@@ -1096,6 +1096,11 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, string& sec
   name = prefix + buf;
 }
 
+void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl)
+{
+  cls_log_add_prepare_entry(entry, ut, section, key, bl);
+}
+
 int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl)
 {
   librados::IoCtx io_ctx;
@@ -1121,6 +1126,31 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section
   return r;
 }
 
+int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries)
+{
+  librados::IoCtx io_ctx;
+
+  const char *log_pool = zone.log_pool.name.c_str();
+  int r = rados->ioctx_create(log_pool, io_ctx);
+  if (r == -ENOENT) {
+    rgw_bucket pool(log_pool);
+    r = create_pool(pool);
+    if (r < 0)
+      return r;
+    // retry
+    r = rados->ioctx_create(log_pool, io_ctx);
+  }
+  if (r < 0)
+    return r;
+
+  ObjectWriteOperation op;
+  cls_log_add(op, entries);
+
+  r = io_ctx.operate(oid, &op);
+  return r;
+}
+
 int RGWRados::time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
                             int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated)
 {
index f3d095f92d400f8f459f747325c040970f752b96..5d59f9a2a7b65ea4a84a66d3ea3cdce7bfcfe9a1 100644 (file)
@@ -987,6 +987,8 @@ public:
 
   void shard_name(const string& prefix, unsigned max_shards, string& key, string& name);
   void shard_name(const string& prefix, unsigned max_shards, string& section, string& key, string& name);
+  void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl);
+  int time_log_add(const string& oid, list<cls_log_entry>& entries);
   int time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl);
   int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
                     int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated);