- rename over old files should flush data, or revert back to old contents
rados
+- snaps
+ - use default pool contexts
+ - make snap removal work with the default pools (fix new_removed_snaps logic in OSD.cc)
- merge pgs
- destroy pg_pools
- autosize pg_pools?
snapid_t sn = pending_destroy[tid];
dout(7) << "commit " << tid << " destroy " << sn << dendl;
snaps.erase(sn);
- need_to_purge.insert(sn);
+
+ for (vector<__u32>::const_iterator p = mds->mdsmap->get_data_pg_pools().begin();
+ p != mds->mdsmap->get_data_pg_pools().end();
+ p++)
+ need_to_purge[*p].insert(sn);
+
pending_destroy.erase(tid);
}
else if (pending_noop.count(tid)) {
void SnapServer::_server_update(bufferlist& bl)  // decode a purge list from bl and drop those snapids from need_to_purge
{
bufferlist::iterator p = bl.begin();
- vector<snapid_t> purge;
+ map<int, vector<snapid_t> > purge;  // per-key lists of snapids, keyed like need_to_purge
::decode(purge, p);
dout(7) << "_server_update purged " << purge << dendl;
- for (vector<snapid_t>::iterator p = purge.begin();
+ for (map<int, vector<snapid_t> >::iterator p = purge.begin();  // this p shadows the bufferlist iterator declared above
p != purge.end();
- p++)
- need_to_purge.erase(*p);
+ p++) {
+ for (vector<snapid_t>::iterator q = p->second.begin();
+ q != p->second.end();
+ q++)
+ need_to_purge[p->first].erase(*q);  // operator[] creates an empty set if the key was absent; it is removed again just below
+ if (need_to_purge[p->first].empty())  // do not keep keys whose set has become empty
+ need_to_purge.erase(p->first);
+ }
version++;  // bumped once per update, regardless of how many snapids were erased
}
}
dout(10) << "check_osd_map need_to_purge=" << need_to_purge << dendl;
- vector<snapid_t> purge;
- vector<snapid_t> purged;
+ map<int, vector<snapid_t> > all_purge;
+ map<int, vector<snapid_t> > all_purged;
- for (set<snapid_t>::iterator p = need_to_purge.begin();
+ for (map<int, set<snapid_t> >::iterator p = need_to_purge.begin();
p != need_to_purge.end();
p++) {
- if (mds->osdmap->is_removed_snap(*p)) {
- dout(10) << " osdmap marks " << *p << " as removed" << dendl;
- purged.push_back(*p);
- } else {
- purge.push_back(*p);
+ int id = p->first;
+ const pg_pool_t& pi = mds->osdmap->get_pg_pool(id);
+ for (set<snapid_t>::iterator q = p->second.begin();
+ q != p->second.end();
+ q++) {
+ if (pi.is_removed_snap(*q)) {
+ dout(10) << " osdmap marks " << *q << " as removed" << dendl;
+ all_purged[id].push_back(*q);
+ } else {
+ all_purge[id].push_back(*q);
+ }
}
}
- if (purged.size()) {
+ if (all_purged.size()) {
// prepare to remove from need_to_purge list
bufferlist bl;
- ::encode(purged, bl);
+ ::encode(all_purged, bl);
do_server_update(bl);
}
- if (!purge.empty()) {
- dout(10) << "requesting removal of " << purge << dendl;
- MRemoveSnaps *m = new MRemoveSnaps(purge);
+ if (!all_purge.empty()) {
+ dout(10) << "requesting removal of " << all_purge << dendl;
+ MRemoveSnaps *m = new MRemoveSnaps(all_purge);
int mon = mds->monmap->pick_mon();
mds->messenger->send_message(m, mds->monmap->get_inst(mon));
}
protected:
snapid_t last_snap;
map<snapid_t, SnapInfo> snaps;
- set<snapid_t> need_to_purge;
+ map<int, set<snapid_t> > need_to_purge;
map<version_t, SnapInfo> pending_create;
map<version_t, snapid_t> pending_destroy;
#include "msg/Message.h"
struct MRemoveSnaps : public Message {
- vector<snapid_t> snaps;
+ map<int, vector<snapid_t> > snaps;
MRemoveSnaps() :
Message(MSG_REMOVE_SNAPS) { }
- MRemoveSnaps(vector<snapid_t>& s) :
+ MRemoveSnaps(map<int, vector<snapid_t> >& s) :
Message(MSG_REMOVE_SNAPS) {
snaps.swap(s);
}
{
dout(7) << "preprocess_remove_snaps " << *m << dendl;
- for (vector<snapid_t>::iterator p = m->snaps.begin();
- p != m->snaps.end();
- p++) {
- if (*p > osdmap.max_snap ||
- !osdmap.removed_snaps.contains(*p))
- return false;
+ for (map<int, vector<snapid_t> >::iterator q = m->snaps.begin();
+ q != m->snaps.end();
+ q++) {
+ if (!osdmap.have_pg_pool(q->first)) {
+ dout(10) << " ignoring removed_snaps " << q->second << " on non-existant pool " << q->first << dendl;
+ continue;
+ }
+ const pg_pool_t& pi = osdmap.get_pg_pool(q->first);
+ for (vector<snapid_t>::iterator p = q->second.begin();
+ p != q->second.end();
+ p++) {
+ if (*p > pi.get_snap_seq() ||
+ !pi.removed_snaps.contains(*p))
+ return false;
+ }
}
delete m;
return true;
{
dout(7) << "prepare_remove_snaps " << *m << dendl;
- snapid_t max;
- for (vector<snapid_t>::iterator p = m->snaps.begin();
+ // m->snaps maps a pool id to the snapids to mark removed in that pool.
+ for (map<int, vector<snapid_t> >::iterator p = m->snaps.begin();
p != m->snaps.end();
p++) {
- if (*p > max)
- max = *p;
-
- if (!osdmap.removed_snaps.contains(*p) &&
- !pending_inc.removed_snaps.contains(*p)) {
- dout(10) << " adding " << *p << " to removed_snaps" << dendl;
- pending_inc.removed_snaps.insert(*p);
+ // Skip pools the osdmap does not have.  Without this check the
+ // pools[] lookup below would default-construct an entry for the
+ // unknown pool in the live osdmap and then publish it via
+ // pending_inc.new_pools.
+ if (!osdmap.have_pg_pool(p->first)) {
+   dout(10) << " ignoring removed_snaps " << p->second << " on non-existent pool " << p->first << dendl;
+   continue;
+ }
+ pg_pool_t& pi = osdmap.pools[p->first];
+ for (vector<snapid_t>::iterator q = p->second.begin();
+ q != p->second.end();
+ q++) {
+ // only act on snapids not already removed in the committed pool
+ // nor in a pending copy of it
+ if (!pi.removed_snaps.contains(*q) &&
+ (!pending_inc.new_pools.count(p->first) ||
+ !pending_inc.new_pools[p->first].removed_snaps.contains(*q))) {
+ // first change to this pool in the pending incremental: start from
+ // a copy of the committed pool
+ if (pending_inc.new_pools.count(p->first) == 0)
+ pending_inc.new_pools[p->first] = pi;
+ pg_pool_t& newpi = pending_inc.new_pools[p->first];
+ newpi.removed_snaps.insert(*q);
+ dout(10) << " pool " << p->first << " removed_snaps added " << *q
+ << " (now " << newpi.removed_snaps << ")" << dendl;
+ // keep snap_seq >= every removed snapid
+ if (*q > newpi.get_snap_seq()) {
+ dout(10) << " pool " << p->first << " snap_seq " << newpi.get_snap_seq() << " -> " << *q << dendl;
+ newpi.set_snap_seq(*q);
+ }
+ // stamp the pool with the epoch in which its snap state changed
+ newpi.set_snap_epoch(pending_inc.epoch);
+ }
}
}
- if (max > osdmap.max_snap &&
- max > pending_inc.new_max_snap) {
- dout(10) << " new_max_snap " << max << dendl;
- pending_inc.new_max_snap = max;
- } else {
- dout(10) << " max_snap " << osdmap.max_snap << " still >= " << max << dendl;
- }
-
delete m;
return true;
}
// ======================================================
// PG's
+// Return the PGPool registered in pool_map under id, or 0 if there is none.
+// Neither the reference count nor num_pg is touched.
+PGPool *OSD::_lookup_pool(int id)
+{
+  map<int, PGPool*>::iterator it = pool_map.find(id);
+  if (it == pool_map.end())
+    return 0;
+  return it->second;
+}
+
+// Return the PGPool for id, creating and registering it on first use, and
+// count one more PG against it.
+PGPool* OSD::_get_pool(int id)
+{
+  PGPool *pool = _lookup_pool(id);
+  if (pool == 0) {
+    // not registered yet: create it and take the reference owned by pool_map
+    pool = new PGPool(id);
+    pool_map[id] = pool;
+    pool->get();
+
+    // seed the cached pool info and snap context from the current osdmap
+    const pg_pool_t& pi = osdmap->get_pg_pool(id);
+    pool->info = pi;
+    pool->snapc = pi.get_snap_context();
+  }
+  dout(10) << "_get_pool " << pool->id << " " << pool->num_pg << " -> " << (pool->num_pg+1) << dendl;
+  ++pool->num_pg;
+  return pool;
+}
+
+// Drop one PG's use of pool id (the counterpart of _get_pool).  When the
+// count reaches zero the pool is unregistered from pool_map and the
+// reference taken in _get_pool is released.
+void OSD::_put_pool(int id)
+{
+  PGPool *p = _lookup_pool(id);
+  // An unbalanced put would otherwise dereference a null pointer or drive
+  // num_pg negative, so fail loudly instead.
+  assert(p);
+  assert(p->num_pg > 0);
+  dout(10) << "_put_pool " << id << " " << p->num_pg << " -> " << (p->num_pg-1) << dendl;
+  p->num_pg--;
+  if (!p->num_pg) {
+    pool_map.erase(id);
+    p->put();
+  }
+}
+
+
PG *OSD::_open_lock_pg(pg_t pgid, bool no_lockdep_check)
{
assert(osd_lock.is_locked());
+ PGPool *pool = _get_pool(pgid.pool());
+
// create
PG *pg;
sobject_t logoid = make_pg_log_oid(pgid);
if (osdmap->get_pg_type(pgid) == CEPH_PG_TYPE_REP)
- pg = new ReplicatedPG(this, pgid, logoid);
+ pg = new ReplicatedPG(this, pool, pgid, logoid);
//else if (pgid.is_raid4())
//pg = new RAID4PG(this, pgid);
else
// remove from map
pg_map.erase(pgid);
+ _put_pool(pgid.pool());
+
// unlock, and probably delete
pg->unlock();
pg->put(); // will delete, if last reference
OSDMap *newmap = new OSDMap;
newmap->decode(bl);
- // fake inc->removed_snaps
- inc.removed_snaps = newmap->get_removed_snaps();
- inc.removed_snaps.subtract(osdmap->get_removed_snaps());
-
// kill connections to newly down osds
set<int> old;
osdmap->get_all_osds(old);
break;
}
+ // update pools
+ for (map<int, PGPool*>::iterator p = pool_map.begin();
+ p != pool_map.end();
+ p++) {
+ const pg_pool_t& pi = osdmap->get_pg_pool(p->first);
+ if (pi.get_snap_epoch() == cur+1) {
+ PGPool *pool = p->second;
+ pool->new_removed_snaps = pi.removed_snaps;
+ pool->new_removed_snaps.subtract(pool->info.removed_snaps);
+ dout(10) << " pool " << p->first << " removed_snaps " << pool->info.removed_snaps
+ << " -> " << pi.removed_snaps
+ << ", new is " << pool->new_removed_snaps << ")"
+ << dendl;
+ pool->info = pi;
+ pool->snapc = pi.get_snap_context();
+ } else {
+ dout(10) << " pool " << p->first << " unchanged (snap_epoch = " << pi.get_snap_epoch() << ")" << dendl;
+ }
+ }
+
cur++;
superblock.current_epoch = cur;
- advance_map(t, inc.removed_snaps);
+ advance_map(t);
advanced = true;
had_map_since = g_clock.now();
}
* scan placement groups, initiate any replication
* activities.
*/
-void OSD::advance_map(ObjectStore::Transaction& t, interval_set<snapid_t>& removed_snaps)
+void OSD::advance_map(ObjectStore::Transaction& t)
{
assert(osd_lock.is_locked());
dout(7) << "advance_map epoch " << osdmap->get_epoch()
<< " " << pg_map.size() << " pgs"
- << " removed_snaps " << removed_snaps
<< dendl;
if (!up_epoch &&
pg->lock();
// adjust removed_snaps?
- if (!removed_snaps.empty()) {
- for (map<snapid_t,snapid_t>::iterator p = removed_snaps.m.begin();
- p != removed_snaps.m.end();
+ if (!pg->pool->new_removed_snaps.empty()) {
+ for (map<snapid_t,snapid_t>::iterator p = pg->pool->new_removed_snaps.m.begin();
+ p != pg->pool->new_removed_snaps.m.end();
p++)
for (snapid_t t = 0; t < p->second; ++t)
- pg->info.dead_snaps.insert(p->first + t);
- dout(10) << *pg << " dead_snaps now " << pg->info.dead_snaps << dendl;
+ pg->info.snap_trimq.insert(p->first + t);
+ dout(10) << *pg << " snap_trimq now " << pg->info.snap_trimq << dendl;
pg->dirty_info = true;
- }
+ }
// no change?
if (tacting == pg->acting && (pg->is_active() || !pg->prior_set_affected(osdmap))) {
pg->lock();
if (pg->is_active()) {
// update started counter
- if (!pg->info.dead_snaps.empty())
+ if (!pg->info.snap_trimq.empty())
pg->queue_snap_trim();
}
else if (pg->is_primary() &&
void note_down_osd(int osd);
void note_up_osd(int osd);
- void advance_map(ObjectStore::Transaction& t, interval_set<snapid_t>& removed_snaps);
+ void advance_map(ObjectStore::Transaction& t);
void activate_map(ObjectStore::Transaction& t);
// osd map cache (past osd maps)
protected:
// -- placement groups --
+ map<int, PGPool*> pool_map;
hash_map<pg_t, PG*> pg_map;
hash_map<pg_t, list<Message*> > waiting_for_pg;
+ PGPool *_lookup_pool(int id);
+ PGPool *_get_pool(int id);
+ void _put_pool(int id);
+
bool _have_pg(pg_t pgid);
PG *_lookup_lock_pg(pg_t pgid);
PG *_open_lock_pg(pg_t pg, bool no_lockdep_check=false); // create new PG (in memory)
<< "created " << get_created() << "\n"
<< "modifed " << get_modified() << "\n"
<< std::endl;
- for (map<int,pg_pool_t>::iterator p = pools.begin(); p != pools.end(); p++)
+ for (map<int,pg_pool_t>::iterator p = pools.begin(); p != pools.end(); p++) {
out << "pg_pool " << p->first
<< " '" << pool_name[p->first]
<< "' " << p->second << "\n";
+ if (!p->second.removed_snaps.empty())
+ out << "\tremoved_snaps " << p->second.removed_snaps << "\n";
+ }
out << std::endl;
out << "max_osd " << get_max_osd() << "\n";
out << "blacklist " << p->first << " expires " << p->second << "\n";
// ignore pg_swap_primary
-
- out << "max_snap " << get_max_snap() << "\n"
- << "removed_snaps " << get_removed_snaps() << "\n"
- << std::endl;
}
void OSDMap::print_summary(ostream& out)
map<entity_addr_t,utime_t> new_blacklist;
vector<entity_addr_t> old_blacklist;
- snapid_t new_max_snap;
- interval_set<snapid_t> removed_snaps;
-
void encode(bufferlist& bl) {
// base
::encode(fsid, bl);
::encode(new_lost, bl);
::encode(new_pg_swap_primary, bl);
::encode(old_pg_swap_primary, bl);
- ::encode(new_max_snap, bl);
- ::encode(removed_snaps.m, bl);
::encode(new_blacklist, bl);
::encode(old_blacklist, bl);
}
::decode(new_lost, p);
::decode(new_pg_swap_primary, p);
::decode(old_pg_swap_primary, p);
- ::decode(new_max_snap, p);
- ::decode(removed_snaps.m, p);
::decode(new_blacklist, p);
::decode(old_blacklist, p);
}
map<int,nstring> pool_name;
map<nstring,int> name_pool;
map<pg_t,uint32_t> pg_swap_primary; // force new osd to be pg primary (if already a member)
- snapid_t max_snap;
- interval_set<snapid_t> removed_snaps;
hash_map<entity_addr_t,utime_t> blacklist;
public:
OSDMap() : epoch(0),
flags(0),
- max_osd(0), max_snap(0) {
+ max_osd(0) {
memset(&fsid, 0, sizeof(fsid));
}
const utime_t& get_created() const { return created; }
const utime_t& get_modified() const { return modified; }
- snapid_t get_max_snap() { return max_snap; }
- bool is_removed_snap(snapid_t sn) {
- if (sn > max_snap)
- return false;
- return removed_snaps.contains(sn);
- }
- interval_set<snapid_t>& get_removed_snaps() { return removed_snaps; }
-
bool is_blacklisted(const entity_addr_t& a) {
return !blacklist.empty() && blacklist.count(a);
}
i++)
pg_swap_primary.erase(*i);
- // snaps
- if (inc.new_max_snap > 0)
- max_snap = inc.new_max_snap;
- removed_snaps.union_of(inc.removed_snaps);
-
// blacklist
for (map<entity_addr_t,utime_t>::iterator p = inc.new_blacklist.begin();
p != inc.new_blacklist.end();
::encode(pool_name, blist);
::encode(pg_swap_primary, blist);
- ::encode(max_snap, blist);
- ::encode(removed_snaps.m, blist);
::encode(blacklist, blist);
}
::decode(pg_swap_primary, p);
- ::decode(max_snap, p);
- ::decode(removed_snaps.m, p);
::decode(blacklist, p);
}
return -1;
}
-
+ // true if this map has a pg_pool with id p
+ bool have_pg_pool(int p) const {
+   return pools.find(p) != pools.end();
+ }
const pg_pool_t& get_pg_pool(int p) {
assert(pools.count(p));
return pools[p];
// clean up stray objects, snaps
clean_up_local(t);
- if (!info.dead_snaps.empty())
+ if (!info.snap_trimq.empty())
queue_snap_trim();
// init complete pointer
class MOSDSubOpReply;
class MOSDPGInfo;
+
+struct PGPool {  // per-pool state held by the OSD and its PGs; lifetime is managed with get()/put()
+ int id;
+ atomic_t nref;  // reference count, changed only through get()/put()
+ int num_pg;  // count maintained by OSD::_get_pool (++) and OSD::_put_pool (--)
+
+ pg_pool_t info;  // copy of this pool's pg_pool_t, assigned from the osdmap
+ SnapContext snapc; // the default pool snapc, ready to go.
+
+ interval_set<snapid_t> new_removed_snaps; // newly removed in the last epoch
+
+ PGPool(int i) : id(i), num_pg(0) {}  // nref is not set here -- NOTE(review): presumably atomic_t default-constructs to 0; confirm
+
+ void get() { nref.inc(); }
+ void put() {
+ if (nref.dec() == 0)  // NOTE(review): relies on dec() returning the value after the decrement -- confirm
+ delete this;
+ }
+};
+
+
/** PG - Replica Placement Group
*
*/
eversion_t log_bottom; // oldest log entry.
bool log_backlog; // do we store a complete log?
- set<snapid_t> dead_snaps; // snaps we need to trim
+ set<snapid_t> snap_trimq; // snaps we need to trim
pg_stat_t stats;
::encode(log_backlog, bl);
::encode(stats, bl);
history.encode(bl);
- ::encode(dead_snaps, bl);
+ ::encode(snap_trimq, bl);
}
void decode(bufferlist::iterator &bl) {
::decode(pgid, bl);
::decode(log_backlog, bl);
::decode(stats, bl);
history.decode(bl);
- ::decode(dead_snaps, bl);
+ ::decode(snap_trimq, bl);
}
};
WRITE_CLASS_ENCODER(Info::History)
/*** PG ****/
protected:
OSD *osd;
+ PGPool *pool;
/** locking and reference counting.
* I destroy myself when the reference count hits zero.
public:
- PG(OSD *o, pg_t p, const sobject_t& oid) :
- osd(o),
+ PG(OSD *o, PGPool *_pool, pg_t p, const sobject_t& oid) :
+ osd(o), pool(_pool),
_lock("PG::_lock"),
ref(0), deleted(false), dirty_info(false), dirty_log(false),
info(p), log_oid(oid),
pg_stats_valid(false),
finish_sync_event(NULL)
{
+ pool->get();
+ }
+ virtual ~PG() {
+ pool->put();
}
- virtual ~PG() { }
pg_t get_pgid() const { return info.pgid; }
int get_nrep() const { return acting.size(); }
if (lost)
out << " l=" << lost;
}
- if (pg.info.dead_snaps.size())
- out << " dead=" << pg.info.dead_snaps;
+ if (pg.info.snap_trimq.size())
+ out << " snaptrimq=" << pg.info.snap_trimq;
out << "]";
lock();
dout(10) << "snap_trimmer start" << dendl;
- while (info.dead_snaps.size() &&
+ while (info.snap_trimq.size() &&
is_active()) {
- snapid_t sn = *info.dead_snaps.begin();
+ snapid_t sn = *info.snap_trimq.begin();
coll_t c = info.pgid.to_snap_coll(sn);
vector<sobject_t> ls;
osd->store->collection_list(c, ls);
// remove snaps
vector<snapid_t> newsnaps;
for (unsigned i=0; i<snaps.size(); i++)
- if (!osd->osdmap->is_removed_snap(snaps[i]))
+ if (!osd->_lookup_pool(info.pgid.pool())->info.is_removed_snap(snaps[i]))
newsnaps.push_back(snaps[i]);
else {
vector<snapid_t>::iterator q = snapset.snaps.begin();
t.remove_collection(c);
osd->store->apply_transaction(t);
- info.dead_snaps.erase(sn);
+ info.snap_trimq.erase(sn);
}
// done
public:
- ReplicatedPG(OSD *o, pg_t p, const sobject_t& oid) :
- PG(o, p, oid)
+ ReplicatedPG(OSD *o, PGPool *_pool, pg_t p, const sobject_t& oid) :
+ PG(o, _pool, p, oid)
{ }
~ReplicatedPG() {}
epoch_t get_snap_epoch() const { return v.snap_epoch; }
snapid_t get_snap_seq() const { return snapid_t(v.snap_seq); }
+ void set_snap_seq(snapid_t s) { v.snap_seq = s; }
+ void set_snap_epoch(epoch_t e) { v.snap_epoch = e; }
+
bool is_rep() const { return get_type() == CEPH_PG_TYPE_REP; }
bool is_raid4() const { return get_type() == CEPH_PG_TYPE_RAID4; }
lpgp_num_mask = (1 << calc_bits_of(v.lpgp_num-1)) - 1;
}
- bool is_removed_snap(snapid_t s) {
+ bool is_removed_snap(snapid_t s) const {
if (snaps.size())
return snaps.count(s) == 0;
return s <= get_snap_seq() && removed_snaps.contains(s);