}
}
-  // any SNAPDIR op needs to have all clones present.  treat them as
-  // src_obc's so that we track references properly and clean up later.
-  map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> src_obc;
-  if (m->get_snapid() == CEPH_SNAPDIR) {
-    for (vector<snapid_t>::iterator p = obc->ssc->snapset.clones.begin();
-         p != obc->ssc->snapset.clones.end();
-         ++p) {
-      hobject_t clone_oid = obc->obs.oi.soid;
-      clone_oid.snap = *p;
-      if (!src_obc.count(clone_oid)) {
-        if (is_unreadable_object(clone_oid)) {
-          wait_for_unreadable_object(clone_oid, op);
-          return;
-        }
-
-        ObjectContextRef sobc = get_object_context(clone_oid, false);
-        if (!sobc) {
-          if (!maybe_handle_cache(op, write_ordered, sobc, -ENOENT, clone_oid, true))
-            osd->reply_op_error(op, -ENOENT);
-          return;
-        } else {
-          dout(10) << " clone_oid " << clone_oid << " obc " << sobc << dendl;
-          src_obc[clone_oid] = sobc;
-          continue;
-        }
-        ceph_abort();  // unreachable
-      } else {
-        continue;
-      }
-    }
-  }
-
OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, obc, this);
  if (!obc->obs.exists)
    ctx->snapset_obc = get_object_context(obc->obs.oi.soid.get_snapdir(), false);
op->mark_started();
- ctx->src_obc.swap(src_obc);
execute_ctx(ctx);
utime_t prepare_latency = ceph_clock_now();
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
ObjectContextRef obc = ctx->obc;
const hobject_t& soid = obc->obs.oi.soid;
- map<hobject_t,ObjectContextRef, hobject_t::BitwiseComparator>& src_obc = ctx->src_obc;
// this method must be idempotent since we may call it several times
// before we finally apply the resulting transaction.
dout(10) << " taking ondisk_read_lock" << dendl;
obc->ondisk_read_lock();
}
-  for (map<hobject_t,ObjectContextRef, hobject_t::BitwiseComparator>::iterator p = src_obc.begin();
-       p != src_obc.end();
-       ++p) {
-    dout(10) << " taking ondisk_read_lock for src " << p->first << dendl;
-    p->second->ondisk_read_lock();
-  }
{
#ifdef WITH_LTTNG
dout(10) << " dropping ondisk_read_lock" << dendl;
obc->ondisk_read_unlock();
}
-  for (map<hobject_t,ObjectContextRef, hobject_t::BitwiseComparator>::iterator p = src_obc.begin();
-       p != src_obc.end();
-       ++p) {
-    dout(10) << " dropping ondisk_read_lock for src " << p->first << dendl;
-    p->second->ondisk_read_unlock();
-  }
if (result == -EINPROGRESS) {
// come back later.
      hobject_t clone_oid = soid;
      clone_oid.snap = *clone_iter;
-      ObjectContextRef clone_obc = ctx->src_obc[clone_oid];
-      assert(clone_obc);
+
+      /* No need to take a lock here.  We are only inspecting state cached
+       * in the ObjectContext, so we aren't performing an actual read unless
+       * the clone obc is not already loaded (in which case, it cannot have
+       * an in-progress write).  We also do not risk exposing uncommitted
+       * state, since we do have a read lock on the head object or snapdir,
+       * which we would have to write lock in order to make user-visible
+       * modifications to the snapshot state (snap-trim-related mutations
+       * are not user visible).
+       */
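+      // wait_for_unreadable_object() requeues the op; returning -EAGAIN
+      // lets execute_ctx() drop this ctx without replying, so the requeued
+      // op retries once the clone becomes readable.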
+      if (is_missing_object(clone_oid)) {
+        dout(20) << "LIST_SNAPS " << clone_oid << " missing" << dendl;
+        wait_for_unreadable_object(clone_oid, ctx->op);
+        result = -EAGAIN;
+        break;
+      }
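+      // get_object_context() with can_create=false yields a null ref when
+      // the clone is not present locally (e.g. not yet promoted into a
+      // cache tier), which maybe_handle_cache() then gets a chance to fix.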
+
+      ObjectContextRef clone_obc = get_object_context(clone_oid, false);
+      if (!clone_obc) {
+        if (maybe_handle_cache(
+              ctx->op, true, clone_obc, -ENOENT, clone_oid, true)) {
+          // promoting the clone
+          result = -EAGAIN;
+        } else {
+          osd->clog->error() << "osd." << osd->whoami
+                             << ": missing clone " << clone_oid
+                             << " for oid " << soid;
+          // should not happen
+          result = -ENOENT;
+        }
+        break;
+      }
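+      // oi.snaps is ordered newest-first; iterate in reverse so the
+      // response lists this clone's snaps in ascending order.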
      for (vector<snapid_t>::reverse_iterator p = clone_obc->obs.oi.snaps.rbegin();
           p != clone_obc->obs.oi.snaps.rend();
           ++p) {