MWatchNotify *reply = notif->reply;
client_messenger->send_message(reply, notif->session->con);
notif->session->put();
+ notif->session->con->put();
watch->remove_notification(notif);
if (notif->timeout)
watch_timer.cancel_event(notif->timeout);
}
}
-bool OSD::ms_handle_reset(Connection *con)
+void OSD::handle_watch_timeout(void *obc, // opaque object context; forwarded untouched to the PG-level handler
+ ReplicatedPG *pg, // PG that handles the timeout; one reference on it is dropped before returning
+ entity_name_t entity, // watcher the timeout applies to
+ utime_t expire) // expiry time, forwarded to the PG-level handler
{
- dout(0) << "OSD::ms_handle_reset()" << dendl;
- OSD::Session *session = (OSD::Session *)con->get_priv();
- if (!session)
- return false;
-
- dout(0) << "OSD::ms_handle_reset() s=" << (void *)session << dendl;
+ pg->lock(); // ReplicatedPG::handle_watch_timeout runs with the PG locked
+ pg->handle_watch_timeout(obc, entity, expire);
+ pg->unlock();
+ pg->put(); // presumably balances the get() taken in ReplicatedPG::register_unconnected_watcher -- TODO confirm
+}
+void OSD::disconnect_session_watches(Session *session)
+{
// get any watched obc's
map<ReplicatedPG::ObjectContext *, pg_t> obcs;
watch_lock.Lock();
ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)oiter->first;
dout(0) << "obc=" << (void *)obc << dendl;
+ ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second));
+ assert(pg);
obc->lock.Lock();
watch_lock.Lock();
/* NOTE! fix this one, should be able to just lookup entity name,
watch_info_t& w = obc->obs.oi.watchers[entity];
utime_t expire = ceph_clock_now(g_ceph_context);
expire += w.timeout_seconds;
- obc->unconnected_watchers[entity] = expire;
+ pg->register_unconnected_watcher(obc, entity, expire);
dout(10) << " disconnected watch " << w << " by " << entity << " session " << session
<< ", expires " << expire << dendl;
obc->watchers.erase(witer++);
+ session->put();
}
if (witer == obc->watchers.end())
break;
}
watch_lock.Unlock();
obc->lock.Unlock();
+ pg->put_object_context(obc);
/* now drop a reference to that obc */
- put_object_context(obc, oiter->second);
+ pg->unlock();
}
+}
-#if 0
- // FIXME: do we really want to _cancel_ notifications here?
- // shouldn't they time out in the usual way? because this person
- // might/should immediately reconnect...
- watch_lock.Lock();
- for (map<void *, entity_name_t>::iterator notif_iter = session->notifs.begin();
- notif_iter != session->notifs.end();
- ++notif_iter) {
- Watch::Notification *notif = (Watch::Notification *)notif_iter->first;
- entity_name_t& dest = notif_iter->second;
- dout(0) << "ms_handle_reset: ack notification for notif=" << (void *)notif << " entity=" << dest << dendl;
- ack_notification(dest, notif);
- }
- session->notifs.clear();
- watch_lock.Unlock();
-#endif
-
+bool OSD::ms_handle_reset(Connection *con) // returns false when con carries no session, true once its watches were disconnected
+{
+ dout(0) << "OSD::ms_handle_reset()" << dendl;
+ OSD::Session *session = (OSD::Session *)con->get_priv(); // NOTE(review): get_priv() appears to return a counted reference, released by session->put() below -- confirm
+ if (!session)
+ return false;
+ dout(0) << "OSD::ms_handle_reset() s=" << (void *)session << dendl;
+ disconnect_session_watches(session); // all watch teardown for this session is delegated to disconnect_session_watches()
session->put();
return true;
}
assert(niter != obc->notifs.end());
niter->first->session->put();
+ niter->first->session->con->put();
obc->notifs.erase(niter);
put_object_context(obc);
) {
map<hobject_t, ObjectContext *>::iterator iter = oiter++;
ObjectContext *obc = iter->second;
+ obc->ref++;
for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin();
witer != obc->watchers.end();
remove_watcher(obc, (witer++)->first));
+ for (map<entity_name_t, Context *>::iterator iter = obc->unconnected_watchers.begin();
+ iter != obc->unconnected_watchers.end();
+ ) {
+ map<entity_name_t, Context *>::iterator i = iter++;
+ unregister_unconnected_watcher(obc, i->first);
+ }
for (map<Watch::Notification *, bool>::iterator niter = obc->notifs.begin();
niter != obc->notifs.end();
remove_notify(obc, (niter++)->first));
+ put_object_context(obc);
}
osd->watch_lock.Unlock();
}
session->get();
session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
}
- map<entity_name_t, utime_t>::iterator un_iter = obc->unconnected_watchers.find(entity);
- if (un_iter != obc->unconnected_watchers.end())
- obc->unconnected_watchers.erase(un_iter);
-
+ map<entity_name_t, Context *>::iterator un_iter =
+ obc->unconnected_watchers.find(entity);
+ if (un_iter != obc->unconnected_watchers.end()) {
+ unregister_unconnected_watcher(obc, un_iter->first);
+ }
+
map<Watch::Notification *, bool>::iterator niter;
for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) {
Watch::Notification *notif = niter->first;
remove_watcher(obc, entity);
} else {
assert(obc->unconnected_watchers.count(entity));
- obc->unconnected_watchers.erase(entity);
+ unregister_unconnected_watcher(obc, entity);
}
}
Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
session->get(); // notif got a reference
+ session->con->get();
notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
osd->watch->add_notification(notif);
// connected
- for (map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.begin();
- q != obc->watchers.end();
- q++) {
- entity_name_t name = q->first;
- OSD::Session *s = q->second;
- watch_info_t& w = obc->obs.oi.watchers[q->first];
-
- notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
- s->add_notif(notif, name);
-
- MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
- osd->client_messenger->send_message(notify_msg, s->con);
- }
+ for (map<entity_name_t, watch_info_t>::iterator i = obc->obs.oi.watchers.begin();
+ i != obc->obs.oi.watchers.end();
+ ++i) {
+ map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.find(i->first);
+ if (q != obc->watchers.end()) {
+ entity_name_t name = q->first;
+ OSD::Session *s = q->second;
+ watch_info_t& w = obc->obs.oi.watchers[q->first];
+
+ notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
+ s->add_notif(notif, name);
- // unconnected
- utime_t now = ceph_clock_now(g_ceph_context);
- for (map<entity_name_t, utime_t>::iterator q = obc->unconnected_watchers.begin();
- q != obc->unconnected_watchers.end();
- q++) {
- entity_name_t name = q->first;
- utime_t expire = q->second;
- if (now < expire)
- notif->add_watcher(name, Watch::WATCHER_PENDING); /* FIXME: should we remove expired unconnected? probably yes */
+ MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
+ osd->client_messenger->send_message(notify_msg, s->con);
+ } else {
+ // unconnected
+ utime_t now = ceph_clock_now(g_ceph_context);
+ entity_name_t name = i->first;
+ notif->add_watcher(name, Watch::WATCHER_PENDING);
+ }
}
notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
{
+ if (!is_active() || is_degraded_object(obc->obs.oi.soid) ||
+ is_missing_object(obc->obs.oi.soid))
+ return;
+
if (!obc->obs.oi.watchers.empty()) {
+ Mutex::Locker l(osd->watch_lock);
+ assert(obc->unconnected_watchers.size() == 0);
+ assert(obc->watchers.size() == 0);
// populate unconnected_watchers
utime_t now = ceph_clock_now(g_ceph_context);
for (map<entity_name_t, watch_info_t>::iterator p = obc->obs.oi.watchers.begin();
utime_t expire = now;
expire += p->second.timeout_seconds;
dout(10) << " unconnected watcher " << p->first << " will expire " << expire << dendl;
- obc->unconnected_watchers[p->first] = expire;
+ register_unconnected_watcher(obc, p->first, expire);
}
}
}
+/*
+ * Forget the unconnected-watcher record that `entity` holds on `_obc`.
+ *
+ * Cancels the timeout callback stored for the entity in
+ * obc->unconnected_watchers, removes the map entry, and then calls
+ * put_object_context(obc) and put(), mirroring the obc->ref++ and get()
+ * done by register_unconnected_watcher().
+ *
+ * Precondition: the entity is currently registered.  The lookup is done
+ * with find() rather than operator[]: operator[] would silently insert a
+ * NULL callback for an unknown entity, and the two references would then
+ * be dropped without ever having been taken.
+ *
+ * @param _obc   the ObjectContext, passed as void *
+ * @param entity watcher to unregister; taken by value, so a caller may
+ *               pass a key stored inside obc->unconnected_watchers
+ */
+void ReplicatedPG::unregister_unconnected_watcher(void *_obc,
+                                                  entity_name_t entity)
+{
+  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
+  map<entity_name_t, Context *>::iterator i =
+    obc->unconnected_watchers.find(entity);
+  assert(i != obc->unconnected_watchers.end());
+  osd->watch_timer.cancel_event(i->second);
+  obc->unconnected_watchers.erase(i);
+  put_object_context(obc);
+  put();
+}
+
+/*
+ * Record `entity` as an unconnected watcher of `_obc` and arm its timeout.
+ *
+ * Takes a reference on this PG (get()) and on the object context
+ * (obc->ref++), creates a Watch::C_WatchTimeout callback carrying
+ * (osd, obc, this, entity, expire), schedules it on osd->watch_timer at
+ * `expire`, and stores it in obc->unconnected_watchers[entity] so that
+ * unregister_unconnected_watcher() can cancel it later.
+ *
+ * NOTE(review): an existing entry for `entity` would be overwritten
+ * without cancelling its callback; callers appear to guarantee that the
+ * entity is not already registered -- confirm.
+ *
+ * @param _obc   the ObjectContext, passed as void *
+ * @param entity watcher being registered
+ * @param expire absolute time at which the timeout callback is scheduled
+ */
+void ReplicatedPG::register_unconnected_watcher(void *_obc,
+                                                entity_name_t entity,
+                                                utime_t expire)
+{
+  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
+  get();       // PG reference held on behalf of the pending callback
+  obc->ref++;  // object context reference held on behalf of the pending callback
+  Context *cb = new Watch::C_WatchTimeout(osd,
+                                          static_cast<void *>(obc),
+                                          this,
+                                          entity, expire);
+  osd->watch_timer.add_event_at(expire, cb);
+  obc->unconnected_watchers[entity] = cb;
+}
+
+/*
+ * Expire an unconnected watcher and persist the updated watcher list.
+ *
+ * Removes `entity` from obc->unconnected_watchers and from the object's
+ * on-disk watcher set (obc->obs.oi.watchers), then builds and issues an
+ * internal repop (no client op, empty op vector) that rewrites the
+ * object_info attribute (OI_ATTR) and logs a MODIFY entry.
+ *
+ * OSD::handle_watch_timeout() calls this with the PG locked.
+ *
+ * @param _obc   the ObjectContext, passed as void *
+ * @param entity watcher whose registration is being dropped
+ * @param expire scheduled expiry time; not used by this function
+ */
+void ReplicatedPG::handle_watch_timeout(void *_obc,
+                                        entity_name_t entity,
+                                        utime_t expire)
+{
+  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
+  obc->unconnected_watchers.erase(entity);
+  obc->obs.oi.watchers.erase(entity);
+
+  // Internal operation: no client message, identified by our own name + a fresh tid.
+  vector<OSDOp> ops;
+  tid_t rep_tid = osd->get_tid();
+  osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
+  OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, obc->ssc, this);
+  ctx->mtime = ceph_clock_now(g_ceph_context);
+
+  ctx->at_version.epoch = osd->osdmap->get_epoch();
+  ctx->at_version.version = log.head.version + 1;
+
+  entity_inst_t nobody;
+
+  /* Currently, mode.try_write always returns true.  If this changes, we will
+   * need to delay the repop accordingly */
+  // Keep the call outside assert() so it still runs if assertions are compiled out.
+  bool can_write = mode.try_write(nobody);
+  assert(can_write);
+  (void)can_write;
+  RepGather *repop = new_repop(ctx, obc, rep_tid);
+
+  ObjectStore::Transaction *t = &ctx->op_t;
+
+  ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, obc->obs.oi.soid,
+                                ctx->at_version,
+                                obc->obs.oi.version,
+                                osd_reqid_t(), ctx->mtime));
+
+  // Snapshot the pre-update state that issue_repop() needs.
+  eversion_t old_last_update = log.head;
+  bool old_exists = repop->obc->obs.exists;
+  uint64_t old_size = repop->obc->obs.oi.size;
+  eversion_t old_version = repop->obc->obs.oi.version;
+
+  // Re-encode object_info (now without the expired watcher) at the new version.
+  obc->obs.oi.prior_version = old_version;
+  obc->obs.oi.version = ctx->at_version;
+  bufferlist bl;
+  ::encode(obc->obs.oi, bl);
+  t->setattr(coll, obc->obs.oi.soid, OI_ATTR, bl);
+
+  ctx->at_version.version++;
+
+  append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
+
+  // obc ref swallowed by repop!
+  issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists,
+              old_size, old_version);
+  eval_repop(repop);
+}
ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
const object_locator_t& oloc,
osd->take_waiters(waiting_for_degraded_object[soid]);
waiting_for_degraded_object.erase(soid);
}
+ map<hobject_t, ObjectContext *>::iterator i =
+ object_contexts.find(soid);
+ if (i != object_contexts.end()) {
+ populate_obc_watchers(i->second);
+ }
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
obc->obs.exists = true;
obc->obs.oi.decode(oibl);
-
- populate_obc_watchers(obc);
// suck in snapset context?
SnapSetContext *ssc = obc->ssc;
remove_watchers_and_notifies();
}
+/*
+ * Activation hook: run populate_obc_watchers() on every object context
+ * currently held in object_contexts.
+ */
+void ReplicatedPG::on_activate()
+{
+  map<hobject_t, ObjectContext *>::iterator ctx_it = object_contexts.begin();
+  while (ctx_it != object_contexts.end()) {
+    ObjectContext *obc = ctx_it->second;
+    populate_obc_watchers(obc);
+    ++ctx_it;
+  }
+}
+
void ReplicatedPG::on_change()
{
dout(10) << "on_change" << dendl;
state_clear(PG_STATE_REPAIR);
}
+ context_registry_on_change();
+
// take object waiters
take_object_waiters(waiting_for_missing_object);
take_object_waiters(waiting_for_degraded_object);
p++)
osd->take_waiters(p->second);
waiting_for_ondisk.clear();
-
- context_registry_on_change();
}