return 0;
}
+class RGWGCIOManager {
+ CephContext *cct;
+ RGWGC *gc;
+ int index;
+
+ struct IO {
+ librados::AioCompletion *c{nullptr};
+ string oid;
+ string tag;
+ };
+
+ list<IO> ios;
+ std::list<string> remove_tags;
+
+public:
+ RGWGCIOManager(CephContext *_cct, RGWGC *_gc, int _index) : cct(_cct),
+ gc(_gc),
+ index(_index) {}
+ ~RGWGCIOManager() {
+ for (auto io : ios) {
+ io.c->release();
+ }
+ }
+
+ int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op, const string& tag) {
+#warning configurable
+#define MAX_CONCURRENT_IO 5
+ while (ios.size() > MAX_CONCURRENT_IO) {
+ if (gc->going_down()) {
+ return 0;
+ }
+ handle_next_completion();
+ }
+
+ AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ int ret = ioctx->aio_operate(oid, c, op);
+ if (ret < 0) {
+ return ret;
+ }
+ ios.push_back(IO{c, oid, tag});
+
+ return 0;
+ }
+
+ void handle_next_completion() {
+ assert(!ios.empty());
+ IO& io = ios.front();
+ io.c->wait_for_safe();
+ int ret = io.c->get_return_value();
+ io.c->release();
+
+ if (ret == -ENOENT) {
+ ret = 0;
+ }
+ if (ret < 0) {
+ ldout(cct, 0) << "WARNING: could not remove oid=" << io.oid << ", ret=" << ret << dendl;
+ goto done;
+ }
+
+ remove_tags.push_back(io.tag);
+#define MAX_REMOVE_CHUNK 16
+ if (remove_tags.size() > MAX_REMOVE_CHUNK) {
+ drain_remove_tags();
+ }
+done:
+ ios.pop_front();
+ }
+
+ void drain() {
+ while (!ios.empty()) {
+ if (gc->going_down()) {
+ return;
+ }
+ handle_next_completion();
+ }
+
+ drain_remove_tags();
+ }
+
+ void drain_remove_tags() {
+ gc->remove(index, remove_tags);
+ remove_tags.clear();
+ }
+};
+
int RGWGC::process(int index, int max_secs, bool expired_only)
{
rados::cls::lock::Lock l(gc_index_lock_name);
utime_t end = ceph_clock_now();
- std::list<string> remove_tags;
+
+ RGWGCIOManager io_manager(store->ctx(), this, index);
/* max_secs should be greater than zero. We don't want a zero max_secs
* to be translated as no timeout, since we'd then need to break the
string last_pool;
std::list<cls_rgw_gc_obj_info>::iterator iter;
for (iter = entries.begin(); iter != entries.end(); ++iter) {
- bool remove_tag;
cls_rgw_gc_obj_info& info = *iter;
std::list<cls_rgw_obj>::iterator liter;
cls_rgw_obj_chain& chain = info.chain;
if (now >= end)
goto done;
- remove_tag = true;
for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
cls_rgw_obj& obj = *liter;
dout(5) << "gc::process: removing " << obj.pool << ":" << obj.key.name << dendl;
ObjectWriteOperation op;
cls_refcount_put(op, info.tag, true);
- ret = ctx->operate(oid, &op);
- if (ret == -ENOENT)
- ret = 0;
+
+ ret = io_manager.schedule_io(ctx, oid, &op, info.tag);
if (ret < 0) {
- remove_tag = false;
- dout(0) << "failed to remove " << obj.pool << ":" << oid << "@" << obj.loc << dendl;
+ ldout(store->ctx(), 0) << "WARNING: failed to schedule deletion for oid=" << oid << dendl;
}
if (going_down()) // leave early, even if tag isn't removed, it's ok
goto done;
}
- if (remove_tag) {
- remove_tags.push_back(info.tag);
-#define MAX_REMOVE_CHUNK 16
- if (remove_tags.size() > MAX_REMOVE_CHUNK) {
- RGWGC::remove(index, remove_tags);
- remove_tags.clear();
- }
- }
- }
- if (!remove_tags.empty()) {
- RGWGC::remove(index, remove_tags);
- remove_tags.clear();
}
} while (truncated);
+ io_manager.drain();
+
done:
- if (!remove_tags.empty())
- RGWGC::remove(index, remove_tags);
+ /* we don't drain here, because if we're going down we don't want to hold the system
+ * if backend is unresponsive
+ */
l.unlock(&store->gc_pool_ctx, obj_names[index]);
delete ctx;
return 0;