return ret;
}
}
+
+ // calculate time and time key
info.time = ceph::real_clock::now();
info.time += make_timespan(expiration_secs);
+ string time_key;
+ get_time_key(info.time, &time_key);
+
+ if (info.chain.objs.empty()) {
+ CLS_LOG(0,
+ "WARNING: %s setting GC log entry with zero-length chain, "
+ "tag='%s', timekey='%s'",
+ __func__, info.tag.c_str(), time_key.c_str());
+ }
+
ret = gc_omap_set(hctx, GC_OBJ_NAME_INDEX, info.tag, &info);
if (ret < 0)
return ret;
- string key;
- get_time_key(info.time, &key);
- ret = gc_omap_set(hctx, GC_OBJ_TIME_INDEX, key, &info);
+ ret = gc_omap_set(hctx, GC_OBJ_TIME_INDEX, time_key, &info);
if (ret < 0)
goto done_err;
return 0;
done_err:
- CLS_LOG(0, "ERROR: gc_set_entry error info.tag=%s, ret=%d\n", info.tag.c_str(), ret);
+
+ CLS_LOG(0, "ERROR: gc_set_entry error info.tag=%s, ret=%d\n",
+ info.tag.c_str(), ret);
gc_omap_remove(hctx, GC_OBJ_NAME_INDEX, info.tag);
+
return ret;
}
return gc_defer_entry(hctx, op.tag, op.expiration_secs);
}
-static int gc_iterate_entries(cls_method_context_t hctx, const string& marker, bool expired_only,
- string& key_iter, uint32_t max_entries, bool *truncated,
- int (*cb)(cls_method_context_t, const string&, cls_rgw_gc_obj_info&, void *),
+static int gc_iterate_entries(cls_method_context_t hctx,
+ const string& marker,
+ bool expired_only,
+ string& out_marker,
+ uint32_t max_entries,
+ bool *truncated,
+ int (*cb)(cls_method_context_t,
+ const string&,
+ cls_rgw_gc_obj_info&,
+ void *),
void *param)
{
- CLS_LOG(10, "gc_iterate_range");
+ CLS_LOG(10, "gc_iterate_entries");
map<string, bufferlist> keys;
string filter_prefix, end_key;
- uint32_t i = 0;
string key;
if (truncated)
string filter;
- int ret = cls_cxx_map_get_vals(hctx, start_key, filter, max_entries, &keys, truncated);
+ int ret = cls_cxx_map_get_vals(hctx, start_key, filter, max_entries,
+ &keys, truncated);
if (ret < 0)
return ret;
-
map<string, bufferlist>::iterator iter = keys.begin();
- if (iter == keys.end())
+ if (iter == keys.end()) {
+ // if keys is empty, the listing must not be reported as truncated
+ ceph_assert(!truncated || !(*truncated));
return 0;
+ }
- uint32_t num_keys = keys.size();
-
- for (; iter != keys.end(); ++iter, ++i) {
+ const string* last_key = nullptr; // last key processed, for end-marker
+ for (; iter != keys.end(); ++iter) {
const string& key = iter->first;
cls_rgw_gc_obj_info e;
return 0;
}
- if (!key_in_index(key, GC_OBJ_TIME_INDEX))
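+ // a key outside the time index means we have walked past all
+ // time-index entries, so the listing is complete and not truncated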
+ if (!key_in_index(key, GC_OBJ_TIME_INDEX)) {
+ if (truncated)
+ *truncated = false;
return 0;
+ }
ret = gc_record_decode(iter->second, e);
if (ret < 0)
ret = cb(hctx, key, e, param);
if (ret < 0)
return ret;
+ last_key = &(iter->first); // update when callback successful
+ }
- if (i == num_keys - 1) {
- key_iter = key;
- }
+ // set the out marker if either the caller does not capture truncated
+ // or, if they do, the listing ended up truncated
+ if (!truncated || *truncated) {
+ ceph_assert(last_key);
+ out_marker = *last_key;
}
return 0;
#include "include/random.h"
#include <list>
+#include <sstream>
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
remove_tags(cct->_conf->rgw_gc_max_objs) {
max_aio = cct->_conf->rgw_gc_max_concurrent_io;
}
+
~RGWGCIOManager() {
for (auto io : ios) {
io.c->release();
}
}
- int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op, int index, const string& tag) {
+ int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op,
+ int index, const string& tag) {
while (ios.size() > max_aio) {
if (gc->going_down()) {
return 0;
int ret = io.c->get_return_value();
io.c->release();
- auto& rt = remove_tags[io.index];
-
if (ret == -ENOENT) {
ret = 0;
}
if (io.type == IO::IndexIO) {
if (ret < 0) {
- ldout(cct, 0) << "WARNING: gc cleanup of tags on gc shard index=" << io.index << " returned error, ret=" << ret << dendl;
+ ldout(cct, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
+ io.index << " returned error, ret=" << ret << dendl;
}
goto done;
}
if (ret < 0) {
- ldout(cct, 0) << "WARNING: could not remove oid=" << io.oid << ", ret=" << ret << dendl;
+ ldout(cct, 0) << "WARNING: gc could not remove oid=" << io.oid <<
+ ", ret=" << ret << dendl;
goto done;
}
- rt.push_back(io.tag);
- if (rt.size() > (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
- flush_remove_tags(io.index, rt);
- }
-done:
+ schedule_tag_removal(io.index, io.tag);
+
+ done:
ios.pop_front();
}
+ void schedule_tag_removal(int index, string tag) {
+ auto& rt = remove_tags[index];
+
+ // since every element of a chain tries to add the same tag, and
+ // since chains are handled sequentially, check to make sure it's
+ // not already on the list
+ if (rt.empty() || rt.back() != tag) {
+ rt.push_back(tag);
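+ // flush once the accumulated tags reach the configured trim chunk size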
+ if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
+ flush_remove_tags(index, rt);
+ }
+ }
+ }
+
void drain_ios() {
while (!ios.empty()) {
if (gc->going_down()) {
index_io.type = IO::IndexIO;
index_io.index = index;
+ // use lambda to assemble list, so it will only get executed if
+ // we're at the appropriate logging level
+ auto lister = [&rt]() -> std::string {
+ std::stringstream out;
+ bool first = true;
+
+ for (const auto& s : rt) {
+ if (first) {
+ first = false;
+ } else {
+ out << ", ";
+ }
+ out << s;
+ }
+
+ return out.str();
+ };
+
+ ldout(cct, 20) << __func__ <<
+ " removing entries from gc log shard index=" << index << ", size=" <<
+ rt.size() << ", entries=[" << lister() << "]" << dendl;
+
int ret = gc->remove(index, rt, &index_io.c);
rt.clear();
if (ret < 0) {
- /* we already cleared list of tags, this prevents us from ballooning in case of
- * a persistent problem
+ /* we already cleared list of tags, this prevents us from
+ * ballooning in case of a persistent problem
*/
- ldout(cct, 0) << "WARNING: failed to remove tags on gc shard index=" << index << " ret=" << ret << dendl;
+ ldout(cct, 0) << "WARNING: failed to remove tags on gc shard index=" <<
+ index << " ret=" << ret << dendl;
return;
}
+
ios.push_back(index_io);
}
++index;
}
}
-};
+}; // class RGWGCIOManager
int RGWGC::process(int index, int max_secs, bool expired_only,
RGWGCIOManager& io_manager)
{
+ ldout(cct, 20) << "RGWGC::process entered with GC index_shard=" <<
+ index << ", max_secs=" << max_secs << ", expired_only=" <<
+ expired_only << dendl;
+
rados::cls::lock::Lock l(gc_index_lock_name);
utime_t end = ceph_clock_now();
int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
if (ret == -EBUSY) { /* already locked by another gc processor */
- dout(10) << "RGWGC::process() failed to acquire lock on " << obj_names[index] << dendl;
+ dout(10) << "RGWGC::process failed to acquire lock on " <<
+ obj_names[index] << dendl;
return 0;
}
if (ret < 0)
do {
int max = 100;
std::list<cls_rgw_gc_obj_info> entries;
- ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
+
+ ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max,
+ expired_only, entries, &truncated, next_marker);
+ ldout(cct, 20) <<
+ "RGWGC::process cls_rgw_gc_list returned with returned:" << ret <<
+ ", entries.size=" << entries.size() << ", truncated=" << truncated <<
+ ", next_marker='" << next_marker << "'" << dendl;
+
if (ret == -ENOENT) {
ret = 0;
goto done;
if (ret < 0)
goto done;
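+ // advance the marker so the next listing pass continues where this one left off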
+ marker = next_marker;
+
string last_pool;
std::list<cls_rgw_gc_obj_info>::iterator iter;
for (iter = entries.begin(); iter != entries.end(); ++iter) {
cls_rgw_gc_obj_info& info = *iter;
+
+ ldout(cct, 20) << "RGWGC::process iterating over entry tag='" <<
+ info.tag << "', time=" << info.time << ", chain.objs.size()=" <<
+ info.chain.objs.size() << dendl;
+
std::list<cls_rgw_obj>::iterator liter;
cls_rgw_obj_chain& chain = info.chain;
utime_t now = ceph_clock_now();
- if (now >= end)
+ if (now >= end) {
goto done;
+ }
- for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
- cls_rgw_obj& obj = *liter;
-
- if (obj.pool != last_pool) {
- delete ctx;
- ctx = new IoCtx;
- ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
- if (ret < 0) {
- last_pool = "";
- dout(0) << "ERROR: failed to create ioctx pool=" << obj.pool << dendl;
- continue;
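+ // a zero-length chain has no objects left to delete; just schedule
+ // removal of this entry's tag from the gc log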
+ if (chain.objs.empty()) {
+ io_manager.schedule_tag_removal(index, info.tag);
+ } else {
+ for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
+ cls_rgw_obj& obj = *liter;
+
+ if (obj.pool != last_pool) {
+ delete ctx;
+ ctx = new IoCtx;
+ ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
+ if (ret < 0) {
+ last_pool = "";
+ dout(0) << "ERROR: failed to create ioctx pool=" <<
+ obj.pool << dendl;
+ continue;
+ }
+ last_pool = obj.pool;
}
- last_pool = obj.pool;
- }
- ctx->locator_set_key(obj.loc);
+ ctx->locator_set_key(obj.loc);
- const string& oid = obj.key.name; /* just stored raw oid there */
+ const string& oid = obj.key.name; /* just stored raw oid there */
- dout(5) << "gc::process: removing " << obj.pool << ":" << obj.key.name << dendl;
- ObjectWriteOperation op;
- cls_refcount_put(op, info.tag, true);
+ dout(5) << "RGWGC::process removing " << obj.pool <<
+ ":" << obj.key.name << dendl;
+ ObjectWriteOperation op;
+ cls_refcount_put(op, info.tag, true);
- ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "WARNING: failed to schedule deletion for oid=" << oid << dendl;
- }
-
- if (going_down()) // leave early, even if tag isn't removed, it's ok
- goto done;
- }
- }
+ ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
+ if (ret < 0) {
+ dout(0) <<
+ "WARNING: failed to schedule deletion for oid=" << oid << dendl;
+ }
+ if (going_down()) {
+ // leave early, even if tag isn't removed, it's ok since it
+ // will be picked up next time around
+ goto done;
+ }
+ } // chains loop
+ } // else -- chains not empty
+ } // entries loop
} while (truncated);
done:
- /* we don't drain here, because if we're going down we don't want to hold the system
- * if backend is unresponsive
+ /* we don't drain here, because if we're going down we don't want to
+ * hold the system if backend is unresponsive
*/
l.unlock(&store->gc_pool_ctx, obj_names[index]);
delete ctx;
+
return 0;
}
Mutex::Locker l(lock);
cond.Signal();
}
-