]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data changes log, log info by bucket shard id
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 2 Dec 2014 00:22:32 +0000 (16:22 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:25 +0000 (19:21 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_common.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 1242f546229c69d9afda772bafe25e99ffedae9f..3b178dac43c7eb2a25fb86e187c3a25735ed6025 100644 (file)
@@ -1102,9 +1102,9 @@ void rgw_data_change::dump(Formatter *f) const
 }
 
 
-int RGWDataChangesLog::choose_oid(rgw_bucket& bucket, int shard_id) {
-    string& name = bucket.name;
-    int shard_shift = (shard_id > 0 ? shard_id : 0);
+int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
+    const string& name = bs.bucket.name;
+    int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
     uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
 
     return (int)r;
@@ -1117,19 +1117,22 @@ int RGWDataChangesLog::renew_entries()
 
   /* we can't keep the bucket name as part of the cls_log_entry, and we need
    * it later, so we keep two lists under the map */
-  map<int, pair<list<string>, list<cls_log_entry> > > m;
+  map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
 
   lock.Lock();
-  map<string, rgw_bucket> entries;
+  map<rgw_bucket_shard, bool> entries;
   entries.swap(cur_cycle);
   lock.Unlock();
 
-  map<string, rgw_bucket>::iterator iter;
+  map<rgw_bucket_shard, bool>::iterator iter;
   string section;
   utime_t ut = ceph_clock_now(cct);
   for (iter = entries.begin(); iter != entries.end(); ++iter) {
-    rgw_bucket& bucket = iter->second;
-    int index = choose_oid(bucket);
+    const rgw_bucket_shard& bs = iter->first;
+    const rgw_bucket& bucket = bs.bucket;
+    int shard_id = bs.shard_id;
+
+    int index = choose_oid(bs);
 
     cls_log_entry entry;
 
@@ -1137,16 +1140,21 @@ int RGWDataChangesLog::renew_entries()
     bufferlist bl;
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bucket.name + ":" + bucket.bucket_id;
+    if (shard_id >= 0) {
+      char buf[16];
+      snprintf(buf, sizeof(buf), ":%d", shard_id);
+      change.key += buf;
+    }
     change.timestamp = ut;
     ::encode(change, bl);
 
     store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
 
-    m[index].first.push_back(bucket.name);
+    m[index].first.push_back(bs);
     m[index].second.push_back(entry);
   }
 
-  map<int, pair<list<string>, list<cls_log_entry> > >::iterator miter;
+  map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
   for (miter = m.begin(); miter != m.end(); ++miter) {
     list<cls_log_entry>& entries = miter->second.second;
 
@@ -1163,8 +1171,8 @@ int RGWDataChangesLog::renew_entries()
     utime_t expiration = now;
     expiration += utime_t(cct->_conf->rgw_data_log_window, 0);
 
-    list<string>& buckets = miter->second.first;
-    list<string>::iterator liter;
+    list<rgw_bucket_shard>& buckets = miter->second.first;
+    list<rgw_bucket_shard>::iterator liter;
     for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
       update_renewed(*liter, expiration);
     }
@@ -1173,28 +1181,28 @@ int RGWDataChangesLog::renew_entries()
   return 0;
 }
 
-void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status)
+void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
 {
   assert(lock.is_locked());
-  if (!changes.find(bucket_name, status)) {
+  if (!changes.find(bs, status)) {
     status = ChangeStatusPtr(new ChangeStatus);
-    changes.add(bucket_name, status);
+    changes.add(bs, status);
   }
 }
 
-void RGWDataChangesLog::register_renew(rgw_bucket& bucket)
+void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
 {
   Mutex::Locker l(lock);
-  cur_cycle[bucket.name] = bucket;
+  cur_cycle[bs] = true;
 }
 
-void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration)
+void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, utime_t& expiration)
 {
   Mutex::Locker l(lock);
   ChangeStatusPtr status;
-  _get_change(bucket_name, status);
+  _get_change(bs, status);
 
-  ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bucket_name << " expiration=" << expiration << dendl;
+  ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
   status->cur_expiration = expiration;
 }
 
@@ -1202,10 +1210,12 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
   if (!store->need_to_log_data())
     return 0;
 
+  rgw_bucket_shard bs(bucket, shard_id);
+
   lock.Lock();
 
   ChangeStatusPtr status;
-  _get_change(bucket.name, status);
+  _get_change(bs, status);
 
   lock.Unlock();
 
@@ -1213,13 +1223,13 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
 
   status->lock->Lock();
 
-  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
+  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
 
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
     status->lock->Unlock();
 
-    register_renew(bucket);
+    register_renew(bs);
     return 0;
   }
 
@@ -1236,7 +1246,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
     int ret = cond->wait();
     cond->put();
     if (!ret) {
-      register_renew(bucket);
+      register_renew(bs);
     }
     return ret;
   }
@@ -1244,7 +1254,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
   status->cond = new RefCountedCond;
   status->pending = true;
 
-  string& oid = oids[choose_oid(bucket, shard_id)];
+  string& oid = oids[choose_oid(bs)];
   utime_t expiration;
 
   int ret;
@@ -1261,6 +1271,11 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
     rgw_data_change change;
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bucket.name + ":" + bucket.bucket_id;
+    if (shard_id >= 0) {
+      char buf[16];
+      snprintf(buf, sizeof(buf), ":%d", shard_id);
+      change.key += buf;
+    }
     change.timestamp = now;
     ::encode(change, bl);
     string section;
index 03215f1a4cf501121d72fbfc02207f60eb6a719f..d0c2f4b184938a9cd7cebfddd5d31e51e3d18e40 100644 (file)
@@ -316,13 +316,13 @@ class RGWDataChangesLog {
 
   typedef ceph::shared_ptr<ChangeStatus> ChangeStatusPtr;
 
-  lru_map<string, ChangeStatusPtr> changes;
+  lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
 
-  map<string, rgw_bucket> cur_cycle;
+  map<rgw_bucket_shard, bool> cur_cycle;
 
-  void _get_change(string& bucket_name, ChangeStatusPtr& status);
-  void register_renew(rgw_bucket& bucket);
-  void update_renewed(string& bucket_name, utime_t& expiration);
+  void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
+  void register_renew(rgw_bucket_shard& bs);
+  void update_renewed(rgw_bucket_shard& bs, utime_t& expiration);
 
   class ChangesRenewThread : public Thread {
     CephContext *cct;
@@ -364,7 +364,7 @@ public:
 
   ~RGWDataChangesLog();
 
-  int choose_oid(rgw_bucket& bucket);
+  int choose_oid(const rgw_bucket_shard& bs);
   int add_entry(rgw_bucket& bucket, int shard_id);
   int renew_entries();
   int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
index 22af8cbd553aa382b5088c6f0b0de13bc8ba172e..463c1d4d4b9550f3144f4e8fcb3959edadf984bc 100644 (file)
@@ -681,6 +681,25 @@ inline ostream& operator<<(ostream& out, const rgw_bucket &b) {
   return out;
 }
 
+struct rgw_bucket_shard {
+  rgw_bucket bucket;
+  int shard_id;
+
+  rgw_bucket_shard() : shard_id(-1) {}
+  rgw_bucket_shard(rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {}
+
+  bool operator<(const rgw_bucket_shard& b) const {
+    if (bucket < b.bucket) {
+      return true;
+    }
+    if (b.bucket < bucket) {
+      return false;
+    }
+    return shard_id < b.shard_id;
+  }
+};
+
+
 struct RGWObjVersionTracker {
   obj_version read_version;
   obj_version write_version;
index 7db1e9d0be34ff22e1a1976422871e1333ed8b83..149e7347de7215a04b0dd66095b358eebe802020 100644 (file)
@@ -2007,7 +2007,7 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, const strin
   name = prefix + buf;
 }
 
-void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl)
+void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl)
 {
   cls_log_add_prepare_entry(entry, ut, section, key, bl);
 }
index 66ef2dbfd9b1c144ed0831f2887b6f924fa20203..1873ed47e5834f74a5401d366c21c605e3fae937 100644 (file)
@@ -1895,7 +1895,7 @@ public:
 
   void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name);
   void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
-  void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl);
+  void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
   int time_log_add(const string& oid, list<cls_log_entry>& entries);
   int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
   int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,