From 728c132aa3d95d5b1e3cf1c0d358dae2b7709e04 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Tue, 28 Jun 2011 11:40:02 -0700 Subject: [PATCH] C_Gather: hide constructor, convert uses Note: this fixes a small memory leak in MDCache::open_snap_parents. Signed-off-by: Colin McCabe --- src/client/SyntheticClient.cc | 8 +++--- src/include/Context.h | 3 +- src/mds/CDir.cc | 9 +++--- src/mds/CInode.cc | 6 ++-- src/mds/MDCache.cc | 54 ++++++++++++++++++----------------- src/mds/MDCache.h | 2 +- src/mds/Server.cc | 19 ++++++------ src/osdc/ObjectCacher.cc | 17 ++++++----- 8 files changed, 60 insertions(+), 58 deletions(-) diff --git a/src/client/SyntheticClient.cc b/src/client/SyntheticClient.cc index 6c86728b16592..c228348ad2b58 100644 --- a/src/client/SyntheticClient.cc +++ b/src/client/SyntheticClient.cc @@ -1040,8 +1040,8 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only) Cond cond; bool ack; bool safe; - C_Gather *safeg = new C_Gather(g_ceph_context, new C_SafeCond(&lock, &cond, &safe)); - Context *safegref = safeg->new_sub(); // take a ref + C_GatherBuilder safeg(g_ceph_context, new C_SafeCond(&lock, &cond, &safe)); + Context *safegref = safeg.new_sub(); // take a ref while (!t.end()) { @@ -1432,7 +1432,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only) SnapContext snapc; client->objecter->write(oid, oloc, off, len, snapc, bl, ceph_clock_now(g_ceph_context), 0, new C_SafeCond(&lock, &cond, &ack), - safeg->new_sub()); + safeg.new_sub()); while (!ack) cond.Wait(lock); lock.Unlock(); } @@ -1447,7 +1447,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only) SnapContext snapc; client->objecter->zero(oid, oloc, off, len, snapc, ceph_clock_now(g_ceph_context), 0, new C_SafeCond(&lock, &cond, &ack), - safeg->new_sub()); + safeg.new_sub()); while (!ack) cond.Wait(lock); lock.Unlock(); } diff --git a/src/include/Context.h b/src/include/Context.h index 058ff7eaff98b..86192ec91bdf0 100644 --- a/src/include/Context.h +++ b/src/include/Context.h @@ -192,7 +192,6 @@ private: } }; -public: C_Gather(CephContext *cct_, Context *f=0, bool an=false) : cct(cct_), result(0), onfinish(f), sub_created_count(0), sub_existing_count(0), @@ -201,6 +200,7 @@ public: { ldout(cct,10) << "C_Gather " << this << ".new" << dendl; } +public: ~C_Gather() { ldout(cct,10) << "C_Gather " << this << ".delete" << dendl; assert(sub_existing_count == 0); @@ -244,6 +244,7 @@ public: assert(0); // nobody should ever call me. } + friend class C_GatherBuilder; }; /* diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc index 5b084fc459fb5..7a866a096abef 100644 --- a/src/mds/CDir.cc +++ b/src/mds/CDir.cc @@ -1961,13 +1961,14 @@ void CDir::_commit(version_t want) new C_Dir_Committed(this, get_version(), inode->inode.last_renamed_version)); else { // send in a different Context - C_Gather *gather = new C_Gather(g_ceph_context, new C_Dir_Committed(this, get_version(), - inode->inode.last_renamed_version)); + C_GatherBuilder gather(g_ceph_context, + new C_Dir_Committed(this, get_version(), + inode->inode.last_renamed_version)); while (committed_dn != items.end()) { ObjectOperation n = ObjectOperation(); committed_dn = _commit_partial(n, snaps, max_write_size, committed_dn); cache->mds->objecter->mutate(oid, oloc, n, snapc, ceph_clock_now(g_ceph_context), 0, NULL, - gather->new_sub()); + gather.new_sub()); } /* * save the original object for last -- it contains the new header, @@ -1980,7 +1981,7 @@ void CDir::_commit(version_t want) * get our header into an incorrect state. */ cache->mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0, NULL, - gather->new_sub()); + gather.new_sub()); } } diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 4ec6559547ff6..9ed7b5a9e4f56 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -996,7 +996,7 @@ void CInode::fetch(Context *fin) dout(10) << "fetch" << dendl; C_Inode_Fetched *c = new C_Inode_Fetched(this, fin); - C_Gather *gather = new C_Gather(g_ceph_context, c); + C_GatherBuilder gather(g_ceph_context, c); object_t oid = CInode::get_object_name(ino(), frag_t(), ""); object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pg_pool()); @@ -1004,11 +1004,11 @@ void CInode::fetch(Context *fin) ObjectOperation rd; rd.getxattr("inode"); - mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, &c->bl, 0, gather->new_sub()); + mdcache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, &c->bl, 0, gather.new_sub()); // read from separate object too object_t oid2 = CInode::get_object_name(ino(), frag_t(), ".inode"); - mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather->new_sub()); + mdcache->mds->objecter->read(oid2, oloc, 0, 0, CEPH_NOSNAP, &c->bl2, 0, gather.new_sub()); } void CInode::_fetched(bufferlist& bl, bufferlist& bl2, Context *fin) diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index a3f32f867fa44..76b78a0160e5e 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -4600,13 +4600,13 @@ void MDCache::open_snap_parents() dout(10) << "open_snap_parents" << dendl; map splits; - C_Gather *gather = new C_Gather(g_ceph_context); + C_GatherBuilder gather(g_ceph_context); map > >::iterator p = missing_snap_parents.begin(); while (p != missing_snap_parents.end()) { CInode *in = p->first; assert(in->snaprealm); - if (in->snaprealm->open_parents(gather->new_sub())) { + if (in->snaprealm->open_parents(gather.new_sub())) { dout(10) << " past parents now open on " << *in << dendl; // include in a (now safe) snap split? @@ -4639,9 +4639,10 @@ void MDCache::open_snap_parents() send_snaps(splits); - if (gather->get_num_remaining()) { - dout(10) << "open_snap_parents - waiting for " << gather->get_num_remaining() << dendl; - gather->set_finisher(new C_MDC_OpenSnapParents(this)); + if (gather.has_subs()) { + dout(10) << "open_snap_parents - waiting for " + << gather.get()->get_num_remaining() << dendl; + gather.set_finisher(new C_MDC_OpenSnapParents(this)); } else { assert(missing_snap_parents.empty()); assert(reconnected_snaprealms.empty()); @@ -4664,19 +4665,18 @@ void MDCache::open_undef_dirfrags() { dout(10) << "open_undef_dirfrags " << rejoin_undef_dirfrags.size() << " dirfrags" << dendl; - C_Gather *gather = 0; + C_GatherBuilder gather(g_ceph_context); for (set::iterator p = rejoin_undef_dirfrags.begin(); p != rejoin_undef_dirfrags.end(); p++) { CDir *dir = *p; - if (!gather) - gather = new C_Gather(g_ceph_context, new C_MDC_OpenUndefDirfragsFinish(this)); - dir->fetch(gather->new_sub()); + dir->fetch(gather.new_sub()); } - if (rejoin_undef_dirfrags.empty()) { - assert(!gather); - + if (gather.has_subs()) { + gather.set_finisher(new C_MDC_OpenUndefDirfragsFinish(this)); + } + else { start_files_to_recover(rejoin_recover_q, rejoin_check_q); mds->queue_waiters(rejoin_waiters); @@ -9433,7 +9433,8 @@ void MDCache::split_dir(CDir *dir, int bits) return; } - C_Gather *gather = new C_Gather(g_ceph_context, new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits)); + C_GatherBuilder gather(g_ceph_context, + new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits)); fragment_freeze_dirs(dirs, gather); // initial mark+complete pass @@ -9467,14 +9468,15 @@ void MDCache::merge_dir(CInode *diri, frag_t frag) int bits = first->get_frag().bits() - frag.bits(); dout(10) << " we are merginb by " << bits << " bits" << dendl; - C_Gather *gather = new C_Gather(g_ceph_context, new C_MDC_FragmentFrozen(this, dirs, frag, bits)); + C_GatherBuilder gather(g_ceph_context, + new C_MDC_FragmentFrozen(this, dirs, frag, bits)); fragment_freeze_dirs(dirs, gather); // initial mark+complete pass fragment_mark_and_complete(dirs); } -void MDCache::fragment_freeze_dirs(list& dirs, C_Gather *gather) +void MDCache::fragment_freeze_dirs(list& dirs, C_GatherBuilder &gather) { for (list::iterator p = dirs.begin(); p != dirs.end(); p++) { CDir *dir = *p; @@ -9482,7 +9484,7 @@ void MDCache::fragment_freeze_dirs(list& dirs, C_Gather *gather) dir->state_set(CDir::STATE_FRAGMENTING); dir->freeze_dir(); assert(dir->is_freezing_dir()); - dir->add_waiter(CDir::WAIT_FROZEN, gather->new_sub()); + dir->add_waiter(CDir::WAIT_FROZEN, gather.new_sub()); } } @@ -9501,19 +9503,16 @@ void MDCache::fragment_mark_and_complete(list& dirs) CInode *diri = dirs.front()->get_inode(); dout(10) << "fragment_mark_and_complete " << dirs << " on " << *diri << dendl; - C_Gather *gather = 0; + C_GatherBuilder gather(g_ceph_context); for (list::iterator p = dirs.begin(); p != dirs.end(); ++p) { CDir *dir = *p; - + if (!dir->is_complete()) { dout(15) << " fetching incomplete " << *dir << dendl; - if (!gather) - gather = new C_Gather(g_ceph_context, - new C_MDC_FragmentMarking(this, dirs)); - dir->fetch(gather->new_sub(), + dir->fetch(gather.new_sub(), true); // ignore authpinnability } else if (!dir->state_test(CDir::STATE_DNPINNEDFRAG)) { @@ -9533,6 +9532,9 @@ void MDCache::fragment_mark_and_complete(list& dirs) dout(15) << " already marked " << *dir << dendl; } } + if (gather.has_subs()) { + gather.set_finisher(new C_MDC_FragmentMarking(this, dirs)); + } // flush log so that request auth_pins are retired mds->mdlog->flush(); @@ -9638,8 +9640,8 @@ void MDCache::fragment_frozen(list& dirs, frag_t basefrag, int bits) */ // freeze, journal, and store resulting frags - C_Gather *gather = new C_Gather(g_ceph_context, - new C_MDC_FragmentLoggedAndStored(this, mut, + C_GatherBuilder gather(g_ceph_context, + new C_MDC_FragmentLoggedAndStored(this, mut, resultfrags, basefrag, bits)); for (list::iterator p = resultfrags.begin(); @@ -9651,10 +9653,10 @@ void MDCache::fragment_frozen(list& dirs, frag_t basefrag, int bits) // freeze and store them too dir->state_set(CDir::STATE_FRAGMENTING); - dir->commit(0, gather->new_sub(), true); // ignore authpinnability + dir->commit(0, gather.new_sub(), true); // ignore authpinnability } - mds->mdlog->submit_entry(le, gather->new_sub()); + mds->mdlog->submit_entry(le, gather.new_sub()); mds->mdlog->flush(); } diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 89594d5735db9..6f76de27ffcb8 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -1173,7 +1173,7 @@ public: void merge_dir(CInode *diri, frag_t fg); private: - void fragment_freeze_dirs(list& dirs, C_Gather *gather); + void fragment_freeze_dirs(list& dirs, C_GatherBuilder &gather); void fragment_mark_and_complete(list& dirs); void fragment_frozen(list& dirs, frag_t basefrag, int bits); void fragment_unmark_unfreeze_dirs(list& dirs); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 76b57964fd163..d67e124855bb0 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -4924,7 +4924,7 @@ void Server::handle_client_rename(MDRequest *mdr) // -- prepare anchor updates -- if (!linkmerge || srcdnl->is_primary()) { - C_Gather *anchorgather = 0; + C_GatherBuilder anchorgather(g_ceph_context); if (srcdnl->is_primary() && (srcdnl->get_inode()->is_anchored() || @@ -4935,10 +4935,9 @@ void Server::handle_client_rename(MDRequest *mdr) dout(10) << "reanchoring src->dst " << *srcdnl->get_inode() << dendl; vector trace; destdn->make_anchor_trace(trace, srcdnl->get_inode()); - - anchorgather = new C_Gather(g_ceph_context, new C_MDS_RetryRequest(mdcache, mdr)); - mds->anchorclient->prepare_update(srcdnl->get_inode()->ino(), trace, &mdr->more()->src_reanchor_atid, - anchorgather->new_sub()); + mds->anchorclient->prepare_update(srcdnl->get_inode()->ino(), + trace, &mdr->more()->src_reanchor_atid, + anchorgather.new_sub()); } if (destdnl->is_primary() && destdnl->get_inode()->is_anchored() && @@ -4949,14 +4948,14 @@ void Server::handle_client_rename(MDRequest *mdr) vector trace; straydn->make_anchor_trace(trace, destdnl->get_inode()); - if (!anchorgather) - anchorgather = new C_Gather(g_ceph_context, new C_MDS_RetryRequest(mdcache, mdr)); - mds->anchorclient->prepare_update(destdnl->get_inode()->ino(), trace, &mdr->more()->dst_reanchor_atid, - anchorgather->new_sub()); + mds->anchorclient->prepare_update(destdnl->get_inode()->ino(), trace, + &mdr->more()->dst_reanchor_atid, anchorgather.new_sub()); } - if (anchorgather) + if (anchorgather.has_subs()) { + anchorgather.set_finisher(new C_MDS_RetryRequest(mdcache, mdr)); return; // waiting for anchor prepares + } assert(g_conf->mds_kill_rename_at != 4); } diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 0e5fb8ae841bf..dd66aab82937b 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -1488,7 +1488,8 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) ldout(cct, 10) << "flush_set " << oset << dendl; - C_Gather *gather = 0; // we'll need to wait for all objects to flush! + // we'll need to wait for all objects to flush! + C_GatherBuilder gather(cct, onfinish); bool safe = true; for (xlist::iterator i = oset->objects.begin(); @@ -1497,16 +1498,14 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) if (!flush(ob)) { // we'll need to gather... - if (!gather && onfinish) - gather = new C_Gather(cct, onfinish); safe = false; ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid " << ob->last_write_tid << " on " << *ob << dendl; - if (gather) - ob->waitfor_ack[ob->last_write_tid].push_back(gather->new_sub()); + if (onfinish != NULL) + ob->waitfor_ack[ob->last_write_tid].push_back(gather.new_sub()); } } @@ -1534,7 +1533,8 @@ bool ObjectCacher::commit_set(ObjectSet *oset, Context *onfinish) // make sure it's flushing. flush_set(oset); - C_Gather *gather = 0; // we'll need to wait for all objects to commit + // we'll need to wait for all objects to commit + C_GatherBuilder gather(cct, onfinish); bool safe = true; for (xlist::iterator i = oset->objects.begin(); @@ -1545,10 +1545,9 @@ bool ObjectCacher::commit_set(ObjectSet *oset, Context *onfinish) ldout(cct, 10) << "commit_set " << oset << " " << *ob << " will finish on commit tid " << ob->last_write_tid << dendl; - if (!gather && onfinish) gather = new C_Gather(cct, onfinish); safe = false; - if (gather) - ob->waitfor_commit[ob->last_write_tid].push_back( gather->new_sub() ); + if (onfinish != NULL) + ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub()); } } -- 2.39.5