From 0cdc9684f49e4debb4784dc290e4b32a088dd6bb Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 21 May 2009 12:35:26 -0700 Subject: [PATCH] osd: some ObjectContext changes --- src/TODO | 4 + src/osd/ReplicatedPG.cc | 30 +++++--- src/osd/ReplicatedPG.h | 160 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 171 insertions(+), 23 deletions(-) diff --git a/src/TODO b/src/TODO index 82061d57ddbc6..b6ea6b965de79 100644 --- a/src/TODO +++ b/src/TODO @@ -52,6 +52,10 @@ v0.9 - make mds exhert memory pressure on client caps, leases - optionally separate osd interfaces (ips) for clients and osds (replication, peering, etc.) +- object access modes + - unify read/write paths + - add objectcontext for clones as they're created (esp if write is delayed) + - make snap selection behave diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index c025b54885595..07fb75548939f 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1442,6 +1442,8 @@ void ReplicatedPG::apply_repop(RepGather *repop) repop->applied = true; + repop->obc->finish_write(); + put_object_context(repop->obc); repop->obc = 0; @@ -1651,49 +1653,55 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, // ------------------------------------------------------- -ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(pobject_t poid) +ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(sobject_t soid) { - ObjectContext *obc = &object_contexts[poid]; + ObjectContext *obc = &object_contexts[soid]; obc->ref++; if (obc->ref > 1) { - dout(10) << "get_object_context " << poid << " " + dout(10) << "get_object_context " << soid << " " << (obc->ref-1) << " -> " << obc->ref << dendl; return obc; // already had it } // pull info off disk - obc->poid = poid; + obc->soid = soid; struct stat st; - int r = osd->store->stat(info.pgid.to_coll(), poid, &st); + int r = osd->store->stat(info.pgid.to_coll(), soid, &st); if (r == 0) { obc->exists = true; obc->size = st.st_size; bufferlist bv; - r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv); + r = osd->store->getattr(info.pgid.to_coll(), soid, OI_ATTR, bv); assert(r >= 0); obc->oi.decode(bv); } else { - obc->oi.soid = poid; + obc->oi.soid = soid; obc->exists = false; obc->size = 0; } - dout(10) << "get_object_context " << poid << " read " << obc->oi << dendl; + dout(10) << "get_object_context " << soid << " read " << obc->oi << dendl; return obc; } void ReplicatedPG::put_object_context(ObjectContext *obc) { - dout(10) << "put_object_context " << obc->poid << " " + dout(10) << "put_object_context " << obc->soid << " " << obc->ref << " -> " << (obc->ref-1) << dendl; + if (obc->wake) { + osd->take_waiters(obc->waiting); + obc->wake = false; + } + --obc->ref; if (obc->ref == 0) { - object_contexts.erase(obc->poid); + assert(obc->waiting.empty()); + object_contexts.erase(obc->soid); if (object_contexts.empty()) kick(); } @@ -1826,6 +1834,8 @@ void ReplicatedPG::op_modify(MOSDOp *op) // note my stats utime_t now = g_clock.now(); + obc->start_write(); + // issue replica writes tid_t rep_tid = osd->get_tid(); RepGather *repop = new_repop(ctx, obc, noop, rep_tid); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 4a5c03db66c8f..e29ac3398eae0 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -45,27 +45,27 @@ public: - idle - no in-progress or waiting writes. - read: ok - - write: move to 'delayed' or 'rmw' - - rmw: move to 'rmw' + - write: ok. move to 'delayed' or 'rmw' + - rmw: ok. move to 'rmw' - delayed - delayed write in progress. delay write application on primary. - when done, move to 'idle' - read: ok - write: ok - - rmw: move to 'delayed-flushing' - - - delayed-flushing - - waiting for delayed writes to flush, then move to 'rmw' - - read, write, rmw: wait + - rmw: no. move to 'delayed-flushing' - rmw - rmw cycles in flight. applied immediately at primary. - when done, move to 'idle' - read: same client ok. otherwise, move to 'rmw-flushing' - - write: ok - - rmw: same client ok. otherwise, wait for rmw to flush + - write: same client ok. otherwise, start write, but also move to 'rmw-flushing' + - rmw: same client ok. otherwise, move to 'rmw-flushing' + - delayed-flushing + - waiting for delayed writes to flush, then move to 'rmw' + - read, write, rmw: wait + - rmw-flushing - waiting for rmw to flush, then move to 'idle' - read, write, rmw: wait @@ -80,19 +80,144 @@ public: * replicas ack. */ struct ObjectContext { + sobject_t soid; + int ref; + enum { - IDLE, DELAYED, DELAYED_FLUSHING, RMW, RMW_FLUSHING + IDLE, + DELAYED, + RMW, + DELAYED_FLUSHING, + RMW_FLUSHING } state; - int ref; - sobject_t poid; + static const char *get_state_name(int s) { + switch (s) { + case IDLE: return "idle"; + case DELAYED: return "delayed"; + case RMW: return "rmw"; + case DELAYED_FLUSHING: return "delayed-flushing"; + case RMW_FLUSHING: return "rmw-flushing"; + default: return "???"; + } + } + + int num_wr, num_rmw; + entity_inst_t client; + list waiting; + bool wake; bool exists; __u64 size; object_info_t oi; - ObjectContext() : state(IDLE), ref(0), exists(false), size(0), oi(poid) {} + bool try_read(entity_inst_t& c) { + switch (state) { + case IDLE: + case DELAYED: + return true; + case RMW: + if (c == client) + return true; + state = RMW_FLUSHING; + return false; + case DELAYED_FLUSHING: + case RMW_FLUSHING: + return false; + default: + assert(0); + } + } + bool try_write(entity_inst_t& c) { + switch (state) { + case IDLE: + state = DELAYED; + case DELAYED: + return true; + case RMW: + if (c == client) + return true; + state = RMW_FLUSHING; + return true; + case DELAYED_FLUSHING: + case RMW_FLUSHING: + return false; + default: + assert(0); + } + } + bool try_rmw(entity_inst_t& c) { + switch (state) { + case IDLE: + state = RMW; + client = c; + return true; + case DELAYED: + state = DELAYED_FLUSHING; + return false; + case RMW: + if (c == client) + return true; + state = RMW_FLUSHING; + return false; + case DELAYED_FLUSHING: + case RMW_FLUSHING: + return false; + default: + assert(0); + } + } + + void start_write() { + num_wr++; + } + void finish_write() { + assert(num_wr > 0); + --num_wr; + if (num_wr == 0) + switch (state) { + case DELAYED: + assert(!num_rmw); + state = IDLE; + wake = true; + break; + case RMW: + case DELAYED_FLUSHING: + case RMW_FLUSHING: + if (!num_rmw && !num_wr) { + state = IDLE; + wake = true; + } + break; + default: + assert(0); + } + } + + void start_rmw() { + ++num_rmw; + } + void finish_rmw() { + assert(num_rmw > 0); + --num_rmw; + if (num_rmw == 0) { + switch (state) { + case RMW: + case RMW_FLUSHING: + if (!num_rmw && !num_wr) { + state = IDLE; + wake = true; + } + break; + default: + assert(0); + } + } + } + + ObjectContext() : ref(0), state(IDLE), num_wr(0), num_rmw(0), wake(false), + exists(false), size(0), oi(soid) {} }; /* @@ -317,6 +442,15 @@ public: }; +inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectContext& obc) +{ + out << "obc(" << obc.soid << " " << obc.get_state_name(obc.state); + if (!obc.waiting.empty()) + out << " WAITING"; + out << ")"; + return out; +} + inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) { out << "repgather(" << &repop << " rep_tid=" << repop.rep_tid -- 2.39.5