From: Yehuda Sadeh Date: Tue, 2 Dec 2014 00:22:32 +0000 (-0800) Subject: rgw: data changes log, log info by bucket shard id X-Git-Tag: v0.92~12^2~23 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a33ca59e26b389e8232756971867cfec11aa1219;p=ceph.git rgw: data changes log, log info by bucket shard id Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 1242f546229c..3b178dac43c7 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1102,9 +1102,9 @@ void rgw_data_change::dump(Formatter *f) const } -int RGWDataChangesLog::choose_oid(rgw_bucket& bucket, int shard_id) { - string& name = bucket.name; - int shard_shift = (shard_id > 0 ? shard_id : 0); +int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { + const string& name = bs.bucket.name; + int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0); uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards; return (int)r; @@ -1117,19 +1117,22 @@ int RGWDataChangesLog::renew_entries() /* we can't keep the bucket name as part of the cls_log_entry, and we need * it later, so we keep two lists under the map */ - map, list > > m; + map, list > > m; lock.Lock(); - map entries; + map entries; entries.swap(cur_cycle); lock.Unlock(); - map::iterator iter; + map::iterator iter; string section; utime_t ut = ceph_clock_now(cct); for (iter = entries.begin(); iter != entries.end(); ++iter) { - rgw_bucket& bucket = iter->second; - int index = choose_oid(bucket); + const rgw_bucket_shard& bs = iter->first; + const rgw_bucket& bucket = bs.bucket; + int shard_id = bs.shard_id; + + int index = choose_oid(bs); cls_log_entry entry; @@ -1137,16 +1140,21 @@ int RGWDataChangesLog::renew_entries() bufferlist bl; change.entity_type = ENTITY_TYPE_BUCKET; change.key = bucket.name + ":" + bucket.bucket_id; + if (shard_id >= 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ":%d", shard_id); + change.key += buf; + } change.timestamp = ut; ::encode(change, bl); store->time_log_prepare_entry(entry, ut, section, bucket.name, bl); - m[index].first.push_back(bucket.name); + m[index].first.push_back(bs); m[index].second.push_back(entry); } - map, list > >::iterator miter; + map, list > >::iterator miter; for (miter = m.begin(); miter != m.end(); ++miter) { list& entries = miter->second.second; @@ -1163,8 +1171,8 @@ int RGWDataChangesLog::renew_entries() utime_t expiration = now; expiration += utime_t(cct->_conf->rgw_data_log_window, 0); - list& buckets = miter->second.first; - list::iterator liter; + list& buckets = miter->second.first; + list::iterator liter; for (liter = buckets.begin(); liter != buckets.end(); ++liter) { update_renewed(*liter, expiration); } @@ -1173,28 +1181,28 @@ int RGWDataChangesLog::renew_entries() return 0; } -void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status) +void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status) { assert(lock.is_locked()); - if (!changes.find(bucket_name, status)) { + if (!changes.find(bs, status)) { status = ChangeStatusPtr(new ChangeStatus); - changes.add(bucket_name, status); + changes.add(bs, status); } } -void RGWDataChangesLog::register_renew(rgw_bucket& bucket) +void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs) { Mutex::Locker l(lock); - cur_cycle[bucket.name] = bucket; + cur_cycle[bs] = true; } -void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration) +void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, utime_t& expiration) { Mutex::Locker l(lock); ChangeStatusPtr status; - _get_change(bucket_name, status); + _get_change(bs, status); - ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bucket_name << " expiration=" << expiration << dendl; + ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl; status->cur_expiration = expiration; } @@ -1202,10 +1210,12 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { if (!store->need_to_log_data()) return 0; + rgw_bucket_shard bs(bucket, shard_id); + lock.Lock(); ChangeStatusPtr status; - _get_change(bucket.name, status); + _get_change(bs, status); lock.Unlock(); @@ -1213,13 +1223,13 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { status->lock->Lock(); - ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; + ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; if (now < status->cur_expiration) { /* no need to send, recently completed */ status->lock->Unlock(); - register_renew(bucket); + register_renew(bs); return 0; } @@ -1236,7 +1246,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { int ret = cond->wait(); cond->put(); if (!ret) { - register_renew(bucket); + register_renew(bs); } return ret; } @@ -1244,7 +1254,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { status->cond = new RefCountedCond; status->pending = true; - string& oid = oids[choose_oid(bucket, shard_id)]; + string& oid = oids[choose_oid(bs)]; utime_t expiration; int ret; @@ -1261,6 +1271,11 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { rgw_data_change change; change.entity_type = ENTITY_TYPE_BUCKET; change.key = bucket.name + ":" + bucket.bucket_id; + if (shard_id >= 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ":%d", shard_id); + change.key += buf; + } change.timestamp = now; ::encode(change, bl); string section; diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 03215f1a4cf5..d0c2f4b18493 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -316,13 +316,13 @@ class RGWDataChangesLog { typedef ceph::shared_ptr ChangeStatusPtr; - lru_map changes; + lru_map changes; - map cur_cycle; + map cur_cycle; - void _get_change(string& bucket_name, ChangeStatusPtr& status); - void register_renew(rgw_bucket& bucket); - void update_renewed(string& bucket_name, utime_t& expiration); + void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status); + void register_renew(rgw_bucket_shard& bs); + void update_renewed(rgw_bucket_shard& bs, utime_t& expiration); class ChangesRenewThread : public Thread { CephContext *cct; @@ -364,7 +364,7 @@ public: ~RGWDataChangesLog(); - int choose_oid(rgw_bucket& bucket); + int choose_oid(const rgw_bucket_shard& bs); int add_entry(rgw_bucket& bucket, int shard_id); int renew_entries(); int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 22af8cbd553a..463c1d4d4b95 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -681,6 +681,25 @@ inline ostream& operator<<(ostream& out, const rgw_bucket &b) { return out; } +struct rgw_bucket_shard { + rgw_bucket bucket; + int shard_id; + + rgw_bucket_shard() : shard_id(-1) {} + rgw_bucket_shard(rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {} + + bool operator<(const rgw_bucket_shard& b) const { + if (bucket < b.bucket) { + return true; + } + if (b.bucket < bucket) { + return false; + } + return shard_id < b.shard_id; + } +}; + + struct RGWObjVersionTracker { obj_version read_version; obj_version write_version; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 7db1e9d0be34..149e7347de72 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2007,7 +2007,7 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, const strin name = prefix + buf; } -void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl) +void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl) { cls_log_add_prepare_entry(entry, ut, section, key, bl); } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 66ef2dbfd9b1..1873ed47e583 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1895,7 +1895,7 @@ public: void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name); void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name); - void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl); + void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl); int time_log_add(const string& oid, list& entries); int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl); int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,