public:
SharedPtrRegistry() : lock("SharedPtrRegistry::lock") {}
+ VPtr lookup(const K &key) {
+ Mutex::Locker l(lock);
+ while (1) {
+ if (contents.count(key)) {
+ VPtr retval = contents[key].lock();
+ if (retval)
+ return retval;
+ } else {
+ break;
+ }
+ cond.Wait(lock);
+ Mutex::Locker l(lock);
+ }
+ return VPtr();
+ }
+
+ VPtr lookup_or_create(const K &key) {
+ Mutex::Locker l(lock);
+ while (1) {
+ if (contents.count(key)) {
+ VPtr retval = contents[key].lock();
+ if (retval)
+ return retval;
+ } else {
+ break;
+ }
+ cond.Wait(lock);
+ }
+ VPtr retval(new V(), OnRemoval(this, key));
+ contents[key] = retval;
+ return retval;
+ }
+
template<class A>
- VPtr lookup(const K &key, const A &arg) {
+ VPtr lookup_or_create(const K &key, const A &arg) {
Mutex::Locker l(lock);
while (1) {
if (contents.count(key)) {
continue;
}
uint64_t seq;
- if (it->is_removal(&seq)) {
+ if (it->is_removal(&seq, &pgid)) {
if (seq >= next_removal_seq)
next_removal_seq = seq + 1;
- pair<coll_t, SequencerRef> *to_queue = new pair<coll_t, SequencerRef>;
- to_queue->first = *it;
+ boost::tuple<coll_t, SequencerRef, DeletingStateRef> *to_queue =
+ new boost::tuple<coll_t, SequencerRef, DeletingStateRef>;
+ to_queue->get<0>() = *it;
+ to_queue->get<1>() = service.osr_registry.lookup_or_create(
+ pgid, stringify(pgid));
+ to_queue->get<2>() = service.deleting_pgs.lookup_or_create(pgid);
remove_wq.queue(to_queue);
continue;
}
}
// =========================================
-void OSD::RemoveWQ::_process(pair<coll_t, SequencerRef> *item)
+void OSD::RemoveWQ::_process(boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item)
{
- coll_t &coll = item->first;
- ObjectStore::Sequencer *osr = item->second.get();
- store->flush();
+ coll_t &coll = item->get<0>();
+ ObjectStore::Sequencer *osr = item->get<1>().get();
+ if (osr)
+ osr->flush();
vector<hobject_t> olist;
store->collection_list(coll, olist);
//*_dout << "OSD::RemoveWQ::_process removing coll " << coll << std::endl;
if (num % 20 == 0) {
store->queue_transaction(
osr, t,
- new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->second));
+ new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->get<1>()));
t = new ObjectStore::Transaction;
}
t->remove(coll, *i);
t->remove_collection(coll);
store->queue_transaction(
osr, t,
- new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->second));
+ new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->get<1>()));
delete item;
}
// =========================================
for (snapid_t cur = p.get_start();
cur < p.get_start() + p.get_len();
++cur) {
- coll_t to_remove = get_next_removal_coll();
+ coll_t to_remove = get_next_removal_coll(pg->info.pgid);
removals.push_back(to_remove);
rmt->collection_rename(coll_t(pg->info.pgid, cur), to_remove);
}
}
- coll_t to_remove = get_next_removal_coll();
+ coll_t to_remove = get_next_removal_coll(pg->info.pgid);
removals.push_back(to_remove);
rmt->collection_rename(coll_t(pg->info.pgid), to_remove);
if (pg->have_temp_coll()) {
- to_remove = get_next_removal_coll();
+ to_remove = get_next_removal_coll(pg->info.pgid);
removals.push_back(to_remove);
rmt->collection_rename(pg->get_temp_coll(), to_remove);
}
// and handle_notify_timeout
pg->on_removal();
+ DeletingStateRef deleting = service.deleting_pgs.lookup_or_create(pg->info.pgid);
for (vector<coll_t>::iterator i = removals.begin();
i != removals.end();
++i) {
- remove_wq.queue(new pair<coll_t, SequencerRef>(*i, pg->osr));
+ remove_wq.queue(new boost::tuple<coll_t, SequencerRef, DeletingStateRef>(
+ *i, pg->osr, deleting));
}
recovery_wq.dequeue(pg);
#ifndef CEPH_OSD_H
#define CEPH_OSD_H
+#include "boost/tuple/tuple.hpp"
+
#include "PG.h"
#include "msg/Dispatcher.h"
typedef std::tr1::shared_ptr<ObjectStore::Sequencer> SequencerRef;
+class DeletingState {
+ Mutex lock;
+ list<Context *> on_deletion_complete;
+public:
+ DeletingState() : lock("DeletingState::lock") {}
+ void register_on_delete(Context *completion) {
+ Mutex::Locker l(lock);
+ on_deletion_complete.push_front(completion);
+ }
+ ~DeletingState() {
+ Mutex::Locker l(lock);
+ for (list<Context *>::iterator i = on_deletion_complete.begin();
+ i != on_deletion_complete.end();
+ ++i) {
+ (*i)->complete(0);
+ }
+ }
+};
+typedef std::tr1::shared_ptr<DeletingState> DeletingStateRef;
+
class OSD;
class OSDService {
public:
OSD *osd;
SharedPtrRegistry<pg_t, ObjectStore::Sequencer> osr_registry;
+ SharedPtrRegistry<pg_t, DeletingState> deleting_pgs;
const int whoami;
ObjectStore *&store;
LogClient &clog;
} rep_scrub_wq;
// -- removing --
- struct RemoveWQ : public ThreadPool::WorkQueue<pair<coll_t, SequencerRef> > {
+ struct RemoveWQ : public ThreadPool::WorkQueue<boost::tuple<coll_t, SequencerRef, DeletingStateRef> > {
ObjectStore *&store;
- list<pair<coll_t, SequencerRef> *> remove_queue;
+ list<boost::tuple<coll_t, SequencerRef, DeletingStateRef> *> remove_queue;
RemoveWQ(ObjectStore *&o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<pair<coll_t, SequencerRef> >("OSD::RemoveWQ", ti, 0, tp),
+ : ThreadPool::WorkQueue<boost::tuple<coll_t, SequencerRef, DeletingStateRef> >("OSD::RemoveWQ", ti, 0, tp),
store(o) {}
bool _empty() {
return remove_queue.empty();
}
- bool _enqueue(pair<coll_t, SequencerRef> *item) {
+ bool _enqueue(boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item) {
remove_queue.push_back(item);
return true;
}
- void _dequeue(pair<coll_t, SequencerRef> *item) {
+ void _dequeue(boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item) {
assert(0);
}
- pair<coll_t, SequencerRef> *_dequeue() {
+ boost::tuple<coll_t, SequencerRef, DeletingStateRef> *_dequeue() {
if (remove_queue.empty())
return NULL;
- pair<coll_t, SequencerRef> *item = remove_queue.front();
+ boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item = remove_queue.front();
remove_queue.pop_front();
return item;
}
- void _process(pair<coll_t, SequencerRef> *item);
+ void _process(boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item);
void _clear() {
while (!remove_queue.empty()) {
delete remove_queue.front();
}
} remove_wq;
uint64_t next_removal_seq;
- coll_t get_next_removal_coll() {
- return coll_t::make_removal_coll(next_removal_seq++);
+ coll_t get_next_removal_coll(pg_t pgid) {
+ return coll_t::make_removal_coll(next_removal_seq++, pgid);
}
private:
backfill_target(-1),
pg_stats_lock("PG::pg_stats_lock"),
pg_stats_valid(false),
- osr(osd->osr_registry.lookup(p, (stringify(p)))),
+ osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))),
finish_sync_event(NULL),
finalizing_scrub(false),
scrub_block_writes(false),
flushed = false;
on_applied->push_back(new ContainerContext<FlushStateRef>(flush_trigger));
on_safe->push_back(new ContainerContext<FlushStateRef>(flush_trigger));
+ DeletingStateRef del = osd->deleting_pgs.lookup(info.pgid);
+ if (del)
+ del->register_on_delete(new ContainerContext<FlushStateRef>(flush_trigger));
}
/* Called before initializing peering during advance_map */
return true;
}
-bool coll_t::is_removal(uint64_t *seq) const
+bool coll_t::is_removal(uint64_t *seq, pg_t *pgid) const
{
if (str.substr(0, 12) != string("FORREMOVAL_"))
return false;
stringstream ss(str.substr(12));
ss >> *seq;
+ char sep;
+ ss >> sep;
+ assert(sep == '_');
+ string pgid_str;
+ ss >> pgid_str;
+ if (!pgid->parse(pgid_str.c_str())) {
+ assert(0);
+ return false;
+ }
return true;
}
return coll_t(pg_to_tmp_str(pgid));
}
- static coll_t make_removal_coll(uint64_t seq) {
- return coll_t(seq_to_removal_str(seq));
+ static coll_t make_removal_coll(uint64_t seq, pg_t pgid) {
+ return coll_t(seq_to_removal_str(seq, pgid));
}
const std::string& to_str() const {
bool is_pg(pg_t& pgid, snapid_t& snap) const;
bool is_temp(pg_t& pgid) const;
- bool is_removal(uint64_t *seq) const;
+ bool is_removal(uint64_t *seq, pg_t *pgid) const;
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& bl);
inline bool operator==(const coll_t& rhs) const {
oss << p << "_TEMP";
return oss.str();
}
- static std::string seq_to_removal_str(uint64_t seq) {
+ static std::string seq_to_removal_str(uint64_t seq, pg_t pgid) {
std::ostringstream oss;
- oss << "FORREMOVAL_" << seq;
+ oss << "FORREMOVAL_" << seq << "_" << pgid;
return oss.str();
}