common/admin_socket_client.h \
common/shared_cache.hpp \
common/simple_cache.hpp \
+ common/sharedptr_registry.hpp \
common/MemoryModel.h\
common/Mutex.h\
common/PrebufferedStreambuf.h\
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SHAREDPTR_REGISTRY_H
+#define CEPH_SHAREDPTR_REGISTRY_H
+
+#include <map>
+#include <memory>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+/**
+ * Provides a registry of shared_ptr<V> indexed by K while
+ * the references are alive.
+ */
+template <class K, class V>
+class SharedPtrRegistry {
+ Mutex lock;
+ Cond cond;
+ typedef std::tr1::shared_ptr<V> VPtr;
+ typedef std::tr1::weak_ptr<V> WeakVPtr;
+ map<K, WeakVPtr> contents;
+
+ class OnRemoval {
+ SharedPtrRegistry<K,V> *parent;
+ K key;
+ public:
+ OnRemoval(SharedPtrRegistry<K,V> *parent, K key) :
+ parent(parent), key(key) {}
+ void operator()(V *to_remove) {
+ {
+ Mutex::Locker l(parent->lock);
+ parent->contents.erase(key);
+ parent->cond.Signal();
+ }
+ delete to_remove;
+ }
+ };
+ friend class OnRemoval;
+
+public:
+ SharedPtrRegistry() : lock("SharedPtrRegistry::lock") {}
+
+ template<class A>
+ VPtr lookup(const K &key, const A &arg) {
+ Mutex::Locker l(lock);
+ while (1) {
+ if (contents.count(key)) {
+ VPtr retval = contents[key].lock();
+ if (retval)
+ return retval;
+ } else {
+ break;
+ }
+ cond.Wait(lock);
+ }
+ VPtr retval(new V(arg), OnRemoval(this, key));
+ contents[key] = retval;
+ return retval;
+ }
+};
+
+#endif
delete t;
}
};
-
+ template<class T>
+ struct C_DeleteTransactionHolder : public Context {
+ ObjectStore::Transaction *t;
+ T obj;
+ C_DeleteTransactionHolder(ObjectStore::Transaction *tt, T &obj) :
+ t(tt), obj(obj) {}
+ void finish(int r) {
+ delete t;
+ }
+ };
virtual unsigned apply_transaction(Transaction& t, Context *ondisk=0) = 0;
virtual unsigned apply_transactions(list<Transaction*>& tls, Context *ondisk=0) = 0;
snap_trim_wq(osd->snap_trim_wq),
scrub_wq(osd->scrub_wq),
scrub_finalize_wq(osd->scrub_finalize_wq),
- remove_wq(osd->remove_wq),
rep_scrub_wq(osd->rep_scrub_wq),
class_handler(osd->class_handler),
publish_lock("OSDService::publish_lock"),
scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp),
rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
- remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp),
+ remove_wq(store, g_conf->osd_remove_thread_timeout, &disk_tp),
+ next_removal_seq(0),
watch_lock("OSD::watch_lock"),
watch_timer(external_messenger->cct, watch_lock),
service(this)
vector<hobject_t> objects;
store->collection_list(tmp, objects);
- if (objects.empty())
- return;
-
// delete them.
ObjectStore::Transaction t;
unsigned removed = 0;
if (it->is_temp(pgid))
clear_temp(store, *it);
dout(10) << "load_pgs skipping non-pg " << *it << dendl;
+ if (it->is_temp(pgid)) {
+ clear_temp(store, *it);
+ continue;
+ }
+ uint64_t seq;
+ if (it->is_removal(&seq)) {
+ if (seq >= next_removal_seq)
+ next_removal_seq = seq + 1;
+ pair<coll_t, SequencerRef> *to_queue = new pair<coll_t, SequencerRef>;
+ to_queue->first = *it;
+ remove_wq.queue(to_queue);
+ continue;
+ }
continue;
}
if (snap != CEPH_NOSNAP) {
op_tracker.dump_ops_in_flight(ss);
}
+// =========================================
+void OSD::RemoveWQ::_process(pair<coll_t, SequencerRef> *item)
+{
+ coll_t &coll = item->first;
+ ObjectStore::Sequencer *osr = item->second.get();
+ store->flush();
+ vector<hobject_t> olist;
+ store->collection_list(coll, olist);
+ //*_dout << "OSD::RemoveWQ::_process removing coll " << coll << std::endl;
+ uint64_t num = 1;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ for (vector<hobject_t>::iterator i = olist.begin();
+ i != olist.end();
+ ++i, ++num) {
+ if (num % 20 == 0) {
+ store->queue_transaction(
+ osr, t,
+ new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->second));
+ t = new ObjectStore::Transaction;
+ }
+ t->remove(coll, *i);
+ }
+ t->remove_collection(coll);
+ store->queue_transaction(
+ osr, t,
+ new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->second));
+ delete item;
+}
// =========================================
void OSD::do_mon_report()
if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
//pool is deleted!
- queue_pg_for_deletion(pg);
- //pg->unlock();
+ pg->get();
+ _remove_pg(pg);
+ pg->unlock();
+ pg->put();
continue;
} else {
pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
if (!ctx.transaction->empty()) {
ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
int tr = store->queue_transaction(
- &pg->osr,
+ pg->osr.get(),
ctx.transaction, ctx.on_applied, ctx.on_safe);
assert(tr == 0);
ctx.transaction = new ObjectStore::Transaction;
} else {
ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
int tr = store->queue_transaction(
- &pg->osr,
+ pg->osr.get(),
ctx.transaction, ctx.on_applied, ctx.on_safe);
assert(tr == 0);
}
ObjectStore::Transaction *t = new ObjectStore::Transaction;
pg->trim(*t, m->trim_to);
pg->write_info(*t);
- int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t));
+ int tr = store->queue_transaction(pg->osr.get(), t,
+ new ObjectStore::C_DeleteTransaction(t));
assert(tr == 0);
}
pg->unlock();
if (pg_map.count(pgid)) {
pg = _lookup_lock_pg(pgid);
- if (!pg->deleting) {
- pg->queue_query(it->second.epoch_sent, it->second.epoch_sent,
- from, it->second);
- pg->unlock();
- continue;
- } else {
- pg->unlock();
- }
+ pg->queue_query(it->second.epoch_sent, it->second.epoch_sent,
+ from, it->second);
+ pg->unlock();
+ continue;
}
// get active crush mapping
dout(5) << "queue_pg_for_deletion: " << pgid << dendl;
PG *pg = _lookup_lock_pg(pgid);
if (pg->info.history.same_interval_since <= m->get_epoch()) {
- if (pg->deleting) {
- dout(10) << *pg << " already removing." << dendl;
- } else {
- assert(pg->get_primary() == m->get_source().num());
- queue_pg_for_deletion(pg);
- }
+ assert(pg->get_primary() == m->get_source().num());
+ pg->get();
+ _remove_pg(pg);
+ pg->unlock();
+ pg->put();
} else {
dout(10) << *pg << " ignoring remove request, pg changed in epoch "
<< pg->info.history.same_interval_since
<< " > " << m->get_epoch() << dendl;
+ pg->unlock();
}
- pg->unlock();
- }
-}
-
-
-void OSD::queue_pg_for_deletion(PG *pg)
-{
- dout(10) << *pg << " removing." << dendl;
- pg->assert_locked();
- assert(pg->get_role() == -1);
- if (!pg->deleting) {
- pg->deleting = true;
- remove_wq.queue(pg);
}
}
void OSD::_remove_pg(PG *pg)
{
- pg_t pgid = pg->info.pgid;
- dout(10) << "_remove_pg " << pgid << dendl;
-
- pg->lock();
- if (!pg->deleting) {
- pg->unlock();
- return;
- }
-
- // reset log, last_complete, in case deletion gets canceled
- pg->info.last_complete = eversion_t();
- pg->info.last_update = eversion_t();
- pg->info.log_tail = eversion_t();
- pg->log.zero();
- pg->ondisklog.zero();
-
- {
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- pg->write_info(*t);
- pg->write_log(*t);
- int tr = store->queue_transaction(&pg->osr, t);
- assert(tr == 0);
- }
-
- // flush all pg operations to the fs, so we can rely on
- // collection_list below.
- pg->osr.flush();
-
- int n = 0;
-
+ vector<coll_t> removals;
ObjectStore::Transaction *rmt = new ObjectStore::Transaction;
-
- // snap collections
for (interval_set<snapid_t>::iterator p = pg->snap_collections.begin();
p != pg->snap_collections.end();
- p++) {
+ ++p) {
for (snapid_t cur = p.get_start();
cur < p.get_start() + p.get_len();
++cur) {
- vector<hobject_t> olist;
- store->collection_list(coll_t(pgid, cur), olist);
- dout(10) << "_remove_pg " << pgid << " snap " << cur << " " << olist.size() << " objects" << dendl;
- for (vector<hobject_t>::iterator q = olist.begin();
- q != olist.end();
- q++) {
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- t->remove(coll_t(pgid, cur), *q);
- t->remove(coll_t(pgid), *q); // we may hit this twice, but it's harmless
- int tr = store->queue_transaction(&pg->osr, t);
- assert(tr == 0);
-
- if ((++n & 0xff) == 0) {
- pg->unlock();
- pg->lock();
- if (!pg->deleting) {
- dout(10) << "_remove_pg aborted on " << *pg << dendl;
- pg->unlock();
- return;
- }
- }
- }
- rmt->remove_collection(coll_t(pgid, cur));
+ coll_t to_remove = get_next_removal_coll();
+ removals.push_back(to_remove);
+ rmt->collection_rename(coll_t(pg->info.pgid, cur), to_remove);
}
}
-
- // (what remains of the) main collection
- vector<hobject_t> olist;
- store->collection_list(coll_t(pgid), olist);
- dout(10) << "_remove_pg " << pgid << " " << olist.size() << " objects" << dendl;
- for (vector<hobject_t>::iterator p = olist.begin();
- p != olist.end();
- p++) {
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- t->remove(coll_t(pgid), *p);
- int tr = store->queue_transaction(&pg->osr, t);
- assert(tr == 0);
-
- if ((++n & 0xff) == 0) {
- pg->unlock();
- pg->lock();
- if (!pg->deleting) {
- dout(10) << "_remove_pg aborted on " << *pg << dendl;
- pg->unlock();
- return;
- }
- }
- }
-
- pg->unlock();
-
- dout(10) << "_remove_pg " << pgid << " flushing store" << dendl;
- store->flush();
-
- dout(10) << "_remove_pg " << pgid << " taking osd_lock" << dendl;
- osd_lock.Lock();
- pg->lock();
-
- if (!pg->deleting) {
- osd_lock.Unlock();
- pg->unlock();
- return;
- }
-
- dout(10) << "_remove_pg " << pgid << " removing final" << dendl;
-
- {
- rmt->remove(coll_t::META_COLL, pg->log_oid);
- rmt->remove(coll_t::META_COLL, pg->biginfo_oid);
- rmt->remove_collection(coll_t(pgid));
- int tr = store->queue_transaction(NULL, rmt);
- assert(tr == 0);
+ coll_t to_remove = get_next_removal_coll();
+ removals.push_back(to_remove);
+ rmt->collection_rename(coll_t(pg->info.pgid), to_remove);
+ if (pg->have_temp_coll()) {
+ to_remove = get_next_removal_coll();
+ removals.push_back(to_remove);
+ rmt->collection_rename(pg->get_temp_coll(), to_remove);
}
+ rmt->remove(coll_t::META_COLL, pg->log_oid);
+ rmt->remove(coll_t::META_COLL, pg->biginfo_oid);
- if (store->collection_exists(coll_t::make_temp_coll(pg->get_pgid()))) {
- clear_temp(store, coll_t::make_temp_coll(pg->get_pgid()));
- }
+ store->queue_transaction(
+ pg->osr.get(), rmt,
+ new ObjectStore::C_DeleteTransactionHolder<
+ SequencerRef>(rmt, pg->osr));
// on_removal, which calls remove_watchers_and_notifies, and the erasure from
// the pg_map must be done together without unlocking the pg lock,
// and handle_notify_timeout
pg->on_removal();
+ for (vector<coll_t>::iterator i = removals.begin();
+ i != removals.end();
+ ++i) {
+ remove_wq.queue(new pair<coll_t, SequencerRef>(*i, pg->osr));
+ }
+
+ recovery_wq.dequeue(pg);
+ scrub_wq.dequeue(pg);
+ scrub_finalize_wq.dequeue(pg);
+ snap_trim_wq.dequeue(pg);
+ pg_stat_queue_dequeue(pg);
+ op_wq.dequeue(pg);
+ peering_wq.dequeue(pg);
+
+ pg->deleting = true;
+
// remove from map
- pg_map.erase(pgid);
+ pg_map.erase(pg->info.pgid);
pg->put(); // since we've taken it out of map
- service.unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+ service.unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
_put_pool(pg->pool);
-
- // unlock, and probably delete
- pg->unlock();
- pg->put(); // will delete, if last reference
- osd_lock.Unlock();
- dout(10) << "_remove_pg " << pgid << " all done" << dendl;
}
dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl;
recovery_wq.queue(pg);
} else {
-
pg->lock();
+ if (pg->deleting) {
+ pg->unlock();
+ pg->put();
+ return;
+ }
dout(10) << "do_recovery starting " << max
<< " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops) on "
OpRequestRef op;
pg->lock();
+ if (pg->deleting) {
+ pg->unlock();
+ pg->put();
+ return;
+ }
assert(!pg->op_queue.empty());
op = pg->op_queue.front();
pg->op_queue.pop_front();
#include "OpRequest.h"
#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
+#include "common/sharedptr_registry.hpp"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
extern const coll_t meta_coll;
+typedef std::tr1::shared_ptr<ObjectStore::Sequencer> SequencerRef;
+
class OSD;
class OSDService {
public:
OSD *osd;
+ SharedPtrRegistry<pg_t, ObjectStore::Sequencer> osr_registry;
const int whoami;
ObjectStore *&store;
LogClient &clog;
ThreadPool::WorkQueue<PG> &snap_trim_wq;
ThreadPool::WorkQueue<PG> &scrub_wq;
ThreadPool::WorkQueue<PG> &scrub_finalize_wq;
- ThreadPool::WorkQueue<PG> &remove_wq;
ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
ClassHandler *&class_handler;
OpsFlightSocketHook *admin_ops_hook;
// -- op queue --
- deque<PG*> op_queue;
+ list<PG*> op_queue;
int op_queue_len;
struct OpWQ : public ThreadPool::WorkQueue<PG> {
bool _enqueue(PG *pg);
void _dequeue(PG *pg) {
- assert(0);
+ for (list<PG*>::iterator i = osd->op_queue.begin();
+ i != osd->op_queue.end();
+ ) {
+ if (*i == pg)
+ osd->op_queue.erase(i++);
+ else
+ ++i;
+ }
}
bool _empty() {
return osd->op_queue.empty();
// -- peering queue --
struct PeeringWQ : public ThreadPool::WorkQueue<PG> {
- deque<PG*> peering_queue;
+ list<PG*> peering_queue;
OSD *osd;
PeeringWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>(
"OSD::PeeringWQ", ti, ti*10, tp), osd(o) {}
void _dequeue(PG *pg) {
- assert(0);
+ for (list<PG*>::iterator i = peering_queue.begin();
+ i != peering_queue.end();
+ ) {
+ if (*i == pg)
+ peering_queue.erase(i++);
+ else
+ ++i;
+ }
}
bool _enqueue(PG *pg) {
pg->get();
void handle_pg_backfill(OpRequestRef op);
void handle_pg_remove(OpRequestRef op);
- void queue_pg_for_deletion(PG *pg);
void _remove_pg(PG *pg);
// -- commands --
} rep_scrub_wq;
// -- removing --
- xlist<PG*> remove_queue;
-
- struct RemoveWQ : public ThreadPool::WorkQueue<PG> {
- OSD *osd;
- RemoveWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", ti, 0, tp), osd(o) {}
+ struct RemoveWQ : public ThreadPool::WorkQueue<pair<coll_t, SequencerRef> > {
+ ObjectStore *&store;
+ list<pair<coll_t, SequencerRef> *> remove_queue;
+ RemoveWQ(ObjectStore *&o, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<pair<coll_t, SequencerRef> >("OSD::RemoveWQ", ti, 0, tp),
+ store(o) {}
bool _empty() {
- return osd->remove_queue.empty();
+ return remove_queue.empty();
}
- bool _enqueue(PG *pg) {
- if (pg->remove_item.is_on_list())
- return false;
- pg->get();
- osd->remove_queue.push_back(&pg->remove_item);
+ bool _enqueue(pair<coll_t, SequencerRef> *item) {
+ remove_queue.push_back(item);
return true;
}
- void _dequeue(PG *pg) {
- if (pg->remove_item.remove_myself())
- pg->put();
+ void _dequeue(pair<coll_t, SequencerRef> *item) {
+ assert(0);
}
- PG *_dequeue() {
- if (osd->remove_queue.empty())
+ pair<coll_t, SequencerRef> *_dequeue() {
+ if (remove_queue.empty())
return NULL;
- PG *pg = osd->remove_queue.front();
- osd->remove_queue.pop_front();
- return pg;
- }
- void _process(PG *pg) {
- osd->_remove_pg(pg);
+ pair<coll_t, SequencerRef> *item = remove_queue.front();
+ remove_queue.pop_front();
+ return item;
}
+ void _process(pair<coll_t, SequencerRef> *item);
void _clear() {
- while (!osd->remove_queue.empty()) {
- PG *pg = osd->remove_queue.front();
- osd->remove_queue.pop_front();
- pg->put();
+ while (!remove_queue.empty()) {
+ delete remove_queue.front();
+ remove_queue.pop_front();
}
}
} remove_wq;
+ uint64_t next_removal_seq;
+ coll_t get_next_removal_coll() {
+ return coll_t::make_removal_coll(next_removal_seq++);
+ }
private:
bool ms_dispatch(Message *m);
return *_dout << pg->gen_prefix();
}
+PG::PG(OSDService *o, OSDMapRef curmap,
+ PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) :
+ osd(o), osdmap_ref(curmap), pool(_pool),
+ _lock("PG::_lock"),
+ ref(0), deleting(false), dirty_info(false), dirty_log(false),
+ info(p), coll(p), log_oid(loid), biginfo_oid(ioid),
+ recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this),
+ recovery_ops_active(0),
+ waiting_on_backfill(0),
+ role(0),
+ state(0),
+ need_up_thru(false),
+ need_flush(false),
+ last_peering_reset(0),
+ heartbeat_peer_lock("PG::heartbeat_peer_lock"),
+ backfill_target(-1),
+ pg_stats_lock("PG::pg_stats_lock"),
+ pg_stats_valid(false),
+ osr(osd->osr_registry.lookup(p, (stringify(p)))),
+ finish_sync_event(NULL),
+ finalizing_scrub(false),
+ scrub_block_writes(false),
+ scrub_active(false),
+ scrub_reserved(false), scrub_reserve_failed(false),
+ scrub_waiting_on(0),
+ active_rep_scrub(0),
+ recovery_state(this)
+{
+ pool->get();
+}
+
void PG::lock(bool no_lockdep)
{
_lock.Lock(no_lockdep);
assert(is_locked());
if (need_flush) {
dout(10) << "do_pending_flush doing pending flush" << dendl;
- osr.flush();
+ osr->flush();
need_flush = false;
dout(10) << "do_pending_flush done" << dendl;
}
if (dirty_info) {
ObjectStore::Transaction *t = new ObjectStore::Transaction;
write_info(*t);
- int tr = osd->store->queue_transaction(&osr, t);
+ int tr = osd->store->queue_transaction(osr.get(), t);
assert(tr == 0);
}
assert(peer_activated.size() == acting.size());
info.history.last_epoch_started = get_osdmap()->get_epoch();
- dirty_info = true;
// make sure CLEAN is marked if we've been clean in this interval
if (info.last_complete == info.last_update &&
// wait for any writes on our pg to flush to disk first. this avoids races
// with scrub starting immediately after trim or recovery completion.
- osr.flush();
+ osr->flush();
// objects
vector<hobject_t> ls;
{
lock();
+ if (deleting) {
+ unlock();
+ put();
+ return;
+ }
if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
dout(10) << "scrub -- not primary or active or not clean" << dendl;
void PG::scrub_finalize() {
lock();
+ if (deleting) {
+ unlock();
+ return;
+ }
+
assert(last_update_applied == info.last_update);
if (scrub_epoch_start != info.history.same_interval_since) {
{
ObjectStore::Transaction *t = new ObjectStore::Transaction;
write_info(*t);
- int tr = osd->store->queue_transaction(&osr, t);
+ int tr = osd->store->queue_transaction(osr.get(), t);
assert(tr == 0);
}
osd->scrub_wq.dequeue(this);
osd->scrub_finalize_wq.dequeue(this);
osd->snap_trim_wq.dequeue(this);
- osd->remove_wq.dequeue(this);
osd->pg_stat_queue_dequeue(this);
remove_watchers_and_notifies();
// pg->on_*
on_change();
- if (deleting) {
- dout(10) << *this << " canceling deletion!" << dendl;
- deleting = false;
- osd->remove_wq.dequeue(this);
- osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
- }
-
+ assert(!deleting);
+
if (role != oldrole) {
// old primary?
if (oldrole == 0) {
if (pg.snap_trimq.size())
out << " snaptrimq=" << pg.snap_trimq;
- if (pg.deleting)
- out << " DELETING";
out << "]";
}
if (old_peering_evt(evt))
return;
- assert(!deleting);
recovery_state.handle_event(evt, rctx);
}
/* You should not use these items without taking their respective queue locks
* (if they have one) */
- xlist<PG*>::item recovery_item, scrub_item, scrub_finalize_item, snap_trim_item, remove_item, stat_queue_item;
+ xlist<PG*>::item recovery_item, scrub_item, scrub_finalize_item, snap_trim_item, stat_queue_item;
int recovery_ops_active;
bool waiting_on_backfill;
#ifdef DEBUG_RECOVERY_OIDS
pg_stat_t pg_stats_stable;
// for ordering writes
- ObjectStore::Sequencer osr;
+ std::tr1::shared_ptr<ObjectStore::Sequencer> osr;
void update_stats();
void clear_stats();
void build_scrub_map(ScrubMap &map);
void build_inc_scrub_map(ScrubMap &map, eversion_t v);
virtual int _scrub(ScrubMap &map, int& errors, int& fixed) { return 0; }
+ virtual coll_t get_temp_coll() = 0;
+ virtual bool have_temp_coll() = 0;
void clear_scrub_reserved();
void scrub_reserve_replicas();
void scrub_unreserve_replicas();
public:
PG(OSDService *o, OSDMapRef curmap,
- PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) :
- osd(o), osdmap_ref(curmap), pool(_pool),
- _lock("PG::_lock"),
- ref(0), deleting(false), dirty_info(false), dirty_log(false),
- info(p), coll(p), log_oid(loid), biginfo_oid(ioid),
- recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), remove_item(this), stat_queue_item(this),
- recovery_ops_active(0),
- waiting_on_backfill(0),
- role(0),
- state(0),
- need_up_thru(false),
- need_flush(false),
- last_peering_reset(0),
- heartbeat_peer_lock("PG::heartbeat_peer_lock"),
- backfill_target(-1),
- flushed(true),
- pg_stats_lock("PG::pg_stats_lock"),
- pg_stats_valid(false),
- osr(stringify(p)),
- finish_sync_event(NULL),
- finalizing_scrub(false),
- scrub_block_writes(false),
- scrub_active(false),
- scrub_reserved(false), scrub_reserve_failed(false),
- scrub_waiting_on(0),
- active_rep_scrub(0),
- recovery_state(this)
- {
- pool->get();
- }
+ PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid);
virtual ~PG() {
pool->put();
}
virtual void do_sub_op_reply(OpRequestRef op) = 0;
virtual void do_scan(OpRequestRef op) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
- virtual bool snap_trimmer() = 0;
+ virtual void snap_trimmer() = 0;
virtual int do_command(vector<string>& cmd, ostream& ss,
bufferlist& idata, bufferlist& odata) = 0;
hobject_t next;
hobject_t current = response.handle;
- osr.flush();
+ osr->flush();
int r = osd->store->collection_list_partial(coll, current,
list_size,
list_size,
case MOSDPGScan::OP_SCAN_GET_DIGEST:
{
BackfillInterval bi;
- osr.flush();
+ osr->flush();
scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi);
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
get_osdmap()->get_epoch(), m->query_epoch,
ObjectStore::Transaction *t = new ObjectStore::Transaction;
write_info(*t);
- int tr = osd->store->queue_transaction(&osr, t);
+ int tr = osd->store->queue_transaction(osr.get(), t);
assert(tr == 0);
}
break;
}
// flush pg ops to fs so we can rely on collection_list()
- osr.flush();
+ osr->flush();
osd->store->collection_list(col_to_trim, obs_to_trim);
return repop;
}
-bool ReplicatedPG::snap_trimmer()
+void ReplicatedPG::snap_trimmer()
{
lock();
+ if (deleting) {
+ unlock();
+ put();
+ return;
+ }
dout(10) << "snap_trimmer entry" << dendl;
if (is_primary()) {
entity_inst_t nobody;
queue_snap_trim();
unlock();
put();
- return true;
+ return;
}
if (!scrub_block_writes) {
dout(10) << "snap_trimmer posting" << dendl;
}
unlock();
put();
- return true;
+ return;
}
int ReplicatedPG::do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr)
Context *onapplied = new C_OSD_OpApplied(this, repop);
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc,
repop->ctx->clone_obc);
- int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op);
+ int r = osd->store->queue_transactions(osr.get(), repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op);
if (r) {
derr << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl;
assert(0);
Context *oncommit = new C_OSD_RepModifyCommit(rm);
Context *onapply = new C_OSD_RepModifyApply(rm);
- int r = osd->store->queue_transactions(&osr, rm->tls, onapply, oncommit, 0, op);
+ int r = osd->store->queue_transactions(osr.get(), rm->tls, onapply, oncommit, 0, op);
if (r) {
dout(0) << "error applying transaction: r = " << r << dendl;
assert(0);
}
int r = osd->store->
- queue_transaction(&osr, t,
+ queue_transaction(osr.get(), t,
onreadable,
new C_OSD_CommittedPushedObject(this, op,
info.history.same_interval_since,
t);
int r = osd->store->
- queue_transaction(&osr, t,
+ queue_transaction(osr.get(), t,
onreadable,
new C_OSD_CommittedPushedObject(
this, op,
ObjectStore::Transaction *t = new ObjectStore::Transaction;
remove_object_with_snap_hardlinks(*t, m->poid);
- int r = osd->store->queue_transaction(&osr, t);
+ int r = osd->store->queue_transaction(osr.get(), t);
assert(r == 0);
}
write_info(*t);
}
- osd->store->queue_transaction(&osr, t, c, NULL, new C_OSD_OndiskWriteUnlockList(&c->obcs));
+ osd->store->queue_transaction(osr.get(), t, c, NULL, new C_OSD_OndiskWriteUnlockList(&c->obcs));
// Send out the PG log to all replicas
// So that they know what is lost
recover_got(soid, latest->version);
- osd->store->queue_transaction(&osr, t,
+ osd->store->queue_transaction(osr.get(), t,
new C_OSD_AppliedRecoveredObject(this, t, obc),
new C_OSD_CommittedPushedObject(this, OpRequestRef(),
info.history.same_interval_since,
// objects.
dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
backfill_info.clear();
- osr.flush();
+ osr->flush();
scan_range(backfill_pos, local_min, local_max, &backfill_info);
int ops = 0;
while (ops < max) {
if (backfill_info.begin <= pbi.begin &&
!backfill_info.extends_to_end() && backfill_info.empty()) {
- osr.flush();
+ osr->flush();
scan_range(backfill_info.end, local_min, local_max, &backfill_info);
backfill_info.trim();
}
// flush all operations to fs so we can rely on collection_list
// below.
- pg->osr.flush();
+ pg->osr->flush();
vector<hobject_t> obs_to_trim;
pg->osd->store->collection_list(col_to_trim, obs_to_trim);
pg->snap_collections.erase(sn);
pg->write_info(*t);
t->remove_collection(c);
- int tr = pg->osd->store->queue_transaction(&pg->osr, t);
+ int tr = pg->osd->store->queue_transaction(pg->osr.get(), t);
assert(tr == 0);
context<SnapTrimmer>().need_share_pg_info = true;
coll_t &col_to_trim,
vector<hobject_t> &obs_to_trim);
RepGather *trim_object(const hobject_t &coid, const snapid_t &sn);
- bool snap_trimmer();
+ void snap_trimmer();
int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
void do_osd_op_effects(OpContext *ctx);
private:
bool temp_created;
coll_t temp_coll;
coll_t get_temp_coll(ObjectStore::Transaction *t);
+public:
+ bool have_temp_coll() {
+ return temp_created;
+ }
+ coll_t get_temp_coll() {
+ return temp_coll;
+ }
+private:
struct NotTrimming;
struct SnapTrim : boost::statechart::event< SnapTrim > {
SnapTrim() : boost::statechart::event < SnapTrim >() {}
return true;
}
+bool coll_t::is_removal(uint64_t *seq) const
+{
+ if (str.substr(0, 12) != string("FORREMOVAL_"))
+ return false;
+
+ stringstream ss(str.substr(12));
+ ss >> *seq;
+ return true;
+}
void coll_t::encode(bufferlist& bl) const
{
return coll_t(pg_to_tmp_str(pgid));
}
+ static coll_t make_removal_coll(uint64_t seq) {
+ return coll_t(seq_to_removal_str(seq));
+ }
+
const std::string& to_str() const {
return str;
}
bool is_pg(pg_t& pgid, snapid_t& snap) const;
bool is_temp(pg_t& pgid) const;
+ bool is_removal(uint64_t *seq) const;
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& bl);
inline bool operator==(const coll_t& rhs) const {
oss << p << "_TEMP";
return oss.str();
}
+ static std::string seq_to_removal_str(uint64_t seq) {
+ std::ostringstream oss;
+ oss << "FORREMOVAL_" << seq;
+ return oss.str();
+ }
std::string str;
};