From: Yehuda Sadeh Date: Fri, 23 Oct 2015 00:00:08 +0000 (-0700) Subject: rgw: guard metadata sync initialization step X-Git-Tag: v10.1.0~354^2~310 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f055fae8bc55190c18aa2b7de00b79e450883cd4;p=ceph.git rgw: guard metadata sync initialization step Guard the fetch-meta stage Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 5194fb15c90a..b2146f46c942 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -119,8 +119,8 @@ int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env) { env = _env; RGWCoroutine *op = *pos; - ldout(cct, 20) << *op << ": operate()" << dendl; op->stack = this; + ldout(cct, 20) << *op << ": operate()" << dendl; int r = op->operate(); if (r < 0) { ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl; @@ -492,11 +492,11 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr) return out; } -bool RGWCoroutine::drain_children() +bool RGWCoroutine::drain_children(int num_cr_left) { bool done = false; reenter(&drain_cr) { - while (num_spawned() > 0) { + while (num_spawned() > num_cr_left) { yield wait_for_child(); int ret; while (collect(&ret)) { diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index a0aae9b09f02..9237b1c80a2a 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -178,7 +178,7 @@ public: bool collect(int *ret); /* returns true if needs to be called again */ int wait(const utime_t& interval); - bool drain_children(); /* returns true if needed to be called again */ + bool drain_children(int num_cr_left); /* returns true if needed to be called again */ void wakeup(); size_t num_spawned() { @@ -205,7 +205,10 @@ do { \ } while (0) #define drain_all() \ - yield_until_true(drain_children()) + yield_until_true(drain_children(0)) + +#define drain_all_but(n) \ + yield_until_true(drain_children(n)) template class RGWConsumerCR : public RGWCoroutine { diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index 1c37dd92ba53..c6200e33af19 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -497,3 +497,35 @@ int RGWAsyncRemoveObj::_send_request() } return ret; } + +int RGWContinuousLeaseCR::operate() +{ + reenter(this) { + while (!going_down.read()) { + yield { + int r = call(new RGWSimpleRadosLockCR(async_rados, store, pool, oid, lock_name, cookie, interval)); + if (r < 0) { + ldout(store->ctx(), 0) << *this << ": ERROR: failed to call RGWSimpleRadosLockCR()" << dendl; + return set_state(RGWCoroutine_Error, r); + } + } + if (retcode < 0) { + set_locked(false); + ldout(store->ctx(), 20) << *this << ": couldn't lock " << pool.name << ":" << oid << ":" << lock_name << ": retcode=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + set_locked(true); + yield wait(utime_t(interval / 2, 0)); + } + set_locked(false); /* moot at this point anyway */ + yield { + int r = call(new RGWSimpleRadosUnlockCR(async_rados, store, pool, oid, lock_name, cookie)); + if (r < 0) { + ldout(store->ctx(), 0) << *this << ": ERROR: failed to call RGWSimpleRadosUnlockCR()" << dendl; + return set_state(RGWCoroutine_Error, r); + } + } + return set_state(RGWCoroutine_Done); + } + return 0; +} diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 327b14f5fe2e..e21727a36c57 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -729,4 +729,51 @@ public: } }; +class RGWContinuousLeaseCR : public RGWCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + + rgw_bucket pool; + string oid; + + string lock_name; + string cookie; + + int interval; + + Mutex lock; + atomic_t going_down; + bool locked; + +public: + RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid, + const string& _lock_name, int _interval) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), + pool(_pool), oid(_oid), lock_name(_lock_name), interval(_interval), + lock("RGWContimuousLeaseCR"), locked(false) { +#define COOKIE_LEN 16 + char buf[COOKIE_LEN + 1]; + + gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); + cookie = buf; + } + + int operate(); + + bool is_locked() { + Mutex::Locker l(lock); + return locked; + } + + void set_locked(bool status) { + Mutex::Locker l(lock); + locked = status; + } + + void go_down() { + going_down.set(1); + wakeup(); + } +}; + + #endif diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 1ad451b4b596..e3f546657060 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -395,23 +395,21 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine { RGWHTTPManager *http_manager; RGWObjectCtx& obj_ctx; - string lock_name; - string cookie; rgw_meta_sync_info status; map shards_info; + RGWContinuousLeaseCR *lease_cr; public: RGWInitSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr, RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), http_manager(_http_mgr), - obj_ctx(_obj_ctx) { - lock_name = "sync_lock"; + obj_ctx(_obj_ctx), lease_cr(NULL) { status.num_shards = _num_shards; + } -#define COOKIE_LEN 16 - char buf[COOKIE_LEN + 1]; - - gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); - string cookie = buf; + ~RGWInitSyncStatusCoroutine() { + if (lease_cr) { + lease_cr->put(); + } } int operate() { @@ -419,12 +417,18 @@ public: reenter(this) { yield { uint32_t lock_duration = 30; - call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid, - lock_name, cookie, lock_duration)); + string lock_name = "sync_lock"; + lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid, + lock_name, lock_duration); + lease_cr->get(); + spawn(lease_cr, false); } - if (retcode < 0) { - ldout(cct, 0) << "ERROR: failed to take a lock on " << mdlog_sync_status_oid << dendl; - return set_state(RGWCoroutine_Error, retcode); + while (!lease_cr->is_locked()) { + if (lease_cr->is_done()) { + ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl; + return set_state(RGWCoroutine_Error, lease_cr->get_ret_status()); + } + yield; } yield { call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, @@ -437,7 +441,7 @@ public: } } - drain_all(); + drain_all_but(1); /* the lease cr still needs to run */ yield { for (int i = 0; i < (int)status.num_shards; i++) { @@ -452,10 +456,7 @@ public: call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid, status)); } - yield { /* unlock */ - call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid, - lock_name, cookie)); - } + yield lease_cr->go_down(); while (collect(&ret)) { if (ret < 0) { return set_state(RGWCoroutine_Error); @@ -519,12 +520,21 @@ class RGWFetchAllMetaCR : public RGWCoroutine { RGWShardedOmapCRManager *entries_index; + RGWContinuousLeaseCR *lease_cr; + bool lost_lock; + public: RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, int _num_shards) : RGWCoroutine(_store->ctx()), store(_store), http_manager(_mgr), async_rados(_async_rados), num_shards(_num_shards), - req_ret(0), entries_index(NULL) { + req_ret(0), entries_index(NULL), lease_cr(NULL), lost_lock(false) { + } + + ~RGWFetchAllMetaCR() { + if (lease_cr) { + lease_cr->put(); + } } void append_section_from_set(set& all_sections, const string& name) { @@ -557,6 +567,21 @@ public: RGWRESTConn *conn = store->rest_master_conn; reenter(this) { + yield { + uint32_t lock_duration = 30; + string lock_name = "sync_lock"; + lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid, + lock_name, lock_duration); + lease_cr->get(); + spawn(lease_cr, false); + } + while (!lease_cr->is_locked()) { + if (lease_cr->is_done()) { + ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl; + return set_state(RGWCoroutine_Error, lease_cr->get_ret_status()); + } + yield; + } entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards, store->get_zone_params().log_pool, mdlog_sync_full_sync_index_prefix); yield { @@ -582,6 +607,10 @@ public: return set_state(RGWCoroutine_Error); } for (list::iterator iter = result.begin(); iter != result.end(); ++iter) { + if (!lease_cr->is_locked()) { + lost_lock = true; + break; + } ldout(store->ctx(), 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl; string s = *sections_iter + ":" + *iter; entries_index->append(s); @@ -590,6 +619,11 @@ public: } } yield entries_index->finish(); + + drain_all_but(1); /* the lease cr still needs to run */ + + yield lease_cr->go_down(); + int ret; while (collect(&ret)) { if (ret < 0) { @@ -597,6 +631,9 @@ public: } yield; } + if (lost_lock) { + yield return set_state(RGWCoroutine_Error, -EBUSY); + } yield return set_state(RGWCoroutine_Done); } return 0; @@ -1144,41 +1181,52 @@ int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status continue; } if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to init sync status" << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to init sync status r=" << r << dendl; return r; } } } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit); - switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) { - case rgw_meta_sync_info::StateBuildingFullSyncMaps: - ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl; - r = run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards)); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl; - return r; - } + do { + r = run(new RGWReadSyncStatusCoroutine(async_rados, store, obj_ctx, &sync_status)); + if (r < 0 && r != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl; + return r; + } - sync_status.sync_info.state = rgw_meta_sync_info::StateSync; - r = set_sync_info(sync_status.sync_info); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl; - return r; - } - /* fall through */ - case rgw_meta_sync_info::StateSync: - ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl; - meta_sync_cr = new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status); - r = run(meta_sync_cr); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl; - return r; - } - break; - default: - ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl; - return -EIO; - } + switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) { + case rgw_meta_sync_info::StateBuildingFullSyncMaps: + ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl; + r = run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards)); + if (r == -EBUSY) { + continue; + } + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl; + return r; + } + + sync_status.sync_info.state = rgw_meta_sync_info::StateSync; + r = set_sync_info(sync_status.sync_info); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl; + return r; + } + /* fall through */ + case rgw_meta_sync_info::StateSync: + ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl; + meta_sync_cr = new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status); + r = run(meta_sync_cr); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl; + return r; + } + break; + default: + ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl; + return -EIO; + } + } while (true); return 0; }