repop->applied = true;
+ repop->obc->finish_write();
+
put_object_context(repop->obc);
repop->obc = 0;
// -------------------------------------------------------
-ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(pobject_t poid)
+ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(sobject_t soid)
{
- ObjectContext *obc = &object_contexts[poid];
+ ObjectContext *obc = &object_contexts[soid];
obc->ref++;
if (obc->ref > 1) {
- dout(10) << "get_object_context " << poid << " "
+ dout(10) << "get_object_context " << soid << " "
<< (obc->ref-1) << " -> " << obc->ref << dendl;
return obc; // already had it
}
// pull info off disk
- obc->poid = poid;
+ obc->soid = soid;
struct stat st;
- int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+ int r = osd->store->stat(info.pgid.to_coll(), soid, &st);
if (r == 0) {
obc->exists = true;
obc->size = st.st_size;
bufferlist bv;
- r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+ r = osd->store->getattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
assert(r >= 0);
obc->oi.decode(bv);
} else {
- obc->oi.soid = poid;
+ obc->oi.soid = soid;
obc->exists = false;
obc->size = 0;
}
- dout(10) << "get_object_context " << poid << " read " << obc->oi << dendl;
+ dout(10) << "get_object_context " << soid << " read " << obc->oi << dendl;
return obc;
}
void ReplicatedPG::put_object_context(ObjectContext *obc)
{
- dout(10) << "put_object_context " << obc->poid << " "
+ dout(10) << "put_object_context " << obc->soid << " "
<< obc->ref << " -> " << (obc->ref-1) << dendl;
+ if (obc->wake) {
+ osd->take_waiters(obc->waiting);
+ obc->wake = false;
+ }
+
--obc->ref;
if (obc->ref == 0) {
- object_contexts.erase(obc->poid);
+ assert(obc->waiting.empty());
+ object_contexts.erase(obc->soid);
if (object_contexts.empty())
kick();
}
// note my stats
utime_t now = g_clock.now();
+ obc->start_write();
+
// issue replica writes
tid_t rep_tid = osd->get_tid();
RepGather *repop = new_repop(ctx, obc, noop, rep_tid);
- idle
- no in-progress or waiting writes.
- read: ok
- - write: move to 'delayed' or 'rmw'
- - rmw: move to 'rmw'
+ - write: ok. move to 'delayed' or 'rmw'
+ - rmw: ok. move to 'rmw'
- delayed
- delayed write in progress. delay write application on primary.
- when done, move to 'idle'
- read: ok
- write: ok
- - rmw: move to 'delayed-flushing'
-
- - delayed-flushing
- - waiting for delayed writes to flush, then move to 'rmw'
- - read, write, rmw: wait
+ - rmw: no. move to 'delayed-flushing'
- rmw
- rmw cycles in flight. applied immediately at primary.
- when done, move to 'idle'
- read: same client ok. otherwise, move to 'rmw-flushing'
- - write: ok
- - rmw: same client ok. otherwise, wait for rmw to flush
+ - write: same client ok. otherwise, start write, but also move to 'rmw-flushing'
+ - rmw: same client ok. otherwise, move to 'rmw-flushing'
+ - delayed-flushing
+ - waiting for delayed writes to flush, then move to 'rmw'
+ - read, write, rmw: wait
+
- rmw-flushing
- waiting for rmw to flush, then move to 'idle'
- read, write, rmw: wait
* replicas ack.
*/
struct ObjectContext {
+ sobject_t soid;
+ int ref;
+
enum {
- IDLE, DELAYED, DELAYED_FLUSHING, RMW, RMW_FLUSHING
+ IDLE,
+ DELAYED,
+ RMW,
+ DELAYED_FLUSHING,
+ RMW_FLUSHING
} state;
- int ref;
- sobject_t poid;
+ static const char *get_state_name(int s) {
+ switch (s) {
+ case IDLE: return "idle";
+ case DELAYED: return "delayed";
+ case RMW: return "rmw";
+ case DELAYED_FLUSHING: return "delayed-flushing";
+ case RMW_FLUSHING: return "rmw-flushing";
+ default: return "???";
+ }
+ }
+
+ int num_wr, num_rmw;
+ entity_inst_t client;
+ list<Message*> waiting;
+ bool wake;
bool exists;
__u64 size;
object_info_t oi;
- ObjectContext() : state(IDLE), ref(0), exists(false), size(0), oi(poid) {}
+ bool try_read(entity_inst_t& c) {
+ switch (state) {
+ case IDLE:
+ case DELAYED:
+ return true;
+ case RMW:
+ if (c == client)
+ return true;
+ state = RMW_FLUSHING;
+ return false;
+ case DELAYED_FLUSHING:
+ case RMW_FLUSHING:
+ return false;
+ default:
+ assert(0);
+ }
+ }
+ bool try_write(entity_inst_t& c) {
+ switch (state) {
+ case IDLE:
+ state = DELAYED;
+ case DELAYED:
+ return true;
+ case RMW:
+ if (c == client)
+ return true;
+ state = RMW_FLUSHING;
+ return true;
+ case DELAYED_FLUSHING:
+ case RMW_FLUSHING:
+ return false;
+ default:
+ assert(0);
+ }
+ }
+ bool try_rmw(entity_inst_t& c) {
+ switch (state) {
+ case IDLE:
+ state = RMW;
+ client = c;
+ return true;
+ case DELAYED:
+ state = DELAYED_FLUSHING;
+ return false;
+ case RMW:
+ if (c == client)
+ return true;
+ state = RMW_FLUSHING;
+ return false;
+ case DELAYED_FLUSHING:
+ case RMW_FLUSHING:
+ return false;
+ default:
+ assert(0);
+ }
+ }
+
+ void start_write() {
+ num_wr++;
+ }
+ void finish_write() {
+ assert(num_wr > 0);
+ --num_wr;
+ if (num_wr == 0)
+ switch (state) {
+ case DELAYED:
+ assert(!num_rmw);
+ state = IDLE;
+ wake = true;
+ break;
+ case RMW:
+ case DELAYED_FLUSHING:
+ case RMW_FLUSHING:
+ if (!num_rmw && !num_wr) {
+ state = IDLE;
+ wake = true;
+ }
+ break;
+ default:
+ assert(0);
+ }
+ }
+
+ void start_rmw() {
+ ++num_rmw;
+ }
+ void finish_rmw() {
+ assert(num_rmw > 0);
+ --num_rmw;
+ if (num_rmw == 0) {
+ switch (state) {
+ case RMW:
+ case RMW_FLUSHING:
+ if (!num_rmw && !num_wr) {
+ state = IDLE;
+ wake = true;
+ }
+ break;
+ default:
+ assert(0);
+ }
+ }
+ }
+
+ ObjectContext() : ref(0), state(IDLE), num_wr(0), num_rmw(0), wake(false),
+ exists(false), size(0), oi(soid) {}
};
/*
};
+inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectContext& obc)
+{
+ out << "obc(" << obc.soid << " " << obc.get_state_name(obc.state);
+ if (!obc.waiting.empty())
+ out << " WAITING";
+ out << ")";
+ return out;
+}
+
inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
{
out << "repgather(" << &repop << " rep_tid=" << repop.rep_tid