}
};
-class PrimaryLogPG::C_OSD_OndiskWriteUnlock : public Context {
- ObjectContextRef obc, obc2, obc3;
- public:
- C_OSD_OndiskWriteUnlock(
- ObjectContextRef o,
- ObjectContextRef o2 = ObjectContextRef(),
- ObjectContextRef o3 = ObjectContextRef()) : obc(o), obc2(o2), obc3(o3) {}
- void finish(int r) override {
- obc->ondisk_write_unlock();
- if (obc2)
- obc2->ondisk_write_unlock();
- if (obc3)
- obc3->ondisk_write_unlock();
- }
-};
-
struct OnReadComplete : public Context {
PrimaryLogPG *pg;
PrimaryLogPG::OpContext *opcontext;
if (is_primary()) {
if (!is_delete) {
obc->obs.exists = true;
- obc->ondisk_write_lock();
bool got = obc->get_recovery_read();
assert(got);
assert(recovering.count(obc->obs.oi.soid));
recovering[obc->obs.oi.soid] = obc;
obc->obs.oi = recovery_info.oi; // may have been updated above
- t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
}
t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
ctx->user_at_version = obc->obs.oi.user_version;
dout(30) << __func__ << " user_at_version " << ctx->user_at_version << dendl;
- if (op->may_read()) {
- dout(10) << " taking ondisk_read_lock" << dendl;
- obc->ondisk_read_lock();
- }
-
{
#ifdef WITH_LTTNG
osd_reqid_t reqid = ctx->op->get_reqid();
reqid.name._num, reqid.tid, reqid.inc);
}
- if (op->may_read()) {
- dout(10) << " dropping ondisk_read_lock" << dendl;
- obc->ondisk_read_unlock();
- }
-
bool pending_async_reads = !ctx->pending_async_reads.empty();
if (result == -EINPROGRESS || pending_async_reads) {
// come back later.
}
}
- ctx->obc->ondisk_write_lock();
-
ctx->op_t->add_obc(ctx->obc);
if (ctx->clone_obc) {
- ctx->clone_obc->ondisk_write_lock();
ctx->op_t->add_obc(ctx->clone_obc);
}
if (ctx->head_obc) {
- ctx->head_obc->ondisk_write_lock();
ctx->op_t->add_obc(ctx->head_obc);
}
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
- Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
- ctx->obc,
- ctx->clone_obc,
- ctx->head_obc);
if (!(ctx->log.empty())) {
assert(ctx->at_version >= projected_last_update);
projected_last_update = ctx->at_version;
min_last_complete_ondisk,
ctx->log,
ctx->updated_hset_history,
- onapplied_sync,
+ nullptr,
on_all_applied,
on_all_commit,
repop->rep_tid,
dout(10) << " already reverting " << soid << dendl;
} else {
dout(10) << " reverting " << soid << " to " << latest->prior_version << dendl;
- obc->ondisk_write_lock();
obc->obs.oi.version = latest->version;
ObjectStore::Transaction t;
++active_pushes;
- t.register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
t.register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
t.register_on_commit(new C_OSD_CommittedPushedObject(
this,
* a client write would be blocked since the object is degraded.
* In almost all cases, therefore, this lock should be uncontended.
*/
- obc->ondisk_read_lock();
int r = pgbackend->recover_object(
soid,
v,
ObjectContextRef(),
obc, // has snapset context
h);
- obc->ondisk_read_unlock();
if (r < 0) {
dout(0) << __func__ << " Error " << r << " on oid " << soid << dendl;
primary_failed(soid);
recovering.insert(make_pair(oid, obc));
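- // We need to take the read_lock here in order to flush in-progress writes
+ // NOTE(review): the ondisk read lock is no longer taken around recover_object(); confirm in-progress writes do not need flushing here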
- obc->ondisk_read_lock();
int r = pgbackend->recover_object(
oid,
v,
ObjectContextRef(),
obc,
h);
- obc->ondisk_read_unlock();
if (r < 0) {
dout(0) << __func__ << " Error " << r << " on oid " << oid << dendl;
primary_failed(oid);
bufferlist bl;
{
- obc->ondisk_read_lock();
int r = osd->store->read(ch, ghobject_t(oid), 0, 0, bl);
assert(r >= 0);
- obc->ondisk_read_unlock();
}
HitSetRef hs(new HitSet);
bufferlist::iterator pbl = bl.begin();
Context *destructor_callback;
-private:
- Mutex lock;
public:
- Cond cond;
- int unstable_writes, readers, writers_waiting, readers_waiting;
-
// any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
ObjectContext()
: ssc(NULL),
destructor_callback(0),
- lock("PrimaryLogPG::ObjectContext::lock"),
- unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
blocked(false), requeue_scrub_on_unblock(false) {}
~ObjectContext() {
return blocked;
}
- // do simple synchronous mutual exclusion, for now. no waitqueues or anything fancy.
- void ondisk_write_lock() {
- lock.Lock();
- writers_waiting++;
- while (readers_waiting || readers)
- cond.Wait(lock);
- writers_waiting--;
- unstable_writes++;
- lock.Unlock();
- }
- void ondisk_write_unlock() {
- lock.Lock();
- assert(unstable_writes > 0);
- unstable_writes--;
- if (!unstable_writes && readers_waiting)
- cond.Signal();
- lock.Unlock();
- }
- void ondisk_read_lock() {
- lock.Lock();
- readers_waiting++;
- while (unstable_writes)
- cond.Wait(lock);
- readers_waiting--;
- readers++;
- lock.Unlock();
- }
- void ondisk_read_unlock() {
- lock.Lock();
- assert(readers > 0);
- readers--;
- if (!readers && writers_waiting)
- cond.Signal();
- lock.Unlock();
- }
-
/// in-progress copyfrom ops for this object
bool blocked:1;
bool requeue_scrub_on_unblock:1; // true if we need to requeue scrub on unblock
EXPECT_TRUE(missing.is_missing(oid2));
}
-class ObjectContextTest : public ::testing::Test {
-protected:
-
- static const useconds_t DELAY_MAX = 20 * 1000 * 1000;
-
- class Thread_read_lock : public Thread {
- public:
- ObjectContext &obc;
-
- explicit Thread_read_lock(ObjectContext& _obc) :
- obc(_obc)
- {
- }
-
- void *entry() override {
- obc.ondisk_read_lock();
- return NULL;
- }
- };
-
- class Thread_write_lock : public Thread {
- public:
- ObjectContext &obc;
-
- explicit Thread_write_lock(ObjectContext& _obc) :
- obc(_obc)
- {
- }
-
- void *entry() override {
- obc.ondisk_write_lock();
- return NULL;
- }
- };
-
-};
-
-TEST_F(ObjectContextTest, read_write_lock)
-{
- {
- ObjectContext obc;
-
- //
- // write_lock
- // write_lock
- // write_unlock
- // write_unlock
- //
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
-
- obc.ondisk_write_lock();
-
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(1, obc.unstable_writes);
-
- obc.ondisk_write_lock();
-
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(2, obc.unstable_writes);
-
- obc.ondisk_write_unlock();
-
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(1, obc.unstable_writes);
-
- obc.ondisk_write_unlock();
-
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
- }
-
- useconds_t delay = 0;
-
- {
- ObjectContext obc;
-
- //
- // write_lock
- // read_lock => wait
- // write_unlock => signal
- // read_unlock
- //
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(0, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
-
- obc.ondisk_write_lock();
-
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(0, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(1, obc.unstable_writes);
-
- Thread_read_lock t(obc);
- t.create("obc_read");
-
- do {
- cout << "Trying (1) with delay " << delay << "us\n";
- usleep(delay);
- } while (obc.readers_waiting == 0 &&
- ( delay = delay * 2 + 1) < DELAY_MAX);
-
- EXPECT_EQ(1, obc.readers_waiting);
- EXPECT_EQ(0, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(1, obc.unstable_writes);
-
- obc.ondisk_write_unlock();
-
- do {
- cout << "Trying (2) with delay " << delay << "us\n";
- usleep(delay);
- } while ((obc.readers == 0 || obc.readers_waiting == 1) &&
- ( delay = delay * 2 + 1) < DELAY_MAX);
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(1, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
-
- obc.ondisk_read_unlock();
-
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(0, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
-
- t.join();
- }
-
- {
- ObjectContext obc;
-
- //
- // read_lock
- // write_lock => wait
- // read_unlock => signal
- // write_unlock
- //
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(0, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
-
- obc.ondisk_read_lock();
-
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(1, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
-
- Thread_write_lock t(obc);
- t.create("obc_write");
-
- do {
- cout << "Trying (3) with delay " << delay << "us\n";
- usleep(delay);
- } while ((obc.writers_waiting == 0) &&
- ( delay = delay * 2 + 1) < DELAY_MAX);
-
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(1, obc.readers);
- EXPECT_EQ(1, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
-
- obc.ondisk_read_unlock();
-
- do {
- cout << "Trying (4) with delay " << delay << "us\n";
- usleep(delay);
- } while ((obc.unstable_writes == 0 || obc.writers_waiting == 1) &&
- ( delay = delay * 2 + 1) < DELAY_MAX);
-
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(0, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(1, obc.unstable_writes);
-
- obc.ondisk_write_unlock();
-
- EXPECT_EQ(0, obc.readers_waiting);
- EXPECT_EQ(0, obc.readers);
- EXPECT_EQ(0, obc.writers_waiting);
- EXPECT_EQ(0, obc.unstable_writes);
-
- t.join();
- }
-
-}
-
TEST(pg_pool_t_test, get_pg_num_divisor) {
pg_pool_t p;
p.set_pg_num(16);