}
-int RGWDataChangesLog::choose_oid(rgw_bucket& bucket, int shard_id) {
- string& name = bucket.name;
- int shard_shift = (shard_id > 0 ? shard_id : 0);
+/* Map a bucket shard to one of the num_shards data-log objects: hash of the
+ * bucket name, offset by a non-negative shard shift, mod num_shards, so all
+ * entries for a given bucket shard land in the same log object.
+ * NOTE(review): shard_id == 0 and shard_id < 0 (unsharded bucket) both yield
+ * shift 0 — presumably intentional so they hash identically; confirm. */
+int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
+ const string& name = bs.bucket.name;
+ int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
return (int)r;
/* we can't keep the bucket name as part of the cls_log_entry, and we need
* it later, so we keep two lists under the map */
+/* NOTE(review): `change` and `now` are declared in diff context not shown in
+ * this hunk — verify against the full function. */
+/* m: data-log shard index -> (bucket shards to renew, prepared log entries) */
- map<int, pair<list<string>, list<cls_log_entry> > > m;
+ map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
lock.Lock();
- map<string, rgw_bucket> entries;
+ map<rgw_bucket_shard, bool> entries;
+ /* grab (and clear) the set of bucket shards touched this cycle under lock */
entries.swap(cur_cycle);
lock.Unlock();
- map<string, rgw_bucket>::iterator iter;
+ map<rgw_bucket_shard, bool>::iterator iter;
string section;
utime_t ut = ceph_clock_now(cct);
for (iter = entries.begin(); iter != entries.end(); ++iter) {
- rgw_bucket& bucket = iter->second;
- int index = choose_oid(bucket);
+ /* the key IS the bucket shard now; the mapped bool is a dummy value */
+ const rgw_bucket_shard& bs = iter->first;
+ const rgw_bucket& bucket = bs.bucket;
+ int shard_id = bs.shard_id;
+
+ int index = choose_oid(bs);
cls_log_entry entry;
bufferlist bl;
change.entity_type = ENTITY_TYPE_BUCKET;
change.key = bucket.name + ":" + bucket.bucket_id;
+ /* sharded buckets append ":<shard_id>"; must stay consistent with the
+ * key built in add_entry() */
+ if (shard_id >= 0) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), ":%d", shard_id);
+ change.key += buf;
+ }
change.timestamp = ut;
::encode(change, bl);
store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
m[index].first.push_back(bs);
+ m[index].first.push_back(bs);
m[index].second.push_back(entry);
}
- map<int, pair<list<string>, list<cls_log_entry> > >::iterator miter;
+ map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
for (miter = m.begin(); miter != m.end(); ++miter) {
list<cls_log_entry>& entries = miter->second.second;
utime_t expiration = now;
expiration += utime_t(cct->_conf->rgw_data_log_window, 0);
- list<string>& buckets = miter->second.first;
- list<string>::iterator liter;
+ list<rgw_bucket_shard>& buckets = miter->second.first;
+ list<rgw_bucket_shard>::iterator liter;
+ /* push the cached expiration forward for every shard we just re-logged */
for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
update_renewed(*liter, expiration);
}
return 0;
}
-void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status)
+/* Look up — or lazily create and cache — the ChangeStatus entry tracking this
+ * bucket shard in the `changes` LRU map.  Caller must hold `lock` (asserted). */
+void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
{
assert(lock.is_locked());
- if (!changes.find(bucket_name, status)) {
+ if (!changes.find(bs, status)) {
status = ChangeStatusPtr(new ChangeStatus);
- changes.add(bucket_name, status);
+ changes.add(bs, status);
}
}
-void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
+/* Record that this bucket shard saw activity during the current renew cycle;
+ * renew_entries() drains cur_cycle.  The bool value is a dummy — the map is
+ * used as a set keyed by rgw_bucket_shard. */
+void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
{
Mutex::Locker l(lock);
- cur_cycle[bucket.name] = bucket;
+ cur_cycle[bs] = true;
}
-void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration)
+/* Push out the cached expiration on the ChangeStatus for this bucket shard
+ * after its data-log entry has been rewritten (see renew_entries()). */
+void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, utime_t& expiration)
{
Mutex::Locker l(lock);
ChangeStatusPtr status;
- _get_change(bucket_name, status);
+ _get_change(bs, status);
- ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bucket_name << " expiration=" << expiration << dendl;
+ /* fix the long-standing typo in the log message while touching this line:
+ * "update_renewd" -> "update_renewed" */
+ ldout(cct, 20) << "RGWDataChangesLog::update_renewed() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
status->cur_expiration = expiration;
}
+/* early-out: nothing to log when data logging is disabled for this store */
if (!store->need_to_log_data())
return 0;
+ /* the (bucket, shard) pair is the unit of tracking from here on */
+ rgw_bucket_shard bs(bucket, shard_id);
+
lock.Lock();
ChangeStatusPtr status;
- _get_change(bucket.name, status);
+ _get_change(bs, status);
lock.Unlock();
status->lock->Lock();
- ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
+ ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
if (now < status->cur_expiration) {
/* no need to send, recently completed */
status->lock->Unlock();
- register_renew(bucket);
+ register_renew(bs);
return 0;
}
+ /* NOTE(review): diff context omitted above — `cond` presumably comes from a
+ * concurrent writer's status->cond; verify against the full function */
int ret = cond->wait();
cond->put();
if (!ret) {
- register_renew(bucket);
+ register_renew(bs);
}
return ret;
}
+ /* we are the writer now: publish a cond others can wait on */
status->cond = new RefCountedCond;
status->pending = true;
- string& oid = oids[choose_oid(bucket, shard_id)];
+ string& oid = oids[choose_oid(bs)];
utime_t expiration;
int ret;
rgw_data_change change;
change.entity_type = ENTITY_TYPE_BUCKET;
change.key = bucket.name + ":" + bucket.bucket_id;
+ /* sharded buckets append ":<shard_id>"; must stay consistent with the key
+ * built in renew_entries() */
+ if (shard_id >= 0) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), ":%d", shard_id);
+ change.key += buf;
+ }
change.timestamp = now;
::encode(change, bl);
string section;
typedef ceph::shared_ptr<ChangeStatus> ChangeStatusPtr;
+ /* LRU cache of per-bucket-shard change status (pending/expiration state) */
- lru_map<string, ChangeStatusPtr> changes;
+ lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
+ /* bucket shards touched since the last renew pass; the bool is a dummy —
+ * NOTE(review): a set<rgw_bucket_shard> would express this more directly */
- map<string, rgw_bucket> cur_cycle;
+ map<rgw_bucket_shard, bool> cur_cycle;
- void _get_change(string& bucket_name, ChangeStatusPtr& status);
- void register_renew(rgw_bucket& bucket);
- void update_renewed(string& bucket_name, utime_t& expiration);
+ void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
+ void register_renew(rgw_bucket_shard& bs);
+ void update_renewed(rgw_bucket_shard& bs, utime_t& expiration);
class ChangesRenewThread : public Thread {
CephContext *cct;
~RGWDataChangesLog();
- int choose_oid(rgw_bucket& bucket);
+ int choose_oid(const rgw_bucket_shard& bs);
int add_entry(rgw_bucket& bucket, int shard_id);
int renew_entries();
int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,