{
time_t start_date = 0;
std::string marker;
+ time_t shard_rollover_date = 0;
cls_rgw_lc_obj_head() {}
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 2, bl);
uint64_t t = start_date;
encode(t, bl);
encode(marker, bl);
+ encode(shard_rollover_date, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
uint64_t t;
decode(t, bl);
start_date = static_cast<time_t>(t);
decode(marker, bl);
+ if (struct_v < 2) {
+ shard_rollover_date = 0;
+ } else {
+ decode(t, bl);
+ shard_rollover_date = static_cast<time_t>(t);
+ }
DECODE_FINISH(bl);
}
return ret;
} /* RGWLC::process_bucket */
+static inline bool allow_shard_rollover(CephContext* cct, time_t now, time_t shard_rollover_date)
+{
+ /* return true iff:
+ * - non-debug scheduling is in effect, and
+ * - the current shard has not rolled over in the last 24 hours
+ */
+ if (((shard_rollover_date < now) &&
+ (now - shard_rollover_date > 24*60*60)) ||
+ (! shard_rollover_date /* no rollover date stored */) ||
+ (cct->_conf->rgw_lc_debug_interval > 0 /* defaults to -1 == disabled */)) {
+ return true;
+ }
+ return false;
+}
+
int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
bool once = false)
{
/* if there is nothing at head, try to reinitialize head.marker with the
* first entry in the queue */
- if (head.marker.empty()) {
+ if (head.marker.empty() &&
+ allow_shard_rollover(cct, now, head.shard_rollover_date) /* prevent multiple passes by diff.
+ * rgws,in same cycle */) {
+
+ ldpp_dout(this, 5) << "RGWLC::process() process shard rollover lc_shard=" << lc_shard
+ << " head.marker=" << head.marker
+ << " head.shard_rollover_date=" << head.shard_rollover_date
+ << dendl;
+
vector<rgw::sal::Lifecycle::LCEntry> entries;
int ret = sal_lc->list_entries(lc_shard, head.marker, 1, entries);
if (ret < 0) {
entry = entries.front();
head.marker = entry.bucket;
head.start_date = now;
+ head.shard_rollover_date = 0;
}
} else {
ldpp_dout(this, 0) << "RGWLC::process() head.marker !empty() at START for shard=="
}
}
} else {
- ldpp_dout(this, 0) << "RGWLC::process() entry.bucket.empty() == true at START 1"
- << " (this is impossible, but stop now)"
+ ldpp_dout(this, 5) << "RGWLC::process() entry.bucket.empty() == true at START 1"
+ << " (this is possible mainly before any lc policy has been stored"
+ << " or after removal of an lc_shard object)"
<< dendl;
goto exit;
}
"RGWLC::process() cycle finished lc_shard="
<< lc_shard
<< dendl;
+ head.shard_rollover_date = ceph_clock_now();
ret = sal_lc->put_head(lc_shard, head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
head.marker = cls_head.marker;
head.start_date = cls_head.start_date;
+ head.shard_rollover_date = cls_head.shard_rollover_date;
return ret;
}
cls_head.marker = head.marker;
cls_head.start_date = head.start_date;
+ cls_head.shard_rollover_date = head.shard_rollover_date;
return cls_rgw_lc_put_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head);
}