osd->complete_notify((void *)notif, obc);
}
+void ReplicatedPG::remove_watcher(ObjectContext *obc, entity_name_t entity)
+{
+ dout(10) << "remove_watcher " << *obc << " " << entity << dendl;
+ map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity);
+ assert(iter != obc->watchers.end());
+ OSD::Session *session = iter->second;
+ dout(10) << "remove_watcher removing session " << session << dendl;
+ obc->watchers.erase(iter);
+ session->watches.erase(obc);
+ put_object_context(obc);
+ session->put();
+}
+
+void ReplicatedPG::remove_watchers()
+{
+ assert_locked();
+
+ dout(10) << "remove_watchers" << dendl;
+
+ osd->watch_lock.Lock();
+ for (map<sobject_t, ObjectContext*>::iterator iter = object_contexts.begin();
+ iter != object_contexts.end();
+ ++iter) {
+ ObjectContext *obc = iter->second;
+ for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin();
+ witer != obc->watchers.end();
+ ++witer) {
+ remove_watcher(obc, witer->first);
+ }
+ }
+ osd->watch_lock.Unlock();
+}
+
// ========================================================================
// low level osd ops
if (ctx->watch_disconnect) {
if (iter != obc->watchers.end()) {
- dout(10) << " disconnected session " << iter->second << dendl;
- obc->watchers.erase(iter);
- session->watches.erase(obc);
- put_object_context(obc);
- iter->second->put();
+ remove_watcher(obc, entity);
} else {
assert(obc->unconnected_watchers.count(entity));
obc->unconnected_watchers.erase(entity);
{
dout(10) << "on_shutdown" << dendl;
apply_and_flush_repops(false);
+ remove_watchers();
}
void ReplicatedPG::on_change()