class RGWGCIOManager {
CephContext *cct;
RGWGC *gc;
- int index;
struct IO {
librados::AioCompletion *c{nullptr};
string oid;
+ int index{-1};
string tag;
};
list<IO> ios;
- std::list<string> remove_tags;
+ map<int, std::list<string> > remove_tags;
public:
- RGWGCIOManager(CephContext *_cct, RGWGC *_gc, int _index) : cct(_cct),
- gc(_gc),
- index(_index) {}
+ RGWGCIOManager(CephContext *_cct, RGWGC *_gc) : cct(_cct),
+ gc(_gc) {}
~RGWGCIOManager() {
for (auto io : ios) {
io.c->release();
}
}
- int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op, const string& tag) {
+ int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op, int index, const string& tag) {
#warning configurable
#define MAX_CONCURRENT_IO 5
while (ios.size() > MAX_CONCURRENT_IO) {
if (ret < 0) {
return ret;
}
- ios.push_back(IO{c, oid, tag});
+ ios.push_back(IO{c, oid, index, tag});
return 0;
}
int ret = io.c->get_return_value();
io.c->release();
+ auto& rt = remove_tags[io.index];
+
if (ret == -ENOENT) {
ret = 0;
}
goto done;
}
- remove_tags.push_back(io.tag);
+ rt.push_back(io.tag);
#define MAX_REMOVE_CHUNK 16
- if (remove_tags.size() > MAX_REMOVE_CHUNK) {
- drain_remove_tags();
+ if (rt.size() > MAX_REMOVE_CHUNK) {
+ drain_remove_tags(io.index, rt);
}
done:
ios.pop_front();
drain_remove_tags();
}
+ void drain_remove_tags(int index, list<string>& rt) {
+ gc->remove(index, rt);
+ rt.clear();
+ }
+
void drain_remove_tags() {
- gc->remove(index, remove_tags);
- remove_tags.clear();
+ for (auto iter : remove_tags) {
+ drain_remove_tags(iter.first, iter.second);
+ }
}
};
-int RGWGC::process(int index, int max_secs, bool expired_only)
+int RGWGC::process(int index, int max_secs, bool expired_only,
+ RGWGCIOManager& io_manager)
{
rados::cls::lock::Lock l(gc_index_lock_name);
utime_t end = ceph_clock_now();
- RGWGCIOManager io_manager(store->ctx(), this, index);
-
/* max_secs should be greater than zero. We don't want a zero max_secs
* to be translated as no timeout, since we'd then need to break the
* lock and that would require a manual intervention. In this case
ObjectWriteOperation op;
cls_refcount_put(op, info.tag, true);
- ret = io_manager.schedule_io(ctx, oid, &op, info.tag);
+ ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
if (ret < 0) {
ldout(store->ctx(), 0) << "WARNING: failed to schedule deletion for oid=" << oid << dendl;
}
}
} while (truncated);
- io_manager.drain();
-
done:
/* we don't drain here, because if we're going down we don't want to hold the system
* if backend is unresponsive
if (ret < 0)
return ret;
+ RGWGCIOManager io_manager(store->ctx(), this);
+
for (int i = 0; i < max_objs; i++) {
int index = (i + start) % max_objs;
- int ret = process(index, max_secs, expired_only);
+ int ret = process(index, max_secs, expired_only, io_manager);
if (ret < 0)
return ret;
}
+ if (!going_down()) {
+ io_manager.drain();
+ }
return 0;
}