using watch_key_t = std::pair<uint64_t, entity_name_t>;
std::map<watch_key_t, seastar::shared_ptr<crimson::osd::Watch>> watchers;
+ CommonOBCPipeline obc_pipeline;
+
ObjectContext(hobject_t hoid) : lock(hoid),
obs(std::move(hoid)) {}
private:
boost::intrusive::list_member_hook<> obc_accessing_hook;
uint64_t list_link_cnt = 0;
+
+ /**
+ * loading_started
+ *
+ * ObjectContext instances may be used for pipeline stages
+ * prior to actually being loaded.
+ *
+ * ObjectContextLoader::load_and_lock* use loading_started
+ * to determine whether to initiate loading or simply take
+ * the desired lock directly.
+ *
+ * If loading_started is not set, the task must set it and
+ * (syncronously) take an exclusive lock. That exclusive lock
+ * must be held until the loading completes, at which point the
+ * lock may be relaxed or released.
+ *
+ * If loading_started is set, it is safe to directly take
+ * the desired lock, once the lock is obtained loading may
+ * be assumed to be complete.
+ *
+ * loading_started, once set, remains set for the lifetime
+ * of the object.
+ */
+ bool loading_started = false;
+
+ /// true once set_*_state has been called, used for debugging
bool fully_loaded = false;
+
+ /**
+ * invalidated
+ *
+ * Set to true upon eviction from cache. This happens to all
+ * cached obc's upon interval change and to the target of
+ * a repop received on a replica to ensure that the cached
+ * state is refreshed upon subsequent replica read.
+ */
bool invalidated = false;
friend class ObjectContextRegistry;
LOG_PREFIX(ObjectContextLoader::load_and_lock_head);
DEBUGDPP("{} {}", dpp, manager.target, lock_type);
auto releaser = manager.get_releaser();
- // no users pre-populate head_state on this path, so we don't bother to
- // handle it
ceph_assert(manager.target.is_head());
- ceph_assert(manager.head_state.is_empty());
+
+ if (manager.head_state.is_empty()) {
+ auto [obc, _] = obc_registry.get_cached_obc(manager.target);
+ manager.set_state_obc(manager.head_state, obc);
+ }
ceph_assert(manager.target_state.is_empty());
- auto [obc, existed] = obc_registry.get_cached_obc(manager.target);
- manager.set_state_obc(manager.target_state, obc);
- manager.set_state_obc(manager.head_state, obc);
+ manager.set_state_obc(manager.target_state, manager.head_state.obc);
- if (existed) {
+ if (manager.target_state.obc->loading_started) {
co_await manager.target_state.lock_to(lock_type);
} else {
manager.target_state.lock_excl_sync();
+ manager.target_state.obc->loading_started = true;
co_await load_obc(manager.target_state.obc);
manager.target_state.demote_excl_to(lock_type);
}
}
ObjectContextLoader::load_and_lock_fut
-ObjectContextLoader::load_and_lock_clone(Manager &manager, RWState::State lock_type)
+ObjectContextLoader::load_and_lock_clone(
+ Manager &manager, RWState::State lock_type, bool lock_head)
{
LOG_PREFIX(ObjectContextLoader::load_and_lock_clone);
DEBUGDPP("{} {}", dpp, manager.target, lock_type);
ceph_assert(manager.target_state.is_empty());
if (manager.head_state.is_empty()) {
- auto [obc, existed] = obc_registry.get_cached_obc(manager.target.get_head());
+ auto [obc, _] = obc_registry.get_cached_obc(manager.target.get_head());
manager.set_state_obc(manager.head_state, obc);
+ }
- if (existed) {
- co_await manager.head_state.lock_to(RWState::RWREAD);
- } else {
- manager.head_state.lock_excl_sync();
- co_await load_obc(manager.head_state.obc);
- manager.head_state.demote_excl_to(RWState::RWREAD);
- }
+ if (!manager.head_state.obc->loading_started) {
+ // caller is responsible for pre-populating a loaded obc if lock_head is
+ // false
+ ceph_assert(lock_head);
+ manager.head_state.lock_excl_sync();
+ manager.head_state.obc->loading_started = true;
+ co_await load_obc(manager.head_state.obc);
+ manager.head_state.demote_excl_to(RWState::RWREAD);
+ } else if (lock_head) {
+ co_await manager.head_state.lock_to(RWState::RWREAD);
}
if (manager.options.resolve_clone) {
manager.head_state.state = RWState::RWNONE;
}
} else {
- auto [obc, existed] = obc_registry.get_cached_obc(manager.target);
+ auto [obc, _] = obc_registry.get_cached_obc(manager.target);
manager.set_state_obc(manager.target_state, obc);
- if (existed) {
+ if (manager.target_state.obc->loading_started) {
co_await manager.target_state.lock_to(RWState::RWREAD);
} else {
manager.target_state.lock_excl_sync();
+ manager.target_state.obc->loading_started = true;
co_await load_obc(manager.target_state.obc);
manager.target_state.obc->set_clone_ssc(manager.head_state.obc->ssc);
manager.target_state.demote_excl_to(RWState::RWREAD);
#include <seastar/core/future.hh>
#include <seastar/util/defer.hh>
+#include "crimson/common/coroutine.h"
#include "crimson/common/errorator.h"
#include "crimson/common/log.h"
#include "crimson/osd/object_context.h"
+#include "crimson/osd/osd_operation.h"
#include "crimson/osd/pg_backend.h"
#include "osd/object_state_fmt.h"
ObjectContextRef &get_obc() {
ceph_assert(!target_state.is_empty());
+ ceph_assert(target_state.obc->is_loaded());
return target_state.obc;
}
ObjectContextRef &get_head_obc() {
ceph_assert(!head_state.is_empty());
+ ceph_assert(head_state.obc->is_loaded());
return head_state.obc;
}
release();
}
};
- Manager get_obc_manager(hobject_t oid, bool resolve_clone = true) {
+
+ class Orderer {
+ friend ObjectContextLoader;
+ ObjectContextRef orderer_obc;
+ public:
+ CommonOBCPipeline &obc_pp() {
+ ceph_assert(orderer_obc);
+ return orderer_obc->obc_pipeline;
+ }
+ };
+
+ Orderer get_obc_orderer(const hobject_t &oid) {
+ Orderer ret;
+ std::tie(ret.orderer_obc, std::ignore) =
+ obc_registry.get_cached_obc(oid.get_head());
+ return ret;
+ }
+
+ Manager get_obc_manager(const hobject_t &oid, bool resolve_clone = true) {
Manager ret(*this, oid);
ret.options.resolve_clone = resolve_clone;
return ret;
}
+ Manager get_obc_manager(
+ Orderer &orderer, const hobject_t &oid, bool resolve_clone = true) {
+ Manager ret = get_obc_manager(oid, resolve_clone);
+ ret.set_state_obc(ret.head_state, orderer.orderer_obc);
+ return ret;
+ }
+
using load_and_lock_ertr = load_obc_ertr;
using load_and_lock_iertr = interruptible::interruptible_errorator<
IOInterruptCondition, load_and_lock_ertr>;
using load_and_lock_fut = load_and_lock_iertr::future<>;
private:
load_and_lock_fut load_and_lock_head(Manager &, RWState::State);
- load_and_lock_fut load_and_lock_clone(Manager &, RWState::State);
+ load_and_lock_fut load_and_lock_clone(Manager &, RWState::State, bool lock_head=true);
public:
load_and_lock_fut load_and_lock(Manager &, RWState::State);
// locks on head as part of this call.
manager.head_state.obc = head;
manager.head_state.obc->append_to(obc_set_accessing);
- co_await load_and_lock(manager, State);
+ co_await load_and_lock_clone(manager, State, false);
co_await std::invoke(func, head, manager.get_obc());
}