--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tri_mutex.h"
+
+// Acquire a shared read lock, queueing behind any existing waiters.
+seastar::future<> tri_mutex::lock_for_read()
+{
+  if (!try_lock_for_read()) {
+    // slow path: park this request until wake() hands it the lock
+    waiters.emplace_back(seastar::promise<>(), type_t::read);
+    return waiters.back().pr.get_future();
+  }
+  return seastar::make_ready_future<>();
+}
+
+// Attempt to take a read lock without blocking.  Fails if a writer or
+// an exclusive user holds the mutex, or if anyone is already queued --
+// readers never jump the wait queue.
+bool tri_mutex::try_lock_for_read() noexcept
+{
+  const bool available = !writers && !exclusively_used && waiters.empty();
+  if (available) {
+    ++readers;
+  }
+  return available;
+}
+
+// Drop a read lock; the last reader out wakes the next batch of waiters.
+void tri_mutex::unlock_for_read()
+{
+  assert(readers > 0);
+  --readers;
+  if (readers == 0) {
+    wake();
+  }
+}
+
+// Acquire a shared write lock.  With greedy=true the caller may bypass
+// queued waiters (see try_lock_for_write).
+seastar::future<> tri_mutex::lock_for_write(bool greedy)
+{
+  if (!try_lock_for_write(greedy)) {
+    // slow path: park this request until wake() hands it the lock
+    waiters.emplace_back(seastar::promise<>(), type_t::write);
+    return waiters.back().pr.get_future();
+  }
+  return seastar::make_ready_future<>();
+}
+
+// Attempt to take a write lock without blocking.  Writers share the
+// mutex with each other, but not with readers or exclusive users.
+// A greedy writer is allowed to jump ahead of queued waiters.
+bool tri_mutex::try_lock_for_write(bool greedy) noexcept
+{
+  if (readers || exclusively_used) {
+    return false;
+  }
+  if (!greedy && !waiters.empty()) {
+    // respect the queue unless the caller asked to cut in line
+    return false;
+  }
+  ++writers;
+  return true;
+}
+
+// Drop a write lock; the last writer out wakes the next batch of waiters.
+void tri_mutex::unlock_for_write()
+{
+  assert(writers > 0);
+  --writers;
+  if (writers == 0) {
+    wake();
+  }
+}
+
+// for exclusive users
+// Acquire the mutex exclusively, queueing if it is held in any mode.
+seastar::future<> tri_mutex::lock_for_excl()
+{
+  if (!try_lock_for_excl()) {
+    // slow path: park this request until wake() hands it the lock
+    waiters.emplace_back(seastar::promise<>(), type_t::exclusive);
+    return waiters.back().pr.get_future();
+  }
+  return seastar::make_ready_future<>();
+}
+
+// Attempt to take the exclusive lock without blocking: only possible
+// when nobody -- reader, writer or exclusive user -- holds the mutex.
+bool tri_mutex::try_lock_for_excl() noexcept
+{
+  if (readers || writers || exclusively_used) {
+    return false;
+  }
+  exclusively_used = true;
+  return true;
+}
+
+// Release the exclusive lock and wake the next batch of waiters.
+void tri_mutex::unlock_for_excl()
+{
+  assert(exclusively_used);
+  exclusively_used = false;
+  wake();
+}
+
+// True if anyone -- reader, writer or exclusive user -- holds the lock.
+bool tri_mutex::is_acquired() const
+{
+  return readers > 0 || writers > 0 || exclusively_used;
+}
+
+// Hand the (fully released) mutex to the waiters at the front of the
+// queue.  Consecutive waiters of the same type are woken as one batch;
+// an exclusive waiter is always woken alone.  Counters/flags are bumped
+// here, before the promises resolve, so late try_lock callers see the
+// lock as taken.
+void tri_mutex::wake()
+{
+  assert(!readers && !writers && !exclusively_used);
+  type_t type = type_t::none;
+  while (!waiters.empty()) {
+    auto& waiter = waiters.front();
+    if (type == type_t::exclusive) {
+      // an exclusive user was granted the lock in the previous
+      // iteration; nobody may share with it
+      break;
+    } else if (type == type_t::none) {
+      // the first waiter decides the type of this batch
+      type = waiter.type;
+    } else if (type != waiter.type) {
+      // to be woken in the next batch
+      break;
+    }
+    switch (type) {
+    case type_t::read:
+      ++readers;
+      break;
+    case type_t::write:
+      ++writers;
+      break;
+    case type_t::exclusive:
+      exclusively_used = true;
+      break;
+    default:
+      assert(0);
+    }
+    waiter.pr.set_value();
+    waiters.pop_front();
+  }
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/circular_buffer.hh>
+
+/// shared/exclusive mutual exclusion
+///
+/// similar to seastar::shared_mutex, but instead of two kinds of waiters,
+/// tri_mutex keeps track of three kinds of them:
+/// - readers
+/// - writers
+/// - exclusive users
+/// and unlike shared_mutex, tri_mutex have two kinds of shared users of lock:
+/// - readers
+/// - writers, which are not mutual-exclusive
+/// the exclusive users is like the writers in shared_mutex.
+class tri_mutex {
+public:
+  tri_mutex() = default;
+  // for shared readers
+  seastar::future<> lock_for_read();
+  bool try_lock_for_read() noexcept;
+  void unlock_for_read();
+  // for shared writers
+  // greedy: if true, acquire even when other waiters are queued
+  seastar::future<> lock_for_write(bool greedy);
+  bool try_lock_for_write(bool greedy) noexcept;
+  void unlock_for_write();
+  // for exclusive users
+  seastar::future<> lock_for_excl();
+  bool try_lock_for_excl() noexcept;
+  void unlock_for_excl();
+
+  // true if held by anyone, in any of the three modes
+  bool is_acquired() const;
+
+private:
+  // grant the lock to the next batch of compatible waiters; called by
+  // the unlock_*() paths once the mutex is completely free
+  void wake();
+  unsigned readers = 0;           // count of current shared readers
+  unsigned writers = 0;           // count of current shared writers
+  bool exclusively_used = false;  // true while an exclusive user holds it
+  enum class type_t : uint8_t {
+    read,
+    write,
+    exclusive,
+    none,
+  };
+  // a parked lock request: resolved by wake() in FIFO batches
+  struct waiter_t {
+    waiter_t(seastar::promise<>&& pr, type_t type)
+      : pr(std::move(pr)), type(type)
+    {}
+    seastar::promise<> pr;
+    type_t type;
+  };
+  seastar::circular_buffer<waiter_t> waiters;
+};
#include "common/intrusive_lru.h"
#include "osd/object_state.h"
+#include "crimson/common/tri_mutex.h"
#include "crimson/osd/osd_operation.h"
namespace ceph {
}
private:
- RWState rwstate;
- seastar::shared_mutex wait_queue;
- std::optional<seastar::shared_promise<>> wake;
-
- template <typename F>
- seastar::future<> with_queue(F &&f) {
- return wait_queue.lock().then([this, f=std::move(f)] {
- ceph_assert(!wake);
- return seastar::repeat([this, f=std::move(f)]() {
- if (f()) {
- wait_queue.unlock();
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- } else {
- rwstate.inc_waiters();
- wake = seastar::shared_promise<>();
- return wake->get_shared_future().then([this, f=std::move(f)] {
- wake = std::nullopt;
- rwstate.dec_waiters(1);
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- });
- }
- });
- });
- }
-
+ tri_mutex rwlock;
+ bool recovery_read_marker = false;
const char *get_type_name() const final {
return "ObjectContext";
Operation *op,
LockF &&lockf) {
return op->with_blocking_future(
- make_blocking_future(with_queue(std::forward<LockF>(lockf))));
+ make_blocking_future(std::forward<LockF>(lockf)));
}
- template <typename UnlockF>
- void put_lock(
- UnlockF &&unlockf) {
- if (unlockf() && wake) wake->set_value();
- }
public:
seastar::future<> get_lock_type(Operation *op, RWState::State type) {
switch (type) {
case RWState::RWWRITE:
- return get_lock(op, [this] { return rwstate.get_write_lock(); });
+ return get_lock(op, rwlock.lock_for_write(false));
case RWState::RWREAD:
- return get_lock(op, [this] { return rwstate.get_read_lock(); });
+ return get_lock(op, rwlock.lock_for_read());
case RWState::RWEXCL:
- return get_lock(op, [this] { return rwstate.get_excl_lock(); });
+ return get_lock(op, rwlock.lock_for_excl());
case RWState::RWNONE:
return seastar::make_ready_future<>();
default:
void put_lock_type(RWState::State type) {
switch (type) {
case RWState::RWWRITE:
- return put_lock([this] { return rwstate.put_write(); });
+ return rwlock.unlock_for_write();
case RWState::RWREAD:
- return put_lock([this] { return rwstate.put_read(); });
+ return rwlock.unlock_for_read();
case RWState::RWEXCL:
- return put_lock([this] { return rwstate.put_excl(); });
+ return rwlock.unlock_for_excl();
case RWState::RWNONE:
return;
default:
   void degrade_excl_to(RWState::State type) {
     // assume we already hold an excl lock
+    // NOTE(review): unlock_for_excl() invokes wake(), which synchronously
+    // grants the lock to any queued waiters; if the queue is non-empty
+    // the try_lock below can fail and trip ceph_assert(success).  Confirm
+    // callers only degrade while no waiters are queued.
-    bool put = rwstate.put_excl();
+    rwlock.unlock_for_excl();
     bool success = false;
     switch (type) {
     case RWState::RWWRITE:
-      success = rwstate.get_write_lock();
+      success = rwlock.try_lock_for_write(false);
       break;
     case RWState::RWREAD:
-      success = rwstate.get_read_lock();
+      success = rwlock.try_lock_for_read();
       break;
     case RWState::RWEXCL:
-      success = rwstate.get_excl_lock();
+      success = rwlock.try_lock_for_excl();
       break;
     case RWState::RWNONE:
       success = true;
       break;
     default:
-      ceph_abort_msg("invalid lock type");
+      assert(0 == "invalid lock type");
       break;
     }
     ceph_assert(success);
-    if (put && wake) {
-      wake->set_value();
-    }
   }
-  bool empty() const { return rwstate.empty(); }
+  // true if nobody holds the lock in any mode
+  bool empty() const {
+    return !rwlock.is_acquired();
+  }
+  // NOTE(review): previously this reported !rwstate.empty() (which also
+  // reflected queued waiters); now it only reports whether the lock is
+  // held -- confirm callers expect the narrower meaning
+  bool is_request_pending() const {
+    return rwlock.is_acquired();
+  }
template <typename F>
seastar::future<> get_write_greedy(Operation *op) {
- return get_lock(op, [this] { return rwstate.get_write_lock(true); });
+ return get_lock(op, [this] {
+ return rwlock.lock_for_write(true);
+ });
}
- bool try_get_read_lock() {
- return rwstate.get_read_lock();
- }
- void drop_read() {
- return put_lock_type(RWState::RWREAD);
- }
   bool get_recovery_read() {
-    return rwstate.get_recovery_read();
+    // non-blocking: take a shared read lock and mark it as a recovery
+    // read so drop_recovery_read() knows what to release
+    if (rwlock.try_lock_for_read()) {
+      recovery_read_marker = true;
+      return true;
+    } else {
+      return false;
+    }
   }
   seastar::future<> wait_recovery_read() {
-    if (rwstate.get_recovery_read()) {
-      return seastar::make_ready_future<>();
-    }
-    return with_queue([this] {
-      return rwstate.get_recovery_read();
+    // block until a shared read lock is granted, then flag it as a
+    // recovery read; the marker is set only after acquisition succeeds
+    return rwlock.lock_for_read().then([this] {
+      recovery_read_marker = true;
     });
   }
   void drop_recovery_read() {
-    ceph_assert(rwstate.recovery_read_marker);
-    drop_read();
-    rwstate.recovery_read_marker = false;
+    // release the read lock taken by get_recovery_read()/wait_recovery_read()
+    assert(recovery_read_marker);
+    rwlock.unlock_for_read();
+    recovery_read_marker = false;
   }
   bool maybe_get_excl() {
-    return rwstate.get_excl_lock();
-  }
-
-  bool is_request_pending() const {
-    return !rwstate.empty();
+    // non-blocking attempt at the exclusive lock
+    return rwlock.try_lock_for_excl();
   }
};
using ObjectContextRef = ObjectContext::Ref;