o->decode(bl);
if (o->test_flag(CEPH_OSDMAP_FULL))
last_marked_full = e;
+ set_pool_last_map_marked_full(o, e);
hobject_t fulloid = get_osdmap_pobject_name(e);
t.write(META_COLL, fulloid, 0, bl.length(), bl);
if (o->test_flag(CEPH_OSDMAP_FULL))
last_marked_full = e;
+ set_pool_last_map_marked_full(o, e);
bufferlist fbl;
o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
}
}
+ // calc actual pgid
+ pg_t _pgid = m->get_pg();
+ int64_t pool = _pgid.pool();
if (op->may_write()) {
// full?
if ((service.check_failsafe_full() ||
return;
}
+ const pg_pool_t *pi = osdmap->get_pg_pool(pool);
+ if (!pi) {
+ return;
+ }
+ // pool is full ?
+ map<int64_t, epoch_t> &pool_last_map_marked_full = superblock.pool_last_map_marked_full;
+ if (pi->has_flag(pg_pool_t::FLAG_FULL) ||
+ (pool_last_map_marked_full.count(pool) && (m->get_map_epoch() < pool_last_map_marked_full[pool]))) {
+ return;
+ }
+
// invalid?
if (m->get_snapid() != CEPH_NOSNAP) {
service.reply_op_error(op, -EINVAL);
}
}
- // calc actual pgid
- pg_t _pgid = m->get_pg();
- int64_t pool = _pgid.pool();
if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0 &&
osdmap->have_pg_pool(pool))
_pgid = osdmap->raw_pg_to_pg(_pgid);
}
in_use.insert(got.begin(), got.end());
}
+
+void OSD::set_pool_last_map_marked_full(OSDMap *o, epoch_t &e)
+{
+ map<int64_t, epoch_t> &pool_last_map_marked_full = superblock.pool_last_map_marked_full;
+ for (map<int64_t, pg_pool_t>::const_iterator it = o->get_pools().begin();
+ it != o->get_pools().end(); it++) {
+ bool exist = pool_last_map_marked_full.count(it->first);
+ if (it->second.has_flag(pg_pool_t::FLAG_FULL) && !exist)
+ pool_last_map_marked_full[it->first] = e;
+ if (it->second.has_flag(pg_pool_t::FLAG_FULL) &&
+ (exist && pool_last_map_marked_full.count(it->first) < e))
+ pool_last_map_marked_full[it->first] = e;
+ }
+}
void OSDSuperblock::encode(bufferlist &bl) const
{
- ENCODE_START(6, 5, bl);
+ ENCODE_START(7, 5, bl);
::encode(cluster_fsid, bl);
::encode(whoami, bl);
::encode(current_epoch, bl);
::encode(mounted, bl);
::encode(osd_fsid, bl);
::encode(last_map_marked_full, bl);
+ ::encode(pool_last_map_marked_full, bl);
ENCODE_FINISH(bl);
}
::decode(osd_fsid, bl);
if (struct_v >= 6)
::decode(last_map_marked_full, bl);
+ if (struct_v >= 7)
+ ::decode(pool_last_map_marked_full, bl);
DECODE_FINISH(bl);
}