}
f.close_section(); //blacklist
f.flush(ss);
+ } else if (command == "dump_watchers") {
+ list<obj_watch_item_t> watchers;
+ osd_lock.Lock();
+ // scan pg's
+ for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
+ it != pg_map.end();
+ ++it) {
+
+ list<obj_watch_item_t> pg_watchers;
+ PG *pg = it->second;
+ pg->lock();
+ pg->get_watchers(pg_watchers);
+ pg->unlock();
+ watchers.splice(watchers.end(), pg_watchers);
+ }
+ osd_lock.Unlock();
+
+ JSONFormatter f(true);
+ f.open_array_section("watchers");
+ for (list<obj_watch_item_t>::iterator it = watchers.begin();
+ it != watchers.end(); ++it) {
+
+ f.open_array_section("watch");
+
+ f.dump_string("object", it->obj.oid.name);
+
+ f.open_object_section("entity_name");
+ it->wi.name.dump(&f);
+ f.close_section(); //entity_name_t
+
+ f.dump_int("cookie", it->wi.cookie);
+ f.dump_int("timeout", it->wi.timeout_seconds);
+
+ f.open_object_section("entity_addr_t");
+ it->wi.addr.dump(&f);
+ f.close_section(); //entity_addr_t
+
+ f.close_section(); //watch
+ }
+
+ f.close_section(); //watches
+ f.flush(ss);
} else {
assert(0 == "broken asok registration");
}
r = admin_socket->register_command("dump_blacklist", asok_hook,
"dump_blacklist");
assert(r == 0);
+ r = admin_socket->register_command("dump_watchers", asok_hook,
+ "dump_watchers");
+ assert(r == 0);
test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);
r = admin_socket->register_command("setomapval", test_ops_hook,
cct->get_admin_socket()->unregister_command("dump_historic_ops");
cct->get_admin_socket()->unregister_command("dump_op_pq_state");
cct->get_admin_socket()->unregister_command("dump_blacklist");
+ cct->get_admin_socket()->unregister_command("dump_watchers");
delete asok_hook;
asok_hook = NULL;
virtual void on_flushed() = 0;
virtual void on_shutdown() = 0;
virtual void check_blacklisted_watchers() = 0;
+ virtual void get_watchers(std::list<obj_watch_item_t>&) = 0;
};
ostream& operator<<(ostream& out, const PG& pg);
// -------------------------------------------------------
+void ReplicatedPG::get_watchers(list<obj_watch_item_t> &pg_watchers)
+{
+ for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin();
+ i != object_contexts.end();
+ ++i) {
+ i->second->get();
+ get_obc_watchers(i->second, pg_watchers);
+ put_object_context(i->second);
+ }
+}
+
+void ReplicatedPG::get_obc_watchers(ObjectContext *obc, list<obj_watch_item_t> &pg_watchers)
+{
+ for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j =
+ obc->watchers.begin();
+ j != obc->watchers.end();
+ ++j) {
+ obj_watch_item_t owi;
+
+ owi.obj = obc->obs.oi.soid;
+ owi.wi.addr = j->second->get_peer_addr();
+ owi.wi.name = j->second->get_entity();
+ owi.wi.cookie = j->second->get_cookie();
+ owi.wi.timeout_seconds = j->second->get_timeout();
+
+ dout(30) << "watch: Found oid=" << owi.obj << " addr=" << owi.wi.addr
+ << " name=" << owi.wi.name << " cookie=" << owi.wi.cookie << dendl;
+
+ pg_watchers.push_back(owi);
+ }
+}
+
void ReplicatedPG::check_blacklisted_watchers()
{
dout(20) << "ReplicatedPG::check_blacklisted_watchers for pg " << get_pgid() << dendl;
void populate_obc_watchers(ObjectContext *obc);
void check_blacklisted_obc_watchers(ObjectContext *);
void check_blacklisted_watchers();
+ void get_watchers(list<obj_watch_item_t> &pg_watchers);
+ void get_obc_watchers(ObjectContext *obc, list<obj_watch_item_t> &pg_watchers);
public:
void handle_watch_timeout(WatchRef watch);
protected:
uint64_t get_cookie() const { return cookie; }
entity_name_t get_entity() const { return entity; }
entity_addr_t get_peer_addr() const { return addr; }
+ uint32_t get_timeout() const { return timeout; }
/// Generates context for use if watch timeout is delayed by scrub or recovery
Context *get_delayed_cb();
};
WRITE_CLASS_ENCODER(watch_item_t)
+struct obj_watch_item_t {
+ hobject_t obj;
+ watch_item_t wi;
+};
+
/**
* obj list watch response format
*