Mutex lock;
Cond cond;
+ // Take `lock` for the scope of this call and wait on `cond` with no timeout.
+ // The lock is dropped automatically when the guard goes out of scope.
+ void wait() {
+   Mutex::Locker guard(lock);
+   cond.Wait(lock);
+ }
+
+ // Take `lock` for the scope of this call and wait on `cond`, passing
+ // `wait_time` through to Cond::WaitInterval as the interval to wait for.
+ void wait_interval(const utime_t& wait_time) {
+   Mutex::Locker guard(lock);
+   cond.WaitInterval(lock, wait_time);
+ }
+
public:
Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p), lock("RGWRadosThread::Worker") {}
void *entry() override;
- void stop() {
+ void signal() {
Mutex::Locker l(lock);
cond.Signal();
}
void start();
void stop();
+
+ // Forward a wake-up to the worker, if one exists; otherwise do nothing.
+ void signal() {
+   if (!worker) {
+     return;
+   }
+   worker->signal();
+ }
};
void RGWRadosThread::start()
down_flag = true;
stop_process();
if (worker) {
- worker->stop();
+ worker->signal();
worker->join();
}
delete worker;
utime_t wait_time = interval;
wait_time -= end;
- lock.Lock();
- cond.WaitInterval(lock, wait_time);
- lock.Unlock();
+ wait_interval(wait_time);
} else {
- lock.Lock();
- cond.Wait(lock);
- lock.Unlock();
+ wait();
}
} while (!processor->going_down());
return get_max_chunk_size(pool, max_chunk_size);
}
+class RGWIndexCompletionManager;
+
+/**
+ * Everything needed to (re)issue one bucket index "complete op".
+ *
+ * An instance is handed to the AIO callback as its opaque argument; when the
+ * operation has to be retried, RGWIndexCompletionThread::process() replays it
+ * from these fields. All scalar members carry default initializers so that a
+ * freshly constructed object never holds indeterminate values.
+ */
+struct complete_op_data {
+  /* manager whose handle_completion() is invoked from the AIO callback */
+  RGWIndexCompletionManager *manager{nullptr};
+  /* object whose bucket shard is (re)initialized when the op is retried */
+  rgw_obj obj;
+  /* the fields below are forwarded to cls_rgw_bucket_complete_op() */
+  RGWModifyOp op{};   /* zero-valued until the creator assigns the real op */
+  string tag;
+  rgw_bucket_entry_ver ver;
+  cls_rgw_obj_key key;
+  rgw_bucket_dir_entry_meta dir_meta;
+  list<cls_rgw_obj_key> remove_objs;
+  bool log_op{false};
+  uint16_t bilog_op{0};
+};
+
+/**
+ * Background thread that retries bucket index completions.
+ *
+ * Producers hand entries over through add_completion(); process() later takes
+ * the whole queue and works through it. The thread takes ownership of every
+ * entry passed to add_completion().
+ */
+class RGWIndexCompletionThread : public RGWRadosThread {
+  RGWRados *store;
+
+  /* NOTE(review): a zero interval presumably makes the worker block until it
+   * is signalled instead of polling -- confirm against the worker loop. */
+  uint64_t interval_msec() override {
+    return 0;
+  }
+
+  /* entries queued by add_completion() and drained by process() */
+  list<complete_op_data *> completions;
+
+  /* protects `completions` */
+  Mutex completions_lock;
+
+public:
+  RGWIndexCompletionThread(RGWRados *_store)
+    : RGWRadosThread(_store, "index-complete"),
+      store(_store),
+      completions_lock("RGWIndexCompletionThread::completions_lock")
+  {}
+
+  int process() override;
+
+  /* Queue `completion` for a later process() pass and wake the worker.
+   * The queue lock is released before signalling. */
+  void add_completion(complete_op_data *completion) {
+    {
+      Mutex::Locker guard(completions_lock);
+      completions.push_back(completion);
+    }
+    signal();
+  }
+};
+
+/**
+ * Drain the queued completions and re-issue each bucket index completion
+ * synchronously.
+ *
+ * The pending list is swapped out under completions_lock so the lock is not
+ * held while the operations run. Every entry taken from the list is deleted
+ * here, whether it was handled, failed, or skipped because the thread is
+ * going down.
+ *
+ * @return always 0; per-entry failures are logged and otherwise ignored.
+ */
+int RGWIndexCompletionThread::process()
+{
+  list<complete_op_data *> comps;
+
+  {
+    Mutex::Locker l(completions_lock);
+    completions.swap(comps);
+  }
+
+  for (auto c : comps) {
+    if (going_down()) {
+      /* shutting down: release the entry without retrying it */
+      delete c;
+      continue;
+    }
+
+    ldout(store->ctx(), 20) << __func__ << "(): handling completion for key=" << c->key << dendl;
+
+    RGWRados::BucketShard bs(store);
+
+    int r = bs.init(c->obj.bucket, c->obj);
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
+      /* not much to do */
+      delete c;
+      continue;
+    }
+
+    /* the callback receives the shard to operate on; it is named `shard`
+     * so that it does not shadow the local `bs` above */
+    r = store->guard_reshard(&bs, c->obj, [&](RGWRados::BucketShard *shard) -> int {
+      librados::ObjectWriteOperation o;
+      cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+      cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
+                                 c->log_op, c->bilog_op);
+
+      return shard->index_ctx.operate(shard->bucket_obj, &o);
+    });
+    if (r < 0) {
+      /* report only real failures: a successful retry must not emit a
+       * level-0 ERROR line */
+      ldout(cct, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
+      /* ignoring error, can't do anything about it */
+    }
+
+    delete c;
+  }
+
+  return 0;
+}
+
+/**
+ * Tracks in-flight bucket index completions and owns the thread that retries
+ * them.
+ *
+ * create_completion() registers an entry in `completions`;
+ * handle_completion() removes it again and either frees it or passes it to
+ * the completion thread.
+ */
+class RGWIndexCompletionManager {
+  RGWRados *store{nullptr};
+
+  /* protects `completions` */
+  Mutex lock{"RGWIndexCompletionManager::lock"};
+  set<complete_op_data *> completions;
+
+  /* created by start(), deleted by the destructor */
+  RGWIndexCompletionThread *completion_thread{nullptr};
+
+public:
+  RGWIndexCompletionManager(RGWRados *_store) : store(_store) {}
+
+  ~RGWIndexCompletionManager() {
+    delete completion_thread;
+  }
+
+  void create_completion(const rgw_obj& obj,
+                         RGWModifyOp op, string& tag,
+                         rgw_bucket_entry_ver& ver,
+                         const cls_rgw_obj_key& key,
+                         rgw_bucket_dir_entry_meta& dir_meta,
+                         list<cls_rgw_obj_key>& remove_objs, bool log_op,
+                         uint16_t bilog_op,
+                         complete_op_data **result);
+  void handle_completion(completion_t cb, complete_op_data *arg);
+
+  /* Create and initialize the completion thread, then start it.
+   * Returns the negative value from init() on failure (the thread object is
+   * kept but not started), 0 otherwise. */
+  int start() {
+    completion_thread = new RGWIndexCompletionThread(store);
+    const int ret = completion_thread->init();
+    if (ret >= 0) {
+      completion_thread->start();
+      return 0;
+    }
+    return ret;
+  }
+
+  /* Stop the completion thread if start() ever created one. */
+  void stop() {
+    if (!completion_thread) {
+      return;
+    }
+    completion_thread->stop();
+  }
+};
+
+/**
+ * Allocate a complete_op_data describing one index completion, copy all the
+ * call arguments into it, and register it in the in-flight set.
+ *
+ * @param result  receives the newly allocated entry; the entry stays
+ *                registered until handle_completion() removes it.
+ */
+void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
+                                                  RGWModifyOp op, string& tag,
+                                                  rgw_bucket_entry_ver& ver,
+                                                  const cls_rgw_obj_key& key,
+                                                  rgw_bucket_dir_entry_meta& dir_meta,
+                                                  list<cls_rgw_obj_key>& remove_objs, bool log_op,
+                                                  uint16_t bilog_op,
+                                                  complete_op_data **result)
+{
+  complete_op_data *data = new complete_op_data;
+
+  /* back-pointer used by the AIO callback to reach this manager */
+  data->manager = this;
+
+  /* what to complete */
+  data->obj = obj;
+  data->key = key;
+  data->op = op;
+  data->tag = tag;
+  data->ver = ver;
+  data->dir_meta = dir_meta;
+  data->remove_objs = remove_objs;
+
+  /* logging parameters */
+  data->log_op = log_op;
+  data->bilog_op = bilog_op;
+
+  *result = data;
+
+  Mutex::Locker guard(lock);
+  completions.insert(data);
+}
+
+/**
+ * AIO completion handler for an entry made by create_completion().
+ *
+ * The entry is first removed from the in-flight set; an entry that is not
+ * registered is left untouched. If the operation returned
+ * -ERR_BUSY_RESHARDING the entry is passed to the completion thread for a
+ * retry, otherwise it is freed here.
+ */
+void RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
+{
+  {
+    Mutex::Locker guard(lock);
+
+    /* erase-by-key reports how many elements were removed */
+    if (completions.erase(arg) == 0) {
+      return;
+    }
+  }
+
+  const int ret = rados_aio_get_return_value(cb);
+  if (ret == -ERR_BUSY_RESHARDING) {
+    completion_thread->add_completion(arg);
+    return;
+  }
+
+  delete arg;
+}
+
void RGWRados::finalize()
{
if (run_sync_thread) {
delete binfo_cache;
delete obj_tombstone_cache;
delete sync_modules_manager;
+
+ delete reshard;
+ delete index_completion_manager;
}
/**
}
reshard = new RGWReshard(this);
+ index_completion_manager = new RGWIndexCompletionManager(this);
+ ret = index_completion_manager->start();
return ret;
}
ent.meta.owner_display_name = owner.get_display_name();
ent.meta.content_type = content_type;
- ret = store->cls_obj_complete_add(*bs, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
+ ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
if (r < 0) {
return bs.index_ctx.operate(bs.bucket_obj, &o);
}
-int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
+/* AIO callback trampoline: `arg` is the complete_op_data registered with the
+ * completion; forward to the manager recorded inside it. */
+static void obj_complete_cb(completion_t cb, void *arg)
+{
+  complete_op_data *completion = static_cast<complete_op_data *>(arg);
+  completion->manager->handle_completion(cb, completion);
+}
+
+int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
int64_t pool, uint64_t epoch,
rgw_bucket_dir_entry& ent, RGWObjCategory category,
list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, pro,
get_zone().log_data, bilog_flags, zones_trace);
- AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ complete_op_data *arg;
+ index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, ro,
+ get_zone().log_data, bilog_flags, &arg);
+ AioCompletion *c = librados::Rados::aio_create_completion(arg, NULL, obj_complete_cb);
int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
c->release();
return ret;
}
-int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
+int RGWRados::cls_obj_complete_add(BucketShard& bs, const rgw_obj& obj, string& tag,
int64_t pool, uint64_t epoch,
rgw_bucket_dir_entry& ent, RGWObjCategory category,
list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace)
{
- return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
+ return cls_obj_complete_op(bs, obj, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
}
int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
rgw_bucket_dir_entry ent;
ent.meta.mtime = removed_mtime;
obj.key.get_index_key(&ent.key);
- return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace);
+ return cls_obj_complete_op(bs, obj, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace);
}
int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace)
{
rgw_bucket_dir_entry ent;
obj.key.get_index_key(&ent.key);
- return cls_obj_complete_op(bs, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace);
+ return cls_obj_complete_op(bs, obj, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace);
}
int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout)