#include "Journal.h"
#include "FileJournal.h"
#include "common/RWLock.h"
+#include "osd/OpRequest.h"
class JournalingObjectStore : public ObjectStore {
protected:
#include "include/xlist.h"
#include "msg/Message.h"
#include "include/memory.h"
+#include "osd/osd_types.h"
#include "common/TrackedOp.h"
-/**
- * osd request identifier
- *
- * caller name + incarnation# + tid to unique identify this request.
- */
-struct osd_reqid_t {
- entity_name_t name; // who
- ceph_tid_t tid;
- int32_t inc; // incarnation
-
- osd_reqid_t()
- : tid(0), inc(0) {}
- osd_reqid_t(const entity_name_t& a, int i, ceph_tid_t t)
- : name(a), tid(t), inc(i) {}
-
- DENC(osd_reqid_t, v, p) {
- DENC_START(2, 2, p);
- denc(v.name, p);
- denc(v.tid, p);
- denc(v.inc, p);
- DENC_FINISH(p);
- }
- void dump(Formatter *f) const;
- static void generate_test_instances(list<osd_reqid_t*>& o);
-};
-WRITE_CLASS_DENC(osd_reqid_t)
-
/**
* The OpRequest takes in a Message* and takes over a single reference
* to it, which it puts() when destroyed.
#include "common/hobject.h"
#include "osd/osd_types.h"
+#include "osd/osd_internal_types.h"
#include "common/interval_map.h"
#include "common/inline_variant.h"
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_OSD_INTERNAL_TYPES_H
+#define CEPH_OSD_INTERNAL_TYPES_H
+
+#include "osd_types.h"
+#include "OpRequest.h"
+
+/*
+ * keep tabs on object modifications that are in flight.
+ * we need to know the projected existence, size, snapset,
+ * etc., because we don't send writes down to disk until after
+ * replicas ack.
+ */
+
+struct ObjectContext;
+
+struct ObjectState {
+ object_info_t oi;
+ bool exists; ///< the stored object exists (i.e., we will remember the object_info_t)
+
+ ObjectState() : exists(false) {}
+
+ ObjectState(const object_info_t &oi_, bool exists_)
+ : oi(oi_), exists(exists_) {}
+};
+
+typedef ceph::shared_ptr<ObjectContext> ObjectContextRef;
+
+struct ObjectContext {
+ ObjectState obs;
+
+ SnapSetContext *ssc; // may be null
+
+ Context *destructor_callback;
+
+private:
+ Mutex lock;
+public:
+ Cond cond;
+ int unstable_writes, readers, writers_waiting, readers_waiting;
+
+
+ // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
+ map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
+
+ // attr cache
+ map<string, bufferlist> attr_cache;
+
+ struct RWState {
+ enum State {
+ RWNONE,
+ RWREAD,
+ RWWRITE,
+ RWEXCL,
+ };
+ static const char *get_state_name(State s) {
+ switch (s) {
+ case RWNONE: return "none";
+ case RWREAD: return "read";
+ case RWWRITE: return "write";
+ case RWEXCL: return "excl";
+ default: return "???";
+ }
+ }
+ const char *get_state_name() const {
+ return get_state_name(state);
+ }
+
+ list<OpRequestRef> waiters; ///< ops waiting on state change
+ int count; ///< number of readers or writers
+
+ State state:4; ///< rw state
+ /// if set, restart backfill when we can get a read lock
+ bool recovery_read_marker:1;
+ /// if set, requeue snaptrim on lock release
+ bool snaptrimmer_write_marker:1;
+
+ RWState()
+ : count(0),
+ state(RWNONE),
+ recovery_read_marker(false),
+ snaptrimmer_write_marker(false)
+ {}
+ bool get_read(OpRequestRef op) {
+ if (get_read_lock()) {
+ return true;
+ } // else
+ waiters.push_back(op);
+ return false;
+ }
+ /// this function adjusts the counts if necessary
+ bool get_read_lock() {
+ // don't starve anybody!
+ if (!waiters.empty()) {
+ return false;
+ }
+ switch (state) {
+ case RWNONE:
+ assert(count == 0);
+ state = RWREAD;
+ // fall through
+ case RWREAD:
+ count++;
+ return true;
+ case RWWRITE:
+ return false;
+ case RWEXCL:
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+
+ bool get_write(OpRequestRef op, bool greedy=false) {
+ if (get_write_lock(greedy)) {
+ return true;
+ } // else
+ if (op)
+ waiters.push_back(op);
+ return false;
+ }
+ bool get_write_lock(bool greedy=false) {
+ if (!greedy) {
+ // don't starve anybody!
+ if (!waiters.empty() ||
+ recovery_read_marker) {
+ return false;
+ }
+ }
+ switch (state) {
+ case RWNONE:
+ assert(count == 0);
+ state = RWWRITE;
+ // fall through
+ case RWWRITE:
+ count++;
+ return true;
+ case RWREAD:
+ return false;
+ case RWEXCL:
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ bool get_excl_lock() {
+ switch (state) {
+ case RWNONE:
+ assert(count == 0);
+ state = RWEXCL;
+ count = 1;
+ return true;
+ case RWWRITE:
+ return false;
+ case RWREAD:
+ return false;
+ case RWEXCL:
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ bool get_excl(OpRequestRef op) {
+ if (get_excl_lock()) {
+ return true;
+ } // else
+ if (op)
+ waiters.push_back(op);
+ return false;
+ }
+ /// same as get_write_lock, but ignore starvation
+ bool take_write_lock() {
+ if (state == RWWRITE) {
+ count++;
+ return true;
+ }
+ return get_write_lock();
+ }
+ void dec(list<OpRequestRef> *requeue) {
+ assert(count > 0);
+ assert(requeue);
+ count--;
+ if (count == 0) {
+ state = RWNONE;
+ requeue->splice(requeue->end(), waiters);
+ }
+ }
+ void put_read(list<OpRequestRef> *requeue) {
+ assert(state == RWREAD);
+ dec(requeue);
+ }
+ void put_write(list<OpRequestRef> *requeue) {
+ assert(state == RWWRITE);
+ dec(requeue);
+ }
+ void put_excl(list<OpRequestRef> *requeue) {
+ assert(state == RWEXCL);
+ dec(requeue);
+ }
+ bool empty() const { return state == RWNONE; }
+ } rwstate;
+
+ bool get_read(OpRequestRef op) {
+ return rwstate.get_read(op);
+ }
+ bool get_write(OpRequestRef op) {
+ return rwstate.get_write(op, false);
+ }
+ bool get_excl(OpRequestRef op) {
+ return rwstate.get_excl(op);
+ }
+ bool get_lock_type(OpRequestRef op, RWState::State type) {
+ switch (type) {
+ case RWState::RWWRITE:
+ return get_write(op);
+ case RWState::RWREAD:
+ return get_read(op);
+ case RWState::RWEXCL:
+ return get_excl(op);
+ default:
+ assert(0 == "invalid lock type");
+ return true;
+ }
+ }
+ bool get_write_greedy(OpRequestRef op) {
+ return rwstate.get_write(op, true);
+ }
+ bool get_snaptrimmer_write(bool mark_if_unsuccessful) {
+ if (rwstate.get_write_lock()) {
+ return true;
+ } else {
+ if (mark_if_unsuccessful)
+ rwstate.snaptrimmer_write_marker = true;
+ return false;
+ }
+ }
+ bool get_recovery_read() {
+ rwstate.recovery_read_marker = true;
+ if (rwstate.get_read_lock()) {
+ return true;
+ }
+ return false;
+ }
+ bool try_get_read_lock() {
+ return rwstate.get_read_lock();
+ }
+ void drop_recovery_read(list<OpRequestRef> *ls) {
+ assert(rwstate.recovery_read_marker);
+ rwstate.put_read(ls);
+ rwstate.recovery_read_marker = false;
+ }
+ void put_read(list<OpRequestRef> *to_wake) {
+ rwstate.put_read(to_wake);
+ }
+ void put_excl(list<OpRequestRef> *to_wake,
+ bool *requeue_recovery,
+ bool *requeue_snaptrimmer) {
+ rwstate.put_excl(to_wake);
+ if (rwstate.empty() && rwstate.recovery_read_marker) {
+ rwstate.recovery_read_marker = false;
+ *requeue_recovery = true;
+ }
+ if (rwstate.empty() && rwstate.snaptrimmer_write_marker) {
+ rwstate.snaptrimmer_write_marker = false;
+ *requeue_snaptrimmer = true;
+ }
+ }
+ void put_write(list<OpRequestRef> *to_wake,
+ bool *requeue_recovery,
+ bool *requeue_snaptrimmer) {
+ rwstate.put_write(to_wake);
+ if (rwstate.empty() && rwstate.recovery_read_marker) {
+ rwstate.recovery_read_marker = false;
+ *requeue_recovery = true;
+ }
+ if (rwstate.empty() && rwstate.snaptrimmer_write_marker) {
+ rwstate.snaptrimmer_write_marker = false;
+ *requeue_snaptrimmer = true;
+ }
+ }
+ void put_lock_type(
+ ObjectContext::RWState::State type,
+ list<OpRequestRef> *to_wake,
+ bool *requeue_recovery,
+ bool *requeue_snaptrimmer) {
+ switch (type) {
+ case ObjectContext::RWState::RWWRITE:
+ return put_write(to_wake, requeue_recovery, requeue_snaptrimmer);
+ case ObjectContext::RWState::RWREAD:
+ return put_read(to_wake);
+ case ObjectContext::RWState::RWEXCL:
+ return put_excl(to_wake, requeue_recovery, requeue_snaptrimmer);
+ default:
+ assert(0 == "invalid lock type");
+ }
+ }
+ bool is_request_pending() {
+ return (rwstate.count > 0);
+ }
+
+ ObjectContext()
+ : ssc(NULL),
+ destructor_callback(0),
+ lock("PrimaryLogPG::ObjectContext::lock"),
+ unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
+ blocked(false), requeue_scrub_on_unblock(false) {}
+
+ ~ObjectContext() {
+ assert(rwstate.empty());
+ if (destructor_callback)
+ destructor_callback->complete(0);
+ }
+
+ void start_block() {
+ assert(!blocked);
+ blocked = true;
+ }
+ void stop_block() {
+ assert(blocked);
+ blocked = false;
+ }
+ bool is_blocked() const {
+ return blocked;
+ }
+
+ // do simple synchronous mutual exclusion, for now. no waitqueues or anything fancy.
+ void ondisk_write_lock() {
+ lock.Lock();
+ writers_waiting++;
+ while (readers_waiting || readers)
+ cond.Wait(lock);
+ writers_waiting--;
+ unstable_writes++;
+ lock.Unlock();
+ }
+ void ondisk_write_unlock() {
+ lock.Lock();
+ assert(unstable_writes > 0);
+ unstable_writes--;
+ if (!unstable_writes && readers_waiting)
+ cond.Signal();
+ lock.Unlock();
+ }
+ void ondisk_read_lock() {
+ lock.Lock();
+ readers_waiting++;
+ while (unstable_writes)
+ cond.Wait(lock);
+ readers_waiting--;
+ readers++;
+ lock.Unlock();
+ }
+ void ondisk_read_unlock() {
+ lock.Lock();
+ assert(readers > 0);
+ readers--;
+ if (!readers && writers_waiting)
+ cond.Signal();
+ lock.Unlock();
+ }
+
+ /// in-progress copyfrom ops for this object
+ bool blocked:1;
+ bool requeue_scrub_on_unblock:1; // true if we need to requeue scrub on unblock
+
+};
+
+inline ostream& operator<<(ostream& out, const ObjectState& obs)
+{
+ out << obs.oi.soid;
+ if (!obs.exists)
+ out << "(dne)";
+ return out;
+}
+
+inline ostream& operator<<(ostream& out, const ObjectContext::RWState& rw)
+{
+ return out << "rwstate(" << rw.get_state_name()
+ << " n=" << rw.count
+ << " w=" << rw.waiters.size()
+ << ")";
+}
+
+inline ostream& operator<<(ostream& out, const ObjectContext& obc)
+{
+ return out << "obc(" << obc.obs << " " << obc.rwstate << ")";
+}
+
+class ObcLockManager {
+ struct ObjectLockState {
+ ObjectContextRef obc;
+ ObjectContext::RWState::State type;
+ ObjectLockState(
+ ObjectContextRef obc,
+ ObjectContext::RWState::State type)
+ : obc(obc), type(type) {}
+ };
+ map<hobject_t, ObjectLockState> locks;
+public:
+ ObcLockManager() = default;
+ ObcLockManager(ObcLockManager &&) = default;
+ ObcLockManager(const ObcLockManager &) = delete;
+ ObcLockManager &operator=(ObcLockManager &&) = default;
+ bool empty() const {
+ return locks.empty();
+ }
+ bool get_lock_type(
+ ObjectContext::RWState::State type,
+ const hobject_t &hoid,
+ ObjectContextRef obc,
+ OpRequestRef op) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->get_lock_type(op, type)) {
+ locks.insert(make_pair(hoid, ObjectLockState(obc, type)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+ /// Get write lock, ignore starvation
+ bool take_write_lock(
+ const hobject_t &hoid,
+ ObjectContextRef obc) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->rwstate.take_write_lock()) {
+ locks.insert(
+ make_pair(
+ hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+ /// Get write lock for snap trim
+ bool get_snaptrimmer_write(
+ const hobject_t &hoid,
+ ObjectContextRef obc,
+ bool mark_if_unsuccessful) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->get_snaptrimmer_write(mark_if_unsuccessful)) {
+ locks.insert(
+ make_pair(
+ hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+ /// Get write lock greedy
+ bool get_write_greedy(
+ const hobject_t &hoid,
+ ObjectContextRef obc,
+ OpRequestRef op) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->get_write_greedy(op)) {
+ locks.insert(
+ make_pair(
+ hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /// try get read lock
+ bool try_get_read_lock(
+ const hobject_t &hoid,
+ ObjectContextRef obc) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->try_get_read_lock()) {
+ locks.insert(
+ make_pair(
+ hoid,
+ ObjectLockState(obc, ObjectContext::RWState::RWREAD)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void put_locks(
+ list<pair<hobject_t, list<OpRequestRef> > > *to_requeue,
+ bool *requeue_recovery,
+ bool *requeue_snaptrimmer) {
+ for (auto p: locks) {
+ list<OpRequestRef> _to_requeue;
+ p.second.obc->put_lock_type(
+ p.second.type,
+ &_to_requeue,
+ requeue_recovery,
+ requeue_snaptrimmer);
+ if (to_requeue) {
+ to_requeue->push_back(
+ make_pair(
+ p.second.obc->obs.oi.soid,
+ std::move(_to_requeue)));
+ }
+ }
+ locks.clear();
+ }
+ ~ObcLockManager() {
+ assert(locks.empty());
+ }
+};
+
+
+
+#endif
#include "common/snap_types.h"
#include "HitSet.h"
#include "Watch.h"
-#include "OpRequest.h"
#include "include/cmp.h"
#include "librados/ListObjectImpl.h"
#include "compressor/Compressor.h"
/// conver CEPH_OSD_ALLOC_HINT_FLAG_* op flags to a string
string ceph_osd_alloc_hint_flag_string(unsigned flags);
+
+/**
+ * osd request identifier
+ *
+ * caller name + incarnation# + tid to unique identify this request.
+ */
+struct osd_reqid_t {
+ entity_name_t name; // who
+ ceph_tid_t tid;
+ int32_t inc; // incarnation
+
+ osd_reqid_t()
+ : tid(0), inc(0) {}
+ osd_reqid_t(const entity_name_t& a, int i, ceph_tid_t t)
+ : name(a), tid(t), inc(i) {}
+
+ DENC(osd_reqid_t, v, p) {
+ DENC_START(2, 2, p);
+ denc(v.name, p);
+ denc(v.tid, p);
+ denc(v.inc, p);
+ DENC_FINISH(p);
+ }
+ void dump(Formatter *f) const;
+ static void generate_test_instances(list<osd_reqid_t*>& o);
+};
+WRITE_CLASS_DENC(osd_reqid_t)
+
+
+
struct pg_shard_t {
int32_t osd;
shard_id_t shard;
};
WRITE_CLASS_ENCODER_FEATURES(object_info_t)
-struct ObjectState {
- object_info_t oi;
- bool exists; ///< the stored object exists (i.e., we will remember the object_info_t)
-
- ObjectState() : exists(false) {}
-
- ObjectState(const object_info_t &oi_, bool exists_)
- : oi(oi_), exists(exists_) {}
-};
-
struct SnapSetContext {
hobject_t oid;
SnapSet snapset;
oid(o), ref(0), registered(false), exists(true) { }
};
-/*
- * keep tabs on object modifications that are in flight.
- * we need to know the projected existence, size, snapset,
- * etc., because we don't send writes down to disk until after
- * replicas ack.
- */
-
-struct ObjectContext;
-
-typedef ceph::shared_ptr<ObjectContext> ObjectContextRef;
-
-struct ObjectContext {
- ObjectState obs;
-
- SnapSetContext *ssc; // may be null
-
- Context *destructor_callback;
-
-private:
- Mutex lock;
-public:
- Cond cond;
- int unstable_writes, readers, writers_waiting, readers_waiting;
-
-
- // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
- map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
-
- // attr cache
- map<string, bufferlist> attr_cache;
-
- struct RWState {
- enum State {
- RWNONE,
- RWREAD,
- RWWRITE,
- RWEXCL,
- };
- static const char *get_state_name(State s) {
- switch (s) {
- case RWNONE: return "none";
- case RWREAD: return "read";
- case RWWRITE: return "write";
- case RWEXCL: return "excl";
- default: return "???";
- }
- }
- const char *get_state_name() const {
- return get_state_name(state);
- }
-
- list<OpRequestRef> waiters; ///< ops waiting on state change
- int count; ///< number of readers or writers
-
- State state:4; ///< rw state
- /// if set, restart backfill when we can get a read lock
- bool recovery_read_marker:1;
- /// if set, requeue snaptrim on lock release
- bool snaptrimmer_write_marker:1;
-
- RWState()
- : count(0),
- state(RWNONE),
- recovery_read_marker(false),
- snaptrimmer_write_marker(false)
- {}
- bool get_read(OpRequestRef op) {
- if (get_read_lock()) {
- return true;
- } // else
- waiters.push_back(op);
- return false;
- }
- /// this function adjusts the counts if necessary
- bool get_read_lock() {
- // don't starve anybody!
- if (!waiters.empty()) {
- return false;
- }
- switch (state) {
- case RWNONE:
- assert(count == 0);
- state = RWREAD;
- // fall through
- case RWREAD:
- count++;
- return true;
- case RWWRITE:
- return false;
- case RWEXCL:
- return false;
- default:
- assert(0 == "unhandled case");
- return false;
- }
- }
-
- bool get_write(OpRequestRef op, bool greedy=false) {
- if (get_write_lock(greedy)) {
- return true;
- } // else
- if (op)
- waiters.push_back(op);
- return false;
- }
- bool get_write_lock(bool greedy=false) {
- if (!greedy) {
- // don't starve anybody!
- if (!waiters.empty() ||
- recovery_read_marker) {
- return false;
- }
- }
- switch (state) {
- case RWNONE:
- assert(count == 0);
- state = RWWRITE;
- // fall through
- case RWWRITE:
- count++;
- return true;
- case RWREAD:
- return false;
- case RWEXCL:
- return false;
- default:
- assert(0 == "unhandled case");
- return false;
- }
- }
- bool get_excl_lock() {
- switch (state) {
- case RWNONE:
- assert(count == 0);
- state = RWEXCL;
- count = 1;
- return true;
- case RWWRITE:
- return false;
- case RWREAD:
- return false;
- case RWEXCL:
- return false;
- default:
- assert(0 == "unhandled case");
- return false;
- }
- }
- bool get_excl(OpRequestRef op) {
- if (get_excl_lock()) {
- return true;
- } // else
- if (op)
- waiters.push_back(op);
- return false;
- }
- /// same as get_write_lock, but ignore starvation
- bool take_write_lock() {
- if (state == RWWRITE) {
- count++;
- return true;
- }
- return get_write_lock();
- }
- void dec(list<OpRequestRef> *requeue) {
- assert(count > 0);
- assert(requeue);
- count--;
- if (count == 0) {
- state = RWNONE;
- requeue->splice(requeue->end(), waiters);
- }
- }
- void put_read(list<OpRequestRef> *requeue) {
- assert(state == RWREAD);
- dec(requeue);
- }
- void put_write(list<OpRequestRef> *requeue) {
- assert(state == RWWRITE);
- dec(requeue);
- }
- void put_excl(list<OpRequestRef> *requeue) {
- assert(state == RWEXCL);
- dec(requeue);
- }
- bool empty() const { return state == RWNONE; }
- } rwstate;
-
- bool get_read(OpRequestRef op) {
- return rwstate.get_read(op);
- }
- bool get_write(OpRequestRef op) {
- return rwstate.get_write(op, false);
- }
- bool get_excl(OpRequestRef op) {
- return rwstate.get_excl(op);
- }
- bool get_lock_type(OpRequestRef op, RWState::State type) {
- switch (type) {
- case RWState::RWWRITE:
- return get_write(op);
- case RWState::RWREAD:
- return get_read(op);
- case RWState::RWEXCL:
- return get_excl(op);
- default:
- assert(0 == "invalid lock type");
- return true;
- }
- }
- bool get_write_greedy(OpRequestRef op) {
- return rwstate.get_write(op, true);
- }
- bool get_snaptrimmer_write(bool mark_if_unsuccessful) {
- if (rwstate.get_write_lock()) {
- return true;
- } else {
- if (mark_if_unsuccessful)
- rwstate.snaptrimmer_write_marker = true;
- return false;
- }
- }
- bool get_recovery_read() {
- rwstate.recovery_read_marker = true;
- if (rwstate.get_read_lock()) {
- return true;
- }
- return false;
- }
- bool try_get_read_lock() {
- return rwstate.get_read_lock();
- }
- void drop_recovery_read(list<OpRequestRef> *ls) {
- assert(rwstate.recovery_read_marker);
- rwstate.put_read(ls);
- rwstate.recovery_read_marker = false;
- }
- void put_read(list<OpRequestRef> *to_wake) {
- rwstate.put_read(to_wake);
- }
- void put_excl(list<OpRequestRef> *to_wake,
- bool *requeue_recovery,
- bool *requeue_snaptrimmer) {
- rwstate.put_excl(to_wake);
- if (rwstate.empty() && rwstate.recovery_read_marker) {
- rwstate.recovery_read_marker = false;
- *requeue_recovery = true;
- }
- if (rwstate.empty() && rwstate.snaptrimmer_write_marker) {
- rwstate.snaptrimmer_write_marker = false;
- *requeue_snaptrimmer = true;
- }
- }
- void put_write(list<OpRequestRef> *to_wake,
- bool *requeue_recovery,
- bool *requeue_snaptrimmer) {
- rwstate.put_write(to_wake);
- if (rwstate.empty() && rwstate.recovery_read_marker) {
- rwstate.recovery_read_marker = false;
- *requeue_recovery = true;
- }
- if (rwstate.empty() && rwstate.snaptrimmer_write_marker) {
- rwstate.snaptrimmer_write_marker = false;
- *requeue_snaptrimmer = true;
- }
- }
- void put_lock_type(
- ObjectContext::RWState::State type,
- list<OpRequestRef> *to_wake,
- bool *requeue_recovery,
- bool *requeue_snaptrimmer) {
- switch (type) {
- case ObjectContext::RWState::RWWRITE:
- return put_write(to_wake, requeue_recovery, requeue_snaptrimmer);
- case ObjectContext::RWState::RWREAD:
- return put_read(to_wake);
- case ObjectContext::RWState::RWEXCL:
- return put_excl(to_wake, requeue_recovery, requeue_snaptrimmer);
- default:
- assert(0 == "invalid lock type");
- }
- }
- bool is_request_pending() {
- return (rwstate.count > 0);
- }
-
- ObjectContext()
- : ssc(NULL),
- destructor_callback(0),
- lock("PrimaryLogPG::ObjectContext::lock"),
- unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
- blocked(false), requeue_scrub_on_unblock(false) {}
-
- ~ObjectContext() {
- assert(rwstate.empty());
- if (destructor_callback)
- destructor_callback->complete(0);
- }
-
- void start_block() {
- assert(!blocked);
- blocked = true;
- }
- void stop_block() {
- assert(blocked);
- blocked = false;
- }
- bool is_blocked() const {
- return blocked;
- }
-
- // do simple synchronous mutual exclusion, for now. no waitqueues or anything fancy.
- void ondisk_write_lock() {
- lock.Lock();
- writers_waiting++;
- while (readers_waiting || readers)
- cond.Wait(lock);
- writers_waiting--;
- unstable_writes++;
- lock.Unlock();
- }
- void ondisk_write_unlock() {
- lock.Lock();
- assert(unstable_writes > 0);
- unstable_writes--;
- if (!unstable_writes && readers_waiting)
- cond.Signal();
- lock.Unlock();
- }
- void ondisk_read_lock() {
- lock.Lock();
- readers_waiting++;
- while (unstable_writes)
- cond.Wait(lock);
- readers_waiting--;
- readers++;
- lock.Unlock();
- }
- void ondisk_read_unlock() {
- lock.Lock();
- assert(readers > 0);
- readers--;
- if (!readers && writers_waiting)
- cond.Signal();
- lock.Unlock();
- }
-
- /// in-progress copyfrom ops for this object
- bool blocked:1;
- bool requeue_scrub_on_unblock:1; // true if we need to requeue scrub on unblock
-
-};
-
-inline ostream& operator<<(ostream& out, const ObjectState& obs)
-{
- out << obs.oi.soid;
- if (!obs.exists)
- out << "(dne)";
- return out;
-}
-
-inline ostream& operator<<(ostream& out, const ObjectContext::RWState& rw)
-{
- return out << "rwstate(" << rw.get_state_name()
- << " n=" << rw.count
- << " w=" << rw.waiters.size()
- << ")";
-}
-
-inline ostream& operator<<(ostream& out, const ObjectContext& obc)
-{
- return out << "obc(" << obc.obs << " " << obc.rwstate << ")";
-}
ostream& operator<<(ostream& out, const object_info_t& oi);
-class ObcLockManager {
- struct ObjectLockState {
- ObjectContextRef obc;
- ObjectContext::RWState::State type;
- ObjectLockState(
- ObjectContextRef obc,
- ObjectContext::RWState::State type)
- : obc(obc), type(type) {}
- };
- map<hobject_t, ObjectLockState> locks;
-public:
- ObcLockManager() = default;
- ObcLockManager(ObcLockManager &&) = default;
- ObcLockManager(const ObcLockManager &) = delete;
- ObcLockManager &operator=(ObcLockManager &&) = default;
- bool empty() const {
- return locks.empty();
- }
- bool get_lock_type(
- ObjectContext::RWState::State type,
- const hobject_t &hoid,
- ObjectContextRef obc,
- OpRequestRef op) {
- assert(locks.find(hoid) == locks.end());
- if (obc->get_lock_type(op, type)) {
- locks.insert(make_pair(hoid, ObjectLockState(obc, type)));
- return true;
- } else {
- return false;
- }
- }
- /// Get write lock, ignore starvation
- bool take_write_lock(
- const hobject_t &hoid,
- ObjectContextRef obc) {
- assert(locks.find(hoid) == locks.end());
- if (obc->rwstate.take_write_lock()) {
- locks.insert(
- make_pair(
- hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
- return true;
- } else {
- return false;
- }
- }
- /// Get write lock for snap trim
- bool get_snaptrimmer_write(
- const hobject_t &hoid,
- ObjectContextRef obc,
- bool mark_if_unsuccessful) {
- assert(locks.find(hoid) == locks.end());
- if (obc->get_snaptrimmer_write(mark_if_unsuccessful)) {
- locks.insert(
- make_pair(
- hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
- return true;
- } else {
- return false;
- }
- }
- /// Get write lock greedy
- bool get_write_greedy(
- const hobject_t &hoid,
- ObjectContextRef obc,
- OpRequestRef op) {
- assert(locks.find(hoid) == locks.end());
- if (obc->get_write_greedy(op)) {
- locks.insert(
- make_pair(
- hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
- return true;
- } else {
- return false;
- }
- }
-
- /// try get read lock
- bool try_get_read_lock(
- const hobject_t &hoid,
- ObjectContextRef obc) {
- assert(locks.find(hoid) == locks.end());
- if (obc->try_get_read_lock()) {
- locks.insert(
- make_pair(
- hoid,
- ObjectLockState(obc, ObjectContext::RWState::RWREAD)));
- return true;
- } else {
- return false;
- }
- }
-
- void put_locks(
- list<pair<hobject_t, list<OpRequestRef> > > *to_requeue,
- bool *requeue_recovery,
- bool *requeue_snaptrimmer) {
- for (auto p: locks) {
- list<OpRequestRef> _to_requeue;
- p.second.obc->put_lock_type(
- p.second.type,
- &_to_requeue,
- requeue_recovery,
- requeue_snaptrimmer);
- if (to_requeue) {
- to_requeue->push_back(
- make_pair(
- p.second.obc->obs.oi.soid,
- std::move(_to_requeue)));
- }
- }
- locks.clear();
- }
- ~ObcLockManager() {
- assert(locks.empty());
- }
-};
-
// Object recovery