int OSDMap::apply_incremental(const Incremental &inc)
{
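+ // cleared here; set below if this incremental adds any blacklist entries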
+ new_blacklist_entries = false;
if (inc.epoch == 1)
fsid = inc.fsid;
else if (inc.fsid != fsid)
// blacklist
for (map<entity_addr_t,utime_t>::const_iterator p = inc.new_blacklist.begin();
p != inc.new_blacklist.end();
- ++p)
+ ++p) {
blacklist[p->first] = p->second;
+ new_blacklist_entries = true;
+ }
for (vector<entity_addr_t>::const_iterator p = inc.old_blacklist.begin();
p != inc.old_blacklist.end();
++p)
epoch_t cluster_snapshot_epoch;
string cluster_snapshot;
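+ // set by apply_incremental() when an incremental adds blacklist entries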
+ bool new_blacklist_entries;
public:
std::tr1::shared_ptr<CrushWrapper> crush; // hierarchical map
pg_temp(new map<pg_t,vector<int> >),
osd_uuid(new vector<uuid_d>),
cluster_snapshot_epoch(0),
+ new_blacklist_entries(false),
crush(new CrushWrapper) {
memset(&fsid, 0, sizeof(fsid));
}
void dump_json(ostream& out) const;
void dump(Formatter *f) const;
static void generate_test_instances(list<OSDMap*>& o);
+ bool check_new_blacklist_entries() const { return new_blacklist_entries; }
};
WRITE_CLASS_ENCODER_FEATURES(OSDMap)
WRITE_CLASS_ENCODER_FEATURES(OSDMap::Incremental)
<< last_persisted_osdmap_ref->get_epoch()
<< " while current is " << osdmap_ref->get_epoch() << dendl;
}
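+ // if the newly applied map added blacklist entries, drop watchers from blacklisted clients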
+ if (osdmap_ref->check_new_blacklist_entries()) check_blacklisted_watchers();
}
void PG::handle_loaded(RecoveryCtx *rctx)
virtual void on_activate() = 0;
virtual void on_flushed() = 0;
virtual void on_shutdown() = 0;
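+ // scan cached object contexts and drop watchers from blacklisted clients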
+ virtual void check_blacklisted_watchers() = 0;
};
ostream& operator<<(ostream& out, const PG& pg);
dout(10) << "watch: ctx->obc=" << (void *)obc << " cookie=" << cookie
<< " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl;
dout(10) << "watch: oi.user_version=" << oi.user_version.version << dendl;
+ dout(10) << "watch: peer_addr="
+ << ctx->op->request->get_connection()->get_peer_addr() << dendl;
// FIXME: where does the timeout come from?
watch_info_t w(cookie, 30,
// -------------------------------------------------------
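+// Walk every cached object context and drop watchers held by clients that
+// are blacklisted in the current OSDMap. Invoked from the PG when a newly
+// applied map added blacklist entries.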
+void ReplicatedPG::check_blacklisted_watchers()
+{
+ dout(20) << "ReplicatedPG::check_blacklisted_watchers for pg " << get_pgid() << dendl;
+ for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin();
+ i != object_contexts.end();
+ ++i) {
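+ // take a ref so the obc stays valid while its watchers are checked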
+ i->second->get();
+ check_blacklisted_obc_watchers(i->second);
+ put_object_context(i->second);
+ }
+}
+
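+// Drop any watchers on this obc whose peer address is blacklisted in the
+// current OSDMap; blacklisted watchers are removed via handle_watch_timeout().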
+void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContext *obc)
+{
+ dout(20) << "ReplicatedPG::check_blacklisted_obc_watchers for obc " << obc->obs.oi.soid << dendl;
+ for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator k =
+ obc->watchers.begin();
+ k != obc->watchers.end();
+ ) {
+ // Advance the iterator now so handle_watch_timeout() can erase the element
+ map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j = k++;
+ dout(30) << "watch: Found " << j->second->get_entity() << " cookie " << j->second->get_cookie() << dendl;
+ entity_addr_t ea = j->second->get_peer_addr();
+ dout(30) << "watch: Check entity_addr_t " << ea << dendl;
+ if (get_osdmap()->is_blacklisted(ea)) {
+ dout(10) << "watch: Found blacklisted watcher for " << ea << dendl;
+ assert(j->second->get_pg() == this);
+ handle_watch_timeout(j->second);
+ }
+ }
+}
+
void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
{
assert(is_active());
make_pair(p->first.first, p->first.second),
watch));
}
+ // Look for watchers from blacklisted clients and drop them
+ check_blacklisted_obc_watchers(obc);
}
void ReplicatedPG::handle_watch_timeout(WatchRef watch)
map<hobject_t, map<client_t, tid_t> > debug_op_order;
void populate_obc_watchers(ObjectContext *obc);
+ void check_blacklisted_obc_watchers(ObjectContext *);
+ void check_blacklisted_watchers();
public:
void handle_watch_timeout(WatchRef watch);
protected: