From: Samuel Just Date: Fri, 29 Jun 2012 21:11:07 +0000 (-0700) Subject: OSD,PG: clean up pg removal X-Git-Tag: v0.50~109^2~2^2~45 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b200710b9606fea2c7aca8465300d5e4e4107a62;p=ceph.git OSD,PG: clean up pg removal PG opsequencers will be used for removing a pg. If the pg is recreated before the removal is complete, we need the new pg incarnation to be able to inherit the osr of its predecessor. Previously, we queued the pg for removal and only rendered it unusable after the contents were fully removed. Now, we synchronously remove it from the map and queue a transaction renaming the collections. We then asynchronously clean up those collections. If the pg is recreated, it will inherit the same osr until the cleanup is complete, ensuring correct op ordering with respect to the collection rename. Signed-off-by: Samuel Just --- diff --git a/src/Makefile.am b/src/Makefile.am index e3e9cf9ad165..8702f3cc6d42 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1265,6 +1265,7 @@ noinst_HEADERS = \ common/admin_socket_client.h \ common/shared_cache.hpp \ common/simple_cache.hpp \ + common/sharedptr_registry.hpp \ common/MemoryModel.h\ common/Mutex.h\ common/PrebufferedStreambuf.h\ diff --git a/src/common/sharedptr_registry.hpp b/src/common/sharedptr_registry.hpp new file mode 100644 index 000000000000..729b74dd9f45 --- /dev/null +++ b/src/common/sharedptr_registry.hpp @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_SHAREDPTR_REGISTRY_H +#define CEPH_SHAREDPTR_REGISTRY_H + +#include +#include +#include "common/Mutex.h" +#include "common/Cond.h" + +/** + * Provides a registry of shared_ptr indexed by K while + * the references are alive. + */ +template +class SharedPtrRegistry { + Mutex lock; + Cond cond; + typedef std::tr1::shared_ptr VPtr; + typedef std::tr1::weak_ptr WeakVPtr; + map contents; + + class OnRemoval { + SharedPtrRegistry *parent; + K key; + public: + OnRemoval(SharedPtrRegistry *parent, K key) : + parent(parent), key(key) {} + void operator()(V *to_remove) { + { + Mutex::Locker l(parent->lock); + parent->contents.erase(key); + parent->cond.Signal(); + } + delete to_remove; + } + }; + friend class OnRemoval; + +public: + SharedPtrRegistry() : lock("SharedPtrRegistry::lock") {} + + template + VPtr lookup(const K &key, const A &arg) { + Mutex::Locker l(lock); + while (1) { + if (contents.count(key)) { + VPtr retval = contents[key].lock(); + if (retval) + return retval; + } else { + break; + } + cond.Wait(lock); + } + VPtr retval(new V(arg), OnRemoval(this, key)); + contents[key] = retval; + return retval; + } +}; + +#endif diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index bcbffe4883d1..ecc24a5a45bf 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -599,7 +599,16 @@ public: delete t; } }; - + template + struct C_DeleteTransactionHolder : public Context { + ObjectStore::Transaction *t; + T obj; + C_DeleteTransactionHolder(ObjectStore::Transaction *tt, T &obj) : + t(tt), obj(obj) {} + void finish(int r) { + delete t; + } + }; virtual unsigned apply_transaction(Transaction& t, Context *ondisk=0) = 0; virtual unsigned apply_transactions(list& tls, Context *ondisk=0) = 0; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index a65b87061ed5..fccc03ed8e6e 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -152,7 +152,6 @@ OSDService::OSDService(OSD *osd) : snap_trim_wq(osd->snap_trim_wq), scrub_wq(osd->scrub_wq), 
scrub_finalize_wq(osd->scrub_finalize_wq), - remove_wq(osd->remove_wq), rep_scrub_wq(osd->rep_scrub_wq), class_handler(osd->class_handler), publish_lock("OSDService::publish_lock"), @@ -710,7 +709,8 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp), scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp), rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp), - remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp), + remove_wq(store, g_conf->osd_remove_thread_timeout, &disk_tp), + next_removal_seq(0), watch_lock("OSD::watch_lock"), watch_timer(external_messenger->cct, watch_lock), service(this) @@ -1149,9 +1149,6 @@ void OSD::clear_temp(ObjectStore *store, coll_t tmp) vector objects; store->collection_list(tmp, objects); - if (objects.empty()) - return; - // delete them. ObjectStore::Transaction t; unsigned removed = 0; @@ -1332,6 +1329,19 @@ void OSD::load_pgs() if (it->is_temp(pgid)) clear_temp(store, *it); dout(10) << "load_pgs skipping non-pg " << *it << dendl; + if (it->is_temp(pgid)) { + clear_temp(store, *it); + continue; + } + uint64_t seq; + if (it->is_removal(&seq)) { + if (seq >= next_removal_seq) + next_removal_seq = seq + 1; + pair *to_queue = new pair; + to_queue->first = *it; + remove_wq.queue(to_queue); + continue; + } continue; } if (snap != CEPH_NOSNAP) { @@ -1941,6 +1951,34 @@ void OSD::dump_ops_in_flight(ostream& ss) op_tracker.dump_ops_in_flight(ss); } +// ========================================= +void OSD::RemoveWQ::_process(pair *item) +{ + coll_t &coll = item->first; + ObjectStore::Sequencer *osr = item->second.get(); + store->flush(); + vector olist; + store->collection_list(coll, olist); + //*_dout << "OSD::RemoveWQ::_process removing coll " << coll << std::endl; + uint64_t num = 1; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + for (vector::iterator i = olist.begin(); + i != olist.end(); + ++i, 
++num) { + if (num % 20 == 0) { + store->queue_transaction( + osr, t, + new ObjectStore::C_DeleteTransactionHolder(t, item->second)); + t = new ObjectStore::Transaction; + } + t->remove(coll, *i); + } + t->remove_collection(coll); + store->queue_transaction( + osr, t, + new ObjectStore::C_DeleteTransactionHolder(t, item->second)); + delete item; +} // ========================================= void OSD::do_mon_report() @@ -3623,8 +3661,10 @@ void OSD::activate_map() if (!osdmap->have_pg_pool(pg->info.pgid.pool())) { //pool is deleted! - queue_pg_for_deletion(pg); - //pg->unlock(); + pg->get(); + _remove_pg(pg); + pg->unlock(); + pg->put(); continue; } else { pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch()); @@ -4204,7 +4244,7 @@ void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg) if (!ctx.transaction->empty()) { ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction)); int tr = store->queue_transaction( - &pg->osr, + pg->osr.get(), ctx.transaction, ctx.on_applied, ctx.on_safe); assert(tr == 0); ctx.transaction = new ObjectStore::Transaction; @@ -4228,7 +4268,7 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg) } else { ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction)); int tr = store->queue_transaction( - &pg->osr, + pg->osr.get(), ctx.transaction, ctx.on_applied, ctx.on_safe); assert(tr == 0); } @@ -4440,7 +4480,8 @@ void OSD::handle_pg_trim(OpRequestRef op) ObjectStore::Transaction *t = new ObjectStore::Transaction; pg->trim(*t, m->trim_to); pg->write_info(*t); - int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t)); + int tr = store->queue_transaction(pg->osr.get(), t, + new ObjectStore::C_DeleteTransaction(t)); assert(tr == 0); } pg->unlock(); @@ -4543,14 +4584,10 @@ void OSD::handle_pg_query(OpRequestRef op) if (pg_map.count(pgid)) { pg = _lookup_lock_pg(pgid); - if (!pg->deleting) { - pg->queue_query(it->second.epoch_sent, it->second.epoch_sent, - 
from, it->second); - pg->unlock(); - continue; - } else { - pg->unlock(); - } + pg->queue_query(it->second.epoch_sent, it->second.epoch_sent, + from, it->second); + pg->unlock(); + continue; } // get active crush mapping @@ -4622,151 +4659,50 @@ void OSD::handle_pg_remove(OpRequestRef op) dout(5) << "queue_pg_for_deletion: " << pgid << dendl; PG *pg = _lookup_lock_pg(pgid); if (pg->info.history.same_interval_since <= m->get_epoch()) { - if (pg->deleting) { - dout(10) << *pg << " already removing." << dendl; - } else { - assert(pg->get_primary() == m->get_source().num()); - queue_pg_for_deletion(pg); - } + assert(pg->get_primary() == m->get_source().num()); + pg->get(); + _remove_pg(pg); + pg->unlock(); + pg->put(); } else { dout(10) << *pg << " ignoring remove request, pg changed in epoch " << pg->info.history.same_interval_since << " > " << m->get_epoch() << dendl; + pg->unlock(); } - pg->unlock(); - } -} - - -void OSD::queue_pg_for_deletion(PG *pg) -{ - dout(10) << *pg << " removing." << dendl; - pg->assert_locked(); - assert(pg->get_role() == -1); - if (!pg->deleting) { - pg->deleting = true; - remove_wq.queue(pg); } } void OSD::_remove_pg(PG *pg) { - pg_t pgid = pg->info.pgid; - dout(10) << "_remove_pg " << pgid << dendl; - - pg->lock(); - if (!pg->deleting) { - pg->unlock(); - return; - } - - // reset log, last_complete, in case deletion gets canceled - pg->info.last_complete = eversion_t(); - pg->info.last_update = eversion_t(); - pg->info.log_tail = eversion_t(); - pg->log.zero(); - pg->ondisklog.zero(); - - { - ObjectStore::Transaction *t = new ObjectStore::Transaction; - pg->write_info(*t); - pg->write_log(*t); - int tr = store->queue_transaction(&pg->osr, t); - assert(tr == 0); - } - - // flush all pg operations to the fs, so we can rely on - // collection_list below. 
- pg->osr.flush(); - - int n = 0; - + vector removals; ObjectStore::Transaction *rmt = new ObjectStore::Transaction; - - // snap collections for (interval_set::iterator p = pg->snap_collections.begin(); p != pg->snap_collections.end(); - p++) { + ++p) { for (snapid_t cur = p.get_start(); cur < p.get_start() + p.get_len(); ++cur) { - vector olist; - store->collection_list(coll_t(pgid, cur), olist); - dout(10) << "_remove_pg " << pgid << " snap " << cur << " " << olist.size() << " objects" << dendl; - for (vector::iterator q = olist.begin(); - q != olist.end(); - q++) { - ObjectStore::Transaction *t = new ObjectStore::Transaction; - t->remove(coll_t(pgid, cur), *q); - t->remove(coll_t(pgid), *q); // we may hit this twice, but it's harmless - int tr = store->queue_transaction(&pg->osr, t); - assert(tr == 0); - - if ((++n & 0xff) == 0) { - pg->unlock(); - pg->lock(); - if (!pg->deleting) { - dout(10) << "_remove_pg aborted on " << *pg << dendl; - pg->unlock(); - return; - } - } - } - rmt->remove_collection(coll_t(pgid, cur)); + coll_t to_remove = get_next_removal_coll(); + removals.push_back(to_remove); + rmt->collection_rename(coll_t(pg->info.pgid, cur), to_remove); } } - - // (what remains of the) main collection - vector olist; - store->collection_list(coll_t(pgid), olist); - dout(10) << "_remove_pg " << pgid << " " << olist.size() << " objects" << dendl; - for (vector::iterator p = olist.begin(); - p != olist.end(); - p++) { - ObjectStore::Transaction *t = new ObjectStore::Transaction; - t->remove(coll_t(pgid), *p); - int tr = store->queue_transaction(&pg->osr, t); - assert(tr == 0); - - if ((++n & 0xff) == 0) { - pg->unlock(); - pg->lock(); - if (!pg->deleting) { - dout(10) << "_remove_pg aborted on " << *pg << dendl; - pg->unlock(); - return; - } - } - } - - pg->unlock(); - - dout(10) << "_remove_pg " << pgid << " flushing store" << dendl; - store->flush(); - - dout(10) << "_remove_pg " << pgid << " taking osd_lock" << dendl; - osd_lock.Lock(); - pg->lock(); - - 
if (!pg->deleting) { - osd_lock.Unlock(); - pg->unlock(); - return; - } - - dout(10) << "_remove_pg " << pgid << " removing final" << dendl; - - { - rmt->remove(coll_t::META_COLL, pg->log_oid); - rmt->remove(coll_t::META_COLL, pg->biginfo_oid); - rmt->remove_collection(coll_t(pgid)); - int tr = store->queue_transaction(NULL, rmt); - assert(tr == 0); + coll_t to_remove = get_next_removal_coll(); + removals.push_back(to_remove); + rmt->collection_rename(coll_t(pg->info.pgid), to_remove); + if (pg->have_temp_coll()) { + to_remove = get_next_removal_coll(); + removals.push_back(to_remove); + rmt->collection_rename(pg->get_temp_coll(), to_remove); } + rmt->remove(coll_t::META_COLL, pg->log_oid); + rmt->remove(coll_t::META_COLL, pg->biginfo_oid); - if (store->collection_exists(coll_t::make_temp_coll(pg->get_pgid()))) { - clear_temp(store, coll_t::make_temp_coll(pg->get_pgid())); - } + store->queue_transaction( + pg->osr.get(), rmt, + new ObjectStore::C_DeleteTransactionHolder< + SequencerRef>(rmt, pg->osr)); // on_removal, which calls remove_watchers_and_notifies, and the erasure from // the pg_map must be done together without unlocking the pg lock, @@ -4774,18 +4710,28 @@ void OSD::_remove_pg(PG *pg) // and handle_notify_timeout pg->on_removal(); + for (vector::iterator i = removals.begin(); + i != removals.end(); + ++i) { + remove_wq.queue(new pair(*i, pg->osr)); + } + + recovery_wq.dequeue(pg); + scrub_wq.dequeue(pg); + scrub_finalize_wq.dequeue(pg); + snap_trim_wq.dequeue(pg); + pg_stat_queue_dequeue(pg); + op_wq.dequeue(pg); + peering_wq.dequeue(pg); + + pg->deleting = true; + // remove from map - pg_map.erase(pgid); + pg_map.erase(pg->info.pgid); pg->put(); // since we've taken it out of map - service.unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp); + service.unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp); _put_pool(pg->pool); - - // unlock, and probably delete - pg->unlock(); - pg->put(); // will delete, if last 
reference - osd_lock.Unlock(); - dout(10) << "_remove_pg " << pgid << " all done" << dendl; } @@ -4866,8 +4812,12 @@ void OSD::do_recovery(PG *pg) dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl; recovery_wq.queue(pg); } else { - pg->lock(); + if (pg->deleting) { + pg->unlock(); + pg->put(); + return; + } dout(10) << "do_recovery starting " << max << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops) on " @@ -5327,6 +5277,11 @@ void OSD::dequeue_op(PG *pg) OpRequestRef op; pg->lock(); + if (pg->deleting) { + pg->unlock(); + pg->put(); + return; + } assert(!pg->op_queue.empty()); op = pg->op_queue.front(); pg->op_queue.pop_front(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 03b382253659..11793fa15ba7 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -49,6 +49,7 @@ using namespace __gnu_cxx; #include "OpRequest.h" #include "common/shared_cache.hpp" #include "common/simple_cache.hpp" +#include "common/sharedptr_registry.hpp" #define CEPH_OSD_PROTOCOL 10 /* cluster internal */ @@ -128,10 +129,13 @@ class OpsFlightSocketHook; extern const coll_t meta_coll; +typedef std::tr1::shared_ptr SequencerRef; + class OSD; class OSDService { public: OSD *osd; + SharedPtrRegistry osr_registry; const int whoami; ObjectStore *&store; LogClient &clog; @@ -146,7 +150,6 @@ public: ThreadPool::WorkQueue &snap_trim_wq; ThreadPool::WorkQueue &scrub_wq; ThreadPool::WorkQueue &scrub_finalize_wq; - ThreadPool::WorkQueue &remove_wq; ThreadPool::WorkQueue &rep_scrub_wq; ClassHandler *&class_handler; @@ -499,7 +502,7 @@ private: OpsFlightSocketHook *admin_ops_hook; // -- op queue -- - deque op_queue; + list op_queue; int op_queue_len; struct OpWQ : public ThreadPool::WorkQueue { @@ -509,7 +512,14 @@ private: bool _enqueue(PG *pg); void _dequeue(PG *pg) { - assert(0); + for (list::iterator i = osd->op_queue.begin(); + i != osd->op_queue.end(); + ) { + if (*i == pg) + osd->op_queue.erase(i++); + else + ++i; + } } bool 
_empty() { return osd->op_queue.empty(); @@ -531,14 +541,21 @@ private: // -- peering queue -- struct PeeringWQ : public ThreadPool::WorkQueue { - deque peering_queue; + list peering_queue; OSD *osd; PeeringWQ(OSD *o, time_t ti, ThreadPool *tp) : ThreadPool::WorkQueue( "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {} void _dequeue(PG *pg) { - assert(0); + for (list::iterator i = peering_queue.begin(); + i != peering_queue.end(); + ) { + if (*i == pg) + peering_queue.erase(i++); + else + ++i; + } } bool _enqueue(PG *pg) { pg->get(); @@ -809,7 +826,6 @@ protected: void handle_pg_backfill(OpRequestRef op); void handle_pg_remove(OpRequestRef op); - void queue_pg_for_deletion(PG *pg); void _remove_pg(PG *pg); // -- commands -- @@ -1121,45 +1137,42 @@ protected: } rep_scrub_wq; // -- removing -- - xlist remove_queue; - - struct RemoveWQ : public ThreadPool::WorkQueue { - OSD *osd; - RemoveWQ(OSD *o, time_t ti, ThreadPool *tp) - : ThreadPool::WorkQueue("OSD::RemoveWQ", ti, 0, tp), osd(o) {} + struct RemoveWQ : public ThreadPool::WorkQueue > { + ObjectStore *&store; + list *> remove_queue; + RemoveWQ(ObjectStore *&o, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue >("OSD::RemoveWQ", ti, 0, tp), + store(o) {} bool _empty() { - return osd->remove_queue.empty(); + return remove_queue.empty(); } - bool _enqueue(PG *pg) { - if (pg->remove_item.is_on_list()) - return false; - pg->get(); - osd->remove_queue.push_back(&pg->remove_item); + bool _enqueue(pair *item) { + remove_queue.push_back(item); return true; } - void _dequeue(PG *pg) { - if (pg->remove_item.remove_myself()) - pg->put(); + void _dequeue(pair *item) { + assert(0); } - PG *_dequeue() { - if (osd->remove_queue.empty()) + pair *_dequeue() { + if (remove_queue.empty()) return NULL; - PG *pg = osd->remove_queue.front(); - osd->remove_queue.pop_front(); - return pg; - } - void _process(PG *pg) { - osd->_remove_pg(pg); + pair *item = remove_queue.front(); + remove_queue.pop_front(); + return item; } + void _process(pair 
*item); void _clear() { - while (!osd->remove_queue.empty()) { - PG *pg = osd->remove_queue.front(); - osd->remove_queue.pop_front(); - pg->put(); + while (!remove_queue.empty()) { + delete remove_queue.front(); + remove_queue.pop_front(); } } } remove_wq; + uint64_t next_removal_seq; + coll_t get_next_removal_coll() { + return coll_t::make_removal_coll(next_removal_seq++); + } private: bool ms_dispatch(Message *m); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index e2c333afb4a3..9f651a256447 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -40,6 +40,37 @@ static ostream& _prefix(std::ostream *_dout, const PG *pg) { return *_dout << pg->gen_prefix(); } +PG::PG(OSDService *o, OSDMapRef curmap, + PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) : + osd(o), osdmap_ref(curmap), pool(_pool), + _lock("PG::_lock"), + ref(0), deleting(false), dirty_info(false), dirty_log(false), + info(p), coll(p), log_oid(loid), biginfo_oid(ioid), + recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this), + recovery_ops_active(0), + waiting_on_backfill(0), + role(0), + state(0), + need_up_thru(false), + need_flush(false), + last_peering_reset(0), + heartbeat_peer_lock("PG::heartbeat_peer_lock"), + backfill_target(-1), + pg_stats_lock("PG::pg_stats_lock"), + pg_stats_valid(false), + osr(osd->osr_registry.lookup(p, (stringify(p)))), + finish_sync_event(NULL), + finalizing_scrub(false), + scrub_block_writes(false), + scrub_active(false), + scrub_reserved(false), scrub_reserve_failed(false), + scrub_waiting_on(0), + active_rep_scrub(0), + recovery_state(this) +{ + pool->get(); +} + void PG::lock(bool no_lockdep) { _lock.Lock(no_lockdep); @@ -1436,7 +1467,7 @@ void PG::do_pending_flush() assert(is_locked()); if (need_flush) { dout(10) << "do_pending_flush doing pending flush" << dendl; - osr.flush(); + osr->flush(); need_flush = false; dout(10) << "do_pending_flush done" << dendl; } @@ -1546,7 +1577,7 @@ void 
PG::_activate_committed(epoch_t e, entity_inst_t& primary) if (dirty_info) { ObjectStore::Transaction *t = new ObjectStore::Transaction; write_info(*t); - int tr = osd->store->queue_transaction(&osr, t); + int tr = osd->store->queue_transaction(osr.get(), t); assert(tr == 0); } @@ -1566,7 +1597,6 @@ void PG::all_activated_and_committed() assert(peer_activated.size() == acting.size()); info.history.last_epoch_started = get_osdmap()->get_epoch(); - dirty_info = true; // make sure CLEAN is marked if we've been clean in this interval if (info.last_complete == info.last_update && @@ -2860,7 +2890,7 @@ void PG::build_scrub_map(ScrubMap &map) // wait for any writes on our pg to flush to disk first. this avoids races // with scrub starting immediately after trim or recovery completion. - osr.flush(); + osr->flush(); // objects vector ls; @@ -3039,6 +3069,11 @@ void PG::scrub() { lock(); + if (deleting) { + unlock(); + put(); + return; + } if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) { dout(10) << "scrub -- not primary or active or not clean" << dendl; @@ -3292,6 +3327,11 @@ void PG::_compare_scrubmaps(const map &maps, void PG::scrub_finalize() { lock(); + if (deleting) { + unlock(); + return; + } + assert(last_update_applied == info.last_update); if (scrub_epoch_start != info.history.same_interval_since) { @@ -3406,7 +3446,7 @@ void PG::scrub_finalize() { { ObjectStore::Transaction *t = new ObjectStore::Transaction; write_info(*t); - int tr = osd->store->queue_transaction(&osr, t); + int tr = osd->store->queue_transaction(osr.get(), t); assert(tr == 0); } @@ -3633,7 +3673,6 @@ void PG::on_removal() osd->scrub_wq.dequeue(this); osd->scrub_finalize_wq.dequeue(this); osd->snap_trim_wq.dequeue(this); - osd->remove_wq.dequeue(this); osd->pg_stat_queue_dequeue(this); remove_watchers_and_notifies(); @@ -3758,13 +3797,8 @@ void PG::start_peering_interval(const OSDMapRef lastmap, // pg->on_* on_change(); - if (deleting) { - dout(10) << *this << " canceling 
deletion!" << dendl; - deleting = false; - osd->remove_wq.dequeue(this); - osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp); - } - + assert(!deleting); + if (role != oldrole) { // old primary? if (oldrole == 0) { @@ -3923,8 +3957,6 @@ ostream& operator<<(ostream& out, const PG& pg) if (pg.snap_trimq.size()) out << " snaptrimq=" << pg.snap_trimq; - if (pg.deleting) - out << " DELETING"; out << "]"; @@ -4077,7 +4109,6 @@ void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx) } if (old_peering_evt(evt)) return; - assert(!deleting); recovery_state.handle_event(evt, rctx); } diff --git a/src/osd/PG.h b/src/osd/PG.h index 22f88bcf85dc..be1d58d2ab36 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -424,7 +424,7 @@ public: /* You should not use these items without taking their respective queue locks * (if they have one) */ - xlist::item recovery_item, scrub_item, scrub_finalize_item, snap_trim_item, remove_item, stat_queue_item; + xlist::item recovery_item, scrub_item, scrub_finalize_item, snap_trim_item, stat_queue_item; int recovery_ops_active; bool waiting_on_backfill; #ifdef DEBUG_RECOVERY_OIDS @@ -628,7 +628,7 @@ protected: pg_stat_t pg_stats_stable; // for ordering writes - ObjectStore::Sequencer osr; + std::tr1::shared_ptr osr; void update_stats(); void clear_stats(); @@ -775,6 +775,8 @@ public: void build_scrub_map(ScrubMap &map); void build_inc_scrub_map(ScrubMap &map, eversion_t v); virtual int _scrub(ScrubMap &map, int& errors, int& fixed) { return 0; } + virtual coll_t get_temp_coll() = 0; + virtual bool have_temp_coll() = 0; void clear_scrub_reserved(); void scrub_reserve_replicas(); void scrub_unreserve_replicas(); @@ -1363,36 +1365,7 @@ public: public: PG(OSDService *o, OSDMapRef curmap, - PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) : - osd(o), osdmap_ref(curmap), pool(_pool), - _lock("PG::_lock"), - ref(0), deleting(false), dirty_info(false), dirty_log(false), - info(p), coll(p), log_oid(loid), 
biginfo_oid(ioid), - recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), remove_item(this), stat_queue_item(this), - recovery_ops_active(0), - waiting_on_backfill(0), - role(0), - state(0), - need_up_thru(false), - need_flush(false), - last_peering_reset(0), - heartbeat_peer_lock("PG::heartbeat_peer_lock"), - backfill_target(-1), - flushed(true), - pg_stats_lock("PG::pg_stats_lock"), - pg_stats_valid(false), - osr(stringify(p)), - finish_sync_event(NULL), - finalizing_scrub(false), - scrub_block_writes(false), - scrub_active(false), - scrub_reserved(false), scrub_reserve_failed(false), - scrub_waiting_on(0), - active_rep_scrub(0), - recovery_state(this) - { - pool->get(); - } + PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid); virtual ~PG() { pool->put(); } @@ -1540,7 +1513,7 @@ public: virtual void do_sub_op_reply(OpRequestRef op) = 0; virtual void do_scan(OpRequestRef op) = 0; virtual void do_backfill(OpRequestRef op) = 0; - virtual bool snap_trimmer() = 0; + virtual void snap_trimmer() = 0; virtual int do_command(vector& cmd, ostream& ss, bufferlist& idata, bufferlist& odata) = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 0da6767d2e0e..31e9f864de4c 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -457,7 +457,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) hobject_t next; hobject_t current = response.handle; - osr.flush(); + osr->flush(); int r = osd->store->collection_list_partial(coll, current, list_size, list_size, @@ -1121,7 +1121,7 @@ void ReplicatedPG::do_scan(OpRequestRef op) case MOSDPGScan::OP_SCAN_GET_DIGEST: { BackfillInterval bi; - osr.flush(); + osr->flush(); scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi); MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST, get_osdmap()->get_epoch(), m->query_epoch, @@ -1198,7 +1198,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op) ObjectStore::Transaction *t 
= new ObjectStore::Transaction; write_info(*t); - int tr = osd->store->queue_transaction(&osr, t); + int tr = osd->store->queue_transaction(osr.get(), t); assert(tr == 0); } break; @@ -1243,7 +1243,7 @@ bool ReplicatedPG::get_obs_to_trim(snapid_t &snap_to_trim, } // flush pg ops to fs so we can rely on collection_list() - osr.flush(); + osr->flush(); osd->store->collection_list(col_to_trim, obs_to_trim); @@ -1398,9 +1398,14 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, return repop; } -bool ReplicatedPG::snap_trimmer() +void ReplicatedPG::snap_trimmer() { lock(); + if (deleting) { + unlock(); + put(); + return; + } dout(10) << "snap_trimmer entry" << dendl; if (is_primary()) { entity_inst_t nobody; @@ -1409,7 +1414,7 @@ bool ReplicatedPG::snap_trimmer() queue_snap_trim(); unlock(); put(); - return true; + return; } if (!scrub_block_writes) { dout(10) << "snap_trimmer posting" << dendl; @@ -1432,7 +1437,7 @@ bool ReplicatedPG::snap_trimmer() } unlock(); put(); - return true; + return; } int ReplicatedPG::do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr) @@ -3363,7 +3368,7 @@ void ReplicatedPG::apply_repop(RepGather *repop) Context *onapplied = new C_OSD_OpApplied(this, repop); Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc, repop->ctx->clone_obc); - int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op); + int r = osd->store->queue_transactions(osr.get(), repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op); if (r) { derr << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl; assert(0); @@ -4256,7 +4261,7 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op) Context *oncommit = new C_OSD_RepModifyCommit(rm); Context *onapply = new C_OSD_RepModifyApply(rm); - int r = osd->store->queue_transactions(&osr, rm->tls, onapply, oncommit, 0, op); + int r = osd->store->queue_transactions(osr.get(), rm->tls, onapply, oncommit, 0, op); 
if (r) { dout(0) << "error applying transaction: r = " << r << dendl; assert(0); @@ -4951,7 +4956,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op) } int r = osd->store-> - queue_transaction(&osr, t, + queue_transaction(osr.get(), t, onreadable, new C_OSD_CommittedPushedObject(this, op, info.history.same_interval_since, @@ -5006,7 +5011,7 @@ void ReplicatedPG::handle_push(OpRequestRef op) t); int r = osd->store-> - queue_transaction(&osr, t, + queue_transaction(osr.get(), t, onreadable, new C_OSD_CommittedPushedObject( this, op, @@ -5432,7 +5437,7 @@ void ReplicatedPG::sub_op_remove(OpRequestRef op) ObjectStore::Transaction *t = new ObjectStore::Transaction; remove_object_with_snap_hardlinks(*t, m->poid); - int r = osd->store->queue_transaction(&osr, t); + int r = osd->store->queue_transaction(osr.get(), t); assert(r == 0); } @@ -5597,7 +5602,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what) write_info(*t); } - osd->store->queue_transaction(&osr, t, c, NULL, new C_OSD_OndiskWriteUnlockList(&c->obcs)); + osd->store->queue_transaction(osr.get(), t, c, NULL, new C_OSD_OndiskWriteUnlockList(&c->obcs)); // Send out the PG log to all replicas // So that they know what is lost @@ -5970,7 +5975,7 @@ int ReplicatedPG::recover_primary(int max) recover_got(soid, latest->version); - osd->store->queue_transaction(&osr, t, + osd->store->queue_transaction(osr.get(), t, new C_OSD_AppliedRecoveredObject(this, t, obc), new C_OSD_CommittedPushedObject(this, OpRequestRef(), info.history.same_interval_since, @@ -6184,7 +6189,7 @@ int ReplicatedPG::recover_backfill(int max) // objects. 
dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl; backfill_info.clear(); - osr.flush(); + osr->flush(); scan_range(backfill_pos, local_min, local_max, &backfill_info); int ops = 0; @@ -6198,7 +6203,7 @@ int ReplicatedPG::recover_backfill(int max) while (ops < max) { if (backfill_info.begin <= pbi.begin && !backfill_info.extends_to_end() && backfill_info.empty()) { - osr.flush(); + osr->flush(); scan_range(backfill_info.end, local_min, local_max, &backfill_info); backfill_info.trim(); } @@ -6717,7 +6722,7 @@ boost::statechart::result ReplicatedPG::RepColTrim::react(const SnapTrim&) // flush all operations to fs so we can rely on collection_list // below. - pg->osr.flush(); + pg->osr->flush(); vector obs_to_trim; pg->osd->store->collection_list(col_to_trim, obs_to_trim); @@ -6840,7 +6845,7 @@ boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim& pg->snap_collections.erase(sn); pg->write_info(*t); t->remove_collection(c); - int tr = pg->osd->store->queue_transaction(&pg->osr, t); + int tr = pg->osd->store->queue_transaction(pg->osr.get(), t); assert(tr == 0); context().need_share_pg_info = true; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 8bd5105550c1..1d3caee4621b 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -806,13 +806,21 @@ public: coll_t &col_to_trim, vector &obs_to_trim); RepGather *trim_object(const hobject_t &coid, const snapid_t &sn); - bool snap_trimmer(); + void snap_trimmer(); int do_osd_ops(OpContext *ctx, vector& ops); void do_osd_op_effects(OpContext *ctx); private: bool temp_created; coll_t temp_coll; coll_t get_temp_coll(ObjectStore::Transaction *t); +public: + bool have_temp_coll() { + return temp_created; + } + coll_t get_temp_coll() { + return temp_coll; + } +private: struct NotTrimming; struct SnapTrim : boost::statechart::event< SnapTrim > { SnapTrim() : boost::statechart::event < SnapTrim >() {} diff --git a/src/osd/osd_types.cc 
b/src/osd/osd_types.cc index 79a628050b2e..bc40d1b43291 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -280,6 +280,15 @@ bool coll_t::is_pg(pg_t& pgid, snapid_t& snap) const return true; } +bool coll_t::is_removal(uint64_t *seq) const +{ + if (str.substr(0, 11) != string("FORREMOVAL_")) /* prefix written by seq_to_removal_str is 11 chars long */ + return false; + + stringstream ss(str.substr(11)); /* decimal sequence number follows the prefix */ + ss >> *seq; + return true; +} void coll_t::encode(bufferlist& bl) const { diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index f863b6445f4d..35bc0a3bcb1f 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -323,6 +323,10 @@ public: return coll_t(pg_to_tmp_str(pgid)); } + static coll_t make_removal_coll(uint64_t seq) { + return coll_t(seq_to_removal_str(seq)); + } + const std::string& to_str() const { return str; } @@ -337,6 +341,7 @@ public: bool is_pg(pg_t& pgid, snapid_t& snap) const; bool is_temp(pg_t& pgid) const; + bool is_removal(uint64_t *seq) const; void encode(bufferlist& bl) const; void decode(bufferlist::iterator& bl); inline bool operator==(const coll_t& rhs) const { @@ -360,6 +365,11 @@ private: oss << p << "_TEMP"; return oss.str(); } + static std::string seq_to_removal_str(uint64_t seq) { + std::ostringstream oss; + oss << "FORREMOVAL_" << seq; + return oss.str(); + } std::string str; };