From: Yehuda Sadeh Date: Sat, 13 May 2017 00:07:04 +0000 (-0700) Subject: rgw: guard resharding in async completions X-Git-Tag: ses5-milestone6~8^2~7^2~48 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=245cd0f9af395a09331618864b93384f3b330e13;p=ceph.git rgw: guard resharding in async completions Catch return value from async index completions, resend operations synchronously on a separate worker thread if needed. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 55da65a1ace1..02d827154ada 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2952,10 +2952,20 @@ class RGWRadosThread { Mutex lock; Cond cond; + void wait() { + Mutex::Locker l(lock); + cond.Wait(lock); + }; + + void wait_interval(const utime_t& wait_time) { + Mutex::Locker l(lock); + cond.WaitInterval(lock, wait_time); + } + public: Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p), lock("RGWRadosThread::Worker") {} void *entry() override; - void stop() { + void signal() { Mutex::Locker l(lock); cond.Signal(); } @@ -2987,6 +2997,12 @@ public: void start(); void stop(); + + void signal() { + if (worker) { + worker->signal(); + } + } }; void RGWRadosThread::start() @@ -3000,7 +3016,7 @@ void RGWRadosThread::stop() down_flag = true; stop_process(); if (worker) { - worker->stop(); + worker->signal(); worker->join(); } delete worker; @@ -3037,13 +3053,9 @@ void *RGWRadosThread::Worker::entry() { utime_t wait_time = interval; wait_time -= end; - lock.Lock(); - cond.WaitInterval(lock, wait_time); - lock.Unlock(); + wait_interval(wait_time); } else { - lock.Lock(); - cond.Wait(lock); - lock.Unlock(); + wait(); } } while (!processor->going_down()); @@ -3368,6 +3380,177 @@ int RGWRados::get_max_chunk_size(const string& placement_rule, const rgw_obj& ob return get_max_chunk_size(pool, max_chunk_size); } +class RGWIndexCompletionManager; + +struct complete_op_data { + RGWIndexCompletionManager *manager{nullptr}; + rgw_obj obj; + RGWModifyOp op; + string tag; + rgw_bucket_entry_ver ver; + cls_rgw_obj_key key; + rgw_bucket_dir_entry_meta dir_meta; + list remove_objs; + bool log_op; + uint16_t bilog_op; +}; + +class RGWIndexCompletionThread : public RGWRadosThread { + RGWRados *store; + + uint64_t interval_msec() override { + return 0; + } + + list completions; + + Mutex completions_lock; +public: + RGWIndexCompletionThread(RGWRados *_store) + : RGWRadosThread(_store, "index-complete"), store(_store), completions_lock("RGWIndexCompletionThread::completions_lock") {} + + int process() override; + + void add_completion(complete_op_data *completion) { + { + Mutex::Locker l(completions_lock); + completions.push_back(completion); + } + + signal(); + } +}; + +int RGWIndexCompletionThread::process() +{ + list comps; + + { + Mutex::Locker l(completions_lock); + completions.swap(comps); + } + + for (auto c: comps) { + if (!going_down()) { + ldout(store->ctx(), 20) << __func__ << "(): handling completion for key=" << c->key << dendl; + + RGWRados::BucketShard bs(store); + + int r = bs.init(c->obj.bucket, c->obj); + if (r < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl; + /* not much to do */ + delete c; + continue; + } + + r = store->guard_reshard(&bs, c->obj, [&](RGWRados::BucketShard *bs) -> int { + librados::ObjectWriteOperation o; + cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING); + cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs, + c->log_op, c->bilog_op); + + return bs->index_ctx.operate(bs->bucket_obj, &o); + }); + ldout(cct, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl; + + /* ignoring error, can't do anything about it */ + } + delete c; + } + + return 0; +} + +class RGWIndexCompletionManager { + RGWRados *store{nullptr}; + Mutex lock{"RGWIndexCompletionManager::lock"}; + set completions; + + RGWIndexCompletionThread *completion_thread{nullptr}; + + +public: + RGWIndexCompletionManager(RGWRados *_store) : store(_store) {} + ~RGWIndexCompletionManager() { + delete completion_thread; + } + + void create_completion(const rgw_obj& obj, + RGWModifyOp op, string& tag, + rgw_bucket_entry_ver& ver, + const cls_rgw_obj_key& key, + rgw_bucket_dir_entry_meta& dir_meta, + list& remove_objs, bool log_op, + uint16_t bilog_op, + complete_op_data **result); + void handle_completion(completion_t cb, complete_op_data *arg); + + int start() { + completion_thread = new RGWIndexCompletionThread(store); + int ret = completion_thread->init(); + if (ret < 0) { + return ret; + } + completion_thread->start(); + return 0; + } + void stop() { + if (completion_thread) { + completion_thread->stop(); + } + } +}; + +void RGWIndexCompletionManager::create_completion(const rgw_obj& obj, + RGWModifyOp op, string& tag, + rgw_bucket_entry_ver& ver, + const cls_rgw_obj_key& key, + rgw_bucket_dir_entry_meta& dir_meta, + list& remove_objs, bool log_op, + uint16_t bilog_op, + complete_op_data **result) +{ + complete_op_data *entry = new complete_op_data; + + entry->manager = this; + entry->obj = obj; + entry->op = op; + entry->tag = tag; + entry->ver = ver; + entry->key = key; + entry->dir_meta = dir_meta; + entry->log_op = log_op; + entry->bilog_op = bilog_op; + entry->remove_objs = remove_objs; + + *result = entry; + + Mutex::Locker l(lock); + completions.insert(entry); +} + +void RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg) +{ + { + Mutex::Locker l(lock); + + auto iter = completions.find(arg); + if (iter == completions.end()) { + return; + } + + completions.erase(iter); + } + + int r = rados_aio_get_return_value(cb); + if (r != -ERR_BUSY_RESHARDING) { + delete arg; + return; + } + completion_thread->add_completion(arg); +} + void RGWRados::finalize() { if (run_sync_thread) { @@ -3459,6 +3642,9 @@ void RGWRados::finalize() delete binfo_cache; delete obj_tombstone_cache; delete sync_modules_manager; + + delete reshard; + delete index_completion_manager; } /** @@ -4271,6 +4457,8 @@ int RGWRados::init_complete() } reshard = new RGWReshard(this); + index_completion_manager = new RGWIndexCompletionManager(this); + ret = index_completion_manager->start(); return ret; } @@ -9646,7 +9834,7 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch, ent.meta.owner_display_name = owner.get_display_name(); ent.meta.content_type = content_type; - ret = store->cls_obj_complete_add(*bs, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace); + ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace); int r = store->data_log->add_entry(bs->bucket, bs->shard_id); if (r < 0) { @@ -12213,7 +12401,12 @@ int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, return bs.index_ctx.operate(bs.bucket_obj, &o); } -int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, +static void obj_complete_cb(completion_t cb, void *arg) +{ + ((complete_op_data *)arg)->manager->handle_completion(cb, (complete_op_data *)arg); +} + +int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent, RGWObjCategory category, list *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace) @@ -12249,18 +12442,21 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, pro, get_zone().log_data, bilog_flags, zones_trace); - AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); + complete_op_data *arg; + index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, ro, + get_zone().log_data, bilog_flags, &arg); + AioCompletion *c = librados::Rados::aio_create_completion(arg, NULL, obj_complete_cb); int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o); c->release(); return ret; } -int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag, +int RGWRados::cls_obj_complete_add(BucketShard& bs, const rgw_obj& obj, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent, RGWObjCategory category, list *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace) { - return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace); + return cls_obj_complete_op(bs, obj, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace); } int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag, @@ -12274,14 +12470,14 @@ int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag, rgw_bucket_dir_entry ent; ent.meta.mtime = removed_mtime; obj.key.get_index_key(&ent.key); - return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace); + return cls_obj_complete_op(bs, obj, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace); } int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace) { rgw_bucket_dir_entry ent; obj.key.get_index_key(&ent.key); - return cls_obj_complete_op(bs, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace); + return cls_obj_complete_op(bs, obj, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace); } int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 28fd2dd97937..3b7f5a362e84 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2169,6 +2169,8 @@ struct tombstone_entry { pg_ver(state.pg_ver) {} }; +class RGWIndexCompletionManager; + class RGWRados { friend class RGWGC; @@ -2310,6 +2312,8 @@ protected: uint32_t zone_short_id; RGWPeriod current_period; + + RGWIndexCompletionManager *index_completion_manager{nullptr}; public: RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL), gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false), @@ -3347,9 +3351,9 @@ public: int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid); int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr); - int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, + int cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent, RGWObjCategory category, list *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr); - int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent, + int cls_obj_complete_add(BucketShard& bs, const rgw_obj& obj, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent, RGWObjCategory category, list *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr); int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, rgw_obj& obj, ceph::real_time& removed_mtime, list *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);