From 943bbbf1715d5b408d1762129fa91a64b2da2478 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 6 Jun 2007 16:01:43 +0000 Subject: [PATCH] * recast in terms of read-balance attribute git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1399 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/pgs/client/SyntheticClient.cc | 8 +- branches/sage/pgs/client/SyntheticClient.h | 2 +- branches/sage/pgs/messages/MOSDOp.h | 15 +- branches/sage/pgs/osd/OSD.cc | 43 +- branches/sage/pgs/osd/OSD.h | 14 +- branches/sage/pgs/osd/PG.cc | 26 + branches/sage/pgs/osd/PG.h | 12 +- branches/sage/pgs/osd/ReplicatedPG.cc | 520 ++++++++++---------- branches/sage/pgs/osd/ReplicatedPG.h | 8 +- 9 files changed, 334 insertions(+), 314 deletions(-) diff --git a/branches/sage/pgs/client/SyntheticClient.cc b/branches/sage/pgs/client/SyntheticClient.cc index 7618c627adb6b..a42a8e8b94632 100644 --- a/branches/sage/pgs/client/SyntheticClient.cc +++ b/branches/sage/pgs/client/SyntheticClient.cc @@ -482,7 +482,7 @@ int SyntheticClient::run() int iarg1 = iargs.front(); iargs.pop_front(); int iarg2 = iargs.front(); iargs.pop_front(); if (run_me()) - read_file(sarg1, iarg1, iarg2); + read_file(sarg1, iarg1, iarg2, true); } break; case SYNCLIENT_MODE_WRITEBATCH: @@ -1138,7 +1138,7 @@ int SyntheticClient::write_batch(int nfile, int size, int wrsize) return 0; } -int SyntheticClient::read_file(string& fn, int size, int rdsize) // size is in MB, wrsize in bytes +int SyntheticClient::read_file(string& fn, int size, int rdsize, bool ignoreprint) // size is in MB, wrsize in bytes { char *buf = new char[rdsize]; memset(buf, 1, rdsize); @@ -1169,14 +1169,14 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize) // size is in p++; if (readoff != wantoff || readclient != client->get_nodeid()) { - if (!bad) + if (!bad && !ignoreprint) dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient << ", should be offset " << wantoff << " clietn " << client->get_nodeid() << endl; bad++; } } - if (bad) + if (bad && !ignoreprint) dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl; } diff --git a/branches/sage/pgs/client/SyntheticClient.h b/branches/sage/pgs/client/SyntheticClient.h index cd6033f1fcc12..dc1cf58121d26 100644 --- a/branches/sage/pgs/client/SyntheticClient.h +++ b/branches/sage/pgs/client/SyntheticClient.h @@ -213,7 +213,7 @@ class SyntheticClient { int write_file(string& fn, int mb, int chunk); int write_batch(int nfile, int mb, int chunk); - int read_file(string& fn, int mb, int chunk); + int read_file(string& fn, int mb, int chunk, bool ignoreprint=false); int read_random(string& fn, int mb, int chunk); int read_random_ex(string& fn, int mb, int chunk); diff --git a/branches/sage/pgs/messages/MOSDOp.h b/branches/sage/pgs/messages/MOSDOp.h index fc3c410094fc1..220da427aeb02 100644 --- a/branches/sage/pgs/messages/MOSDOp.h +++ b/branches/sage/pgs/messages/MOSDOp.h @@ -47,12 +47,12 @@ #define OSD_OP_UPLOCK 24 #define OSD_OP_DNLOCK 25 -#define OSD_OP_PRIMARYLOCK 26 -#define OSD_OP_PRIMARYUNLOCK 27 - #define OSD_OP_PULL 30 #define OSD_OP_PUSH 31 +#define OSD_OP_BALANCEREADS 101 +#define OSD_OP_UNBALANCEREADS 102 + class MOSDOp : public Message { @@ -74,8 +74,8 @@ public: case OSD_OP_UPLOCK: return "uplock"; case OSD_OP_DNLOCK: return "dnlock"; - case OSD_OP_PRIMARYLOCK: return "primary-lock"; - case OSD_OP_PRIMARYUNLOCK: return "primary-unlock"; + case OSD_OP_BALANCEREADS: return "balance-reads"; + case OSD_OP_UNBALANCEREADS: return "unbalance-reads"; case OSD_OP_PULL: return "pull"; case OSD_OP_PUSH: return "push"; @@ -129,6 +129,11 @@ private: const entity_inst_t& get_client_inst() { return st.client; } void set_client_inst(const entity_inst_t& i) { st.client = i; } + bool wants_reply() { + if (st.op < 100) return true; + return false; // no reply needed for primary-lock, -unlock. + } + const tid_t get_rep_tid() { return st.rep_tid; } void set_rep_tid(tid_t t) { st.rep_tid = t; } diff --git a/branches/sage/pgs/osd/OSD.cc b/branches/sage/pgs/osd/OSD.cc index 3d4af30d3b03d..96e80e8337c01 100644 --- a/branches/sage/pgs/osd/OSD.cc +++ b/branches/sage/pgs/osd/OSD.cc @@ -616,12 +616,15 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch) } // finishers? + finished_lock.Lock(); if (finished.empty()) { + finished_lock.Unlock(); osd_lock.Unlock(); } else { list waiting; waiting.splice(waiting.begin(), finished); + finished_lock.Unlock(); osd_lock.Unlock(); for (list::iterator it = waiting.begin(); @@ -858,10 +861,12 @@ void OSD::dispatch(Message *m) } // finishers? + finished_lock.Lock(); if (!finished.empty()) { list waiting; waiting.splice(waiting.begin(), finished); + finished_lock.Unlock(); osd_lock.Unlock(); for (list::iterator it = waiting.begin(); @@ -872,6 +877,7 @@ void OSD::dispatch(Message *m) return; } + finished_lock.Unlock(); osd_lock.Unlock(); } @@ -1530,17 +1536,17 @@ bool OSD::require_current_map(Message *m, epoch_t ep) */ bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch) { - dout(10) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ")" << dendl; + dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ")" << dendl; // newer map? if (epoch > osdmap->get_epoch()) { - dout(7) << " from newer map epoch " << epoch << " > " << osdmap->get_epoch() << dendl; + dout(7) << "waiting for newer map epoch " << epoch << " > my " << osdmap->get_epoch() << dendl; wait_for_new_map(m); return false; } if (epoch < boot_epoch) { - dout(7) << " from pre-boot epoch " << epoch << " < " << boot_epoch << dendl; + dout(7) << "from pre-boot epoch " << epoch << " < " << boot_epoch << dendl; delete m; return false; } @@ -2072,7 +2078,7 @@ void OSD::handle_op(MOSDOp *op) return; } - dout(10) << "handle_op " << *op << " in " << *pg << endl; + dout(10) << "handle_op " << *op << " in " << *pg << dendl; } else { // REPLICATION OP (it's from another OSD) @@ -2260,32 +2266,3 @@ void OSD::wait_for_no_ops() -// ============================== -// Object locking - -// -// If the target object of the operation op is locked for writing by another client, the function puts op to the waiting queue waiting_for_wr_unlock -// returns true if object was locked, otherwise returns false -// -bool OSD::block_if_wrlocked(MOSDOp* op) -{ - object_t oid = op->get_oid(); - - entity_name_t source; - int len = store->getattr(oid, "wrlock", &source, sizeof(entity_name_t)); - //cout << "getattr returns " << len << " on " << oid << dendl; - - if (len == sizeof(source) && - source != op->get_client()) { - //the object is locked for writing by someone else -- add the op to the waiting queue - waiting_for_wr_unlock[oid].push_back(op); - return true; - } - - return false; //the object wasn't locked, so the operation can be handled right away -} - - - -// =============================== -// OPS diff --git a/branches/sage/pgs/osd/OSD.h b/branches/sage/pgs/osd/OSD.h index 3701e1d49eb3a..d067260c4729d 100644 --- a/branches/sage/pgs/osd/OSD.h +++ b/branches/sage/pgs/osd/OSD.h @@ -175,24 +175,20 @@ private: int hb_stat_ops; // ops since last heartbeat int hb_stat_qlen; // cumulative queue length since last hb - hash_map peer_qlen; + hash_map peer_qlen; hash_map peer_read_time; // -- waiters -- list finished; - + Mutex finished_lock; + void take_waiters(list& ls) { + finished_lock.Lock(); finished.splice(finished.end(), ls); + finished_lock.Unlock(); } - // -- object locking -- - hash_map > waiting_for_wr_unlock; - hash_map > waiting_for_primary_unlock; - - bool block_if_wrlocked(class MOSDOp* op); - - // -- op queue -- class ThreadPool *threadpool; hash_map > op_queue; diff --git a/branches/sage/pgs/osd/PG.cc b/branches/sage/pgs/osd/PG.cc index ad93fb701ad52..2592bd9ca69f8 100644 --- a/branches/sage/pgs/osd/PG.cc +++ b/branches/sage/pgs/osd/PG.cc @@ -1141,6 +1141,31 @@ void PG::read_log(ObjectStore *store) +// ============================== +// Object locking + +// +// If the target object of the operation op is locked for writing by another client, the function puts op to the waiting queue waiting_for_wr_unlock +// returns true if object was locked, otherwise returns false +// +bool PG::block_if_wrlocked(MOSDOp* op) +{ + object_t oid = op->get_oid(); + + entity_name_t source; + int len = osd->store->getattr(oid, "wrlock", &source, sizeof(entity_name_t)); + //cout << "getattr returns " << len << " on " << oid << dendl; + + if (len == sizeof(source) && + source != op->get_client()) { + //the object is locked for writing by someone else -- add the op to the waiting queue + waiting_for_wr_unlock[oid].push_back(op); + return true; + } + + return false; //the object wasn't locked, so the operation can be handled right away +} + @@ -1201,3 +1226,4 @@ bool PG::pick_object_rev(object_t& oid) + diff --git a/branches/sage/pgs/osd/PG.h b/branches/sage/pgs/osd/PG.h index 591abacda46a0..e591bd3f457f0 100644 --- a/branches/sage/pgs/osd/PG.h +++ b/branches/sage/pgs/osd/PG.h @@ -461,6 +461,11 @@ protected: list > waiting_for_missing_object; map replay_queue; + hash_map > waiting_for_wr_unlock; + + bool block_if_wrlocked(MOSDOp* op); + + // recovery map objects_pulling; // which objects are currently being pulled @@ -549,7 +554,12 @@ public: void set_role(int r) { role = r; } bool is_primary() const { return role == PG_ROLE_HEAD; } - bool is_acker() const { return role == PG_ROLE_ACKER; } + bool is_acker() const { + if (g_conf.osd_rep == OSD_REP_PRIMARY) + return is_primary(); + else + return role == PG_ROLE_ACKER; + } bool is_head() const { return role == PG_ROLE_HEAD; } bool is_middle() const { return role == PG_ROLE_MIDDLE; } bool is_residual() const { return role == PG_ROLE_STRAY; } diff --git a/branches/sage/pgs/osd/ReplicatedPG.cc b/branches/sage/pgs/osd/ReplicatedPG.cc index c1d80baaa223d..ebadf9ded163f 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.cc +++ b/branches/sage/pgs/osd/ReplicatedPG.cc @@ -108,190 +108,142 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op) // -- load balance reads -- - if (g_conf.osd_balance_reads) { - - // replicate/unreplicate? - if (!is_acker()) { - // -- replica -- - if (op->get_op() == OSD_OP_REPLICATE) { - dout(-10) << "preprocess_op replicating " << op->get_oid() << endl; - replicated_objects.insert(op->get_oid()); - delete op; - return true; - } - if (op->get_op() == OSD_OP_UNREPLICATE) { - dout(-10) << "preprocess_op un-replicating " << op->get_oid() << endl; - replicated_objects.erase(op->get_oid()); - delete op; + if (g_conf.osd_balance_reads && + is_primary() && + g_conf.osd_rep == OSD_REP_PRIMARY) { + // -- read on primary+acker --- + + // test + if (false) { + if (acting.size() > 1) { + int peer = acting[1]; + dout(-10) << "preprocess_op fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl; + osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); return true; } + } - if (!op->get_source().is_osd()) { - // -- read on replica -- - if (!osd->store->exists(op->get_oid())) { - // fwd to primary - dout(-10) << "preprocess_op got read on replica, object dne, fwd to primary" << endl; - osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary())); - return true; - } - - // primary lock? - // FIXME: this may cause a (blocking) stat+disk seek. - char v; - if (osd->store->getattr(op->get_oid(), "primary-lock", &v, 1) >= 0) { - dout(-10) << "preprocess_op primary-lock on " << op->get_oid() << " fwd to primary" << endl; - osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary())); - return true; - } - - // in replicate list? - if (replicated_objects.count(op->get_oid())) { - // yes. continue. - // note that we've already failed the fastpath above. - dout(-10) << "preprocess_op got read on replica, object replicated, processing/queuing as usual" << endl; - } else { - // no. forward to primary. - dout(-10) << "preprocess_op got read on replica, object not replicated, fwd to primary" << endl; - osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary())); - return true; - } + // -- flash crowd? + if (!op->get_source().is_osd()) { + // candidate? + bool is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( op->get_oid() ); + bool is_balanced = false; + bool b; + if (osd->store->getattr(op->get_oid(), "balance-reads", &b, 1) >= 0) + is_balanced = true; + + if (!is_balanced && is_flash_crowd_candidate && + balancing_reads.count(op->get_oid()) == 0) { + dout(-10) << "preprocess_op balance-reads on " << op->get_oid() << dendl; + balancing_reads.insert(op->get_oid()); + MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(), + op->get_oid(), + ObjectLayout(info.pgid), + osd->osdmap->get_epoch(), + OSD_OP_BALANCEREADS); + do_op(pop); } - } - - if (is_acker()) { - // -- read on acker --- - - // test - if (false) { - if (acting.size() > 1) { - int peer = acting[1]; - dout(-10) << "preprocess_op fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl; - osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); - return true; - } + if (is_balanced && !is_flash_crowd_candidate && + !unbalancing_reads.count(op->get_oid()) == 0) { + dout(-10) << "preprocess_op unbalance-reads on " << op->get_oid() << dendl; + unbalancing_reads.insert(op->get_oid()); + MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(), + op->get_oid(), + ObjectLayout(info.pgid), + osd->osdmap->get_epoch(), + OSD_OP_UNBALANCEREADS); + do_op(pop); } + } + + + // check my load. + // TODO xxx we must also compare with our own load + // if i am x percentage higher than replica , + // redirect the read + + if ( g_conf.osd_balance_reads == LOAD_LATENCY) { + double mean_read_time = osd->load_calc.get_average(); - // -- flash crowd? - if (!op->get_source().is_osd()) { - // candidate? - bool is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( op->get_oid() ); - bool is_replicated = replicated_objects.count( op->get_oid() ); + if ( mean_read_time != -1 ) { - if (!is_replicated && is_flash_crowd_candidate) { - // replicate - dout(-10) << "preprocess_op replicating " << op->get_oid() << endl; - replicated_objects.insert(op->get_oid()); - for (unsigned i=1; imessenger->send_message(new MOSDOp(osd->messenger->get_myinst(), 0, 0, - op->get_oid(), ObjectLayout(info.pgid), - osd->osdmap->get_epoch(), - OSD_OP_REPLICATE), - osd->osdmap->get_inst(acting[i])); - } - } - if (is_replicated && !is_flash_crowd_candidate) { - // unreplicate - dout(-10) << "preprocess_op unreplicating " << op->get_oid() << endl; - replicated_objects.erase(op->get_oid()); - for (unsigned i=1; imessenger->send_message(new MOSDOp(osd->messenger->get_myinst(), 0, 0, - op->get_oid(), ObjectLayout(info.pgid), - osd->osdmap->get_epoch(), - OSD_OP_REPLICATE), - osd->osdmap->get_inst(acting[i])); + for (unsigned i=1; + ipeer_read_time[peer] + << " of peer" << peer << dendl; + + if ( osd->peer_read_time.count(peer) && + ( (osd->peer_read_time[peer]*100/mean_read_time) < + (100 - g_conf.osd_load_diff_percent))) { + dout(10) << " forwarding to peer osd" << peer << dendl; + + osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); + return true; } } } + } + else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) { + // am i above my average? + float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops; - - // check my load. - // TODO xxx we must also compare with our own load - // if i am x percentage higher than replica , - // redirect the read - - if ( g_conf.osd_balance_reads == LOAD_LATENCY) { - double mean_read_time = osd->load_calc.get_average(); - - if ( mean_read_time != -1 ) { - - for (unsigned i=1; - ipeer_read_time[peer] - << " of peer" << peer << endl; + if (osd->pending_ops > my_avg) { + // is there a peer who is below my average? + for (unsigned i=1; ipeer_qlen.count(peer) && + osd->peer_qlen[peer] < my_avg) { + // calculate a probability that we should redirect + float p = (my_avg - osd->peer_qlen[peer]) / my_avg; // this is dumb. - if ( osd->peer_read_time.count(peer) && - ( (osd->peer_read_time[peer]*100/mean_read_time) < - (100 - g_conf.osd_load_diff_percent))) { - dout(10) << " forwarding to peer osd" << peer << endl; - + if (drand48() <= p) { + // take the first one + dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg + << ", p=" << p + << ", fwd to peer w/ qlen " << osd->peer_qlen[peer] + << " osd" << peer + << dendl; osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); return true; } } } } - else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) { - // am i above my average? - float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops; - - if (osd->pending_ops > my_avg) { - // is there a peer who is below my average? - for (unsigned i=1; ipeer_qlen.count(peer) && - osd->peer_qlen[peer] < my_avg) { - // calculate a probability that we should redirect - float p = (my_avg - osd->peer_qlen[peer]) / my_avg; // this is dumb. - - if (drand48() <= p) { - // take the first one - dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg - << ", p=" << p - << ", fwd to peer w/ qlen " << osd->peer_qlen[peer] - << " osd" << peer - << dendl; - osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); - return true; - } - } - } - } - } + } + + else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) { + // am i above my average? + float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops; - else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) { - // am i above my average? - float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops; - - if (osd->pending_ops > my_avg) { - // is there a peer who is below my average? - for (unsigned i=1; ipeer_qlen.count(peer) && - osd->peer_qlen[peer] < my_avg) { - // calculate a probability that we should redirect - //float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb. - - double mean_read_time = osd->load_calc.get_average(); - - if ( mean_read_time != -1 && - osd->peer_read_time.count(peer) && - ( (osd->peer_read_time[peer]*100/mean_read_time) < - ( 100 - g_conf.osd_load_diff_percent) ) ) - //if (drand48() <= p) { - // take the first one - dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg - << "my read time "<< mean_read_time - << "peer read time " << osd->peer_read_time[peer] - << ", fwd to peer w/ qlen " << osd->peer_qlen[peer] - << " osd" << peer - << endl; - osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); - return true; - } + if (osd->pending_ops > my_avg) { + // is there a peer who is below my average? + for (unsigned i=1; ipeer_qlen.count(peer) && + osd->peer_qlen[peer] < my_avg) { + // calculate a probability that we should redirect + //float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb. + + double mean_read_time = osd->load_calc.get_average(); + + if ( mean_read_time != -1 && + osd->peer_read_time.count(peer) && + ( (osd->peer_read_time[peer]*100/mean_read_time) < + ( 100 - g_conf.osd_load_diff_percent) ) ) + //if (drand48() <= p) { + // take the first one + dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg + << "my read time "<< mean_read_time + << "peer read time " << osd->peer_read_time[peer] + << ", fwd to peer w/ qlen " << osd->peer_qlen[peer] + << " osd" << peer + << dendl; + osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); + return true; } } } @@ -305,8 +257,19 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op) if (osd->store->is_cached( op->get_oid() , op->get_offset(), op->get_length() ) == 0) { + if (!is_primary()) { + // am i allowed? + bool v; + if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) < 0) { + dout(10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid() + << ", fwd to primary" << dendl; + osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary())); + return true; + } + } + // do it now - dout(-10) << "preprocess_op data is in cache, reading from cache" << *op << endl; + dout(-10) << "preprocess_op data is in cache, reading from cache" << *op << dendl; do_op(op); return true; } @@ -330,14 +293,8 @@ void ReplicatedPG::do_op(MOSDOp *op) // reads case OSD_OP_READ: - if (osd->block_if_wrlocked(op)) - return; - op_read(op); - break; case OSD_OP_STAT: - if (osd->block_if_wrlocked(op)) - return; - op_stat(op); + op_read(op); break; // rep stuff @@ -403,83 +360,91 @@ void ReplicatedPG::do_op_reply(MOSDOpReply *r) // ======================================================================== // READS -int ReplicatedPG::op_read(MOSDOp *op) +void ReplicatedPG::op_read(MOSDOp *op) { object_t oid = op->get_oid(); - dout(10) << "op_read " << oid + dout(10) << "op_read " << MOSDOp::get_opname(op->get_op()) + << " " << oid << " " << op->get_offset() << "~" << op->get_length() - //<< " in " << *pg << dendl; + + // wrlocked? + if (block_if_wrlocked(op)) + return; + + // !primary and unbalanced? + if (!is_primary()) { + // make sure i exist and am balanced, otherwise fw back to acker. + bool b; + if (!osd->store->exists(oid) || + osd->store->getattr(oid, "balance-reads", &b, 1) < 0) { + dout(-10) << "read on replica, object " << oid + << " dne or no balance-reads, fw back to primary" << dendl; + osd->messenger->send_message(op, osd->osdmap->get_inst(get_acker())); + return; + } + } + + // set up reply + MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); long r = 0; - bufferlist bl; + // do it. if (oid.rev && !pick_object_rev(oid)) { // we have no revision for this request. r = -EEXIST; } else { - // read into a buffer - r = osd->store->read(oid, - op->get_offset(), op->get_length(), - bl); + switch (op->get_op()) { + case OSD_OP_READ: + { + // read into a buffer + bufferlist bl; + r = osd->store->read(oid, + op->get_offset(), op->get_length(), + bl); + reply->set_data(bl); + reply->set_length(r); + dout(15) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl; + } + break; + + case OSD_OP_STAT: + { + struct stat st; + memset(&st, sizeof(st), 0); + r = osd->store->stat(oid, &st); + if (r >= 0) + reply->set_object_size(st.st_size); + } + break; + + default: + assert(0); + } } - // set up reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); if (r >= 0) { reply->set_result(0); - reply->set_data(bl); - reply->set_length(r); dout(10) << "READ TIME DIFF" << (double)g_clock.now()-op->get_received_time() - << endl; + << dendl; osd->load_calc.add((double)g_clock.now() - op->get_received_time()); } else { reply->set_result(r); // error - reply->set_length(0); } - dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl; - + // send it osd->messenger->send_message(reply, op->get_client_inst()); delete op; - - return r; } -void ReplicatedPG::op_stat(MOSDOp *op) -{ - object_t oid = op->get_oid(); - - struct stat st; - memset(&st, sizeof(st), 0); - int r = 0; - - if (oid.rev && !pick_object_rev(oid)) { - // we have no revision for this request. - r = -EEXIST; - } else { - r = osd->store->stat(oid, &st); - } - - dout(3) << "op_stat on " << oid - << " r = " << r - << " size = " << st.st_size - //<< " in " << *pg - << dendl; - - MOSDOpReply *reply = new MOSDOpReply(op, r, osd->osdmap->get_epoch(), true); - reply->set_object_size(st.st_size); - osd->messenger->send_message(reply, op->get_client_inst()); - - delete op; -} @@ -576,42 +541,24 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, case OSD_OP_WRLOCK: { // lock object - //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit); t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t)); } break; - case OSD_OP_WRUNLOCK: { // unlock objects - //r = store->rmattr(oid, "wrlock", oncommit); t.rmattr(oid, "wrlock"); - - // unblock all operations that were waiting for this object to become unlocked - if (osd->waiting_for_wr_unlock.count(oid)) { - osd->take_waiters(osd->waiting_for_wr_unlock[oid]); - osd->waiting_for_wr_unlock.erase(oid); - } } break; - case OSD_OP_PRIMARYLOCK: - { // lock object - bool locked = true; - t.setattr(oid, "primary-lock", &locked, sizeof(locked)); + case OSD_OP_BALANCEREADS: + { + bool bal = true; + t.setattr(oid, "balance-reads", &bal, sizeof(bal)); } - break; - - case OSD_OP_PRIMARYUNLOCK: - { // unlock object - t.rmattr(oid, "primary-lock"); - - // kick waiters? -- only if we make replicas block ops instead of fwd to primary. - if (osd->waiting_for_primary_unlock.count(oid)) { - osd->take_waiters(osd->waiting_for_primary_unlock[oid]); - osd->waiting_for_primary_unlock.erase(oid); - } + case OSD_OP_UNBALANCEREADS: + { + t.rmattr(oid, "balance-reads"); } - break; // -- modify -- @@ -656,14 +603,12 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, case OSD_OP_TRUNCATE: { // truncate - //r = store->truncate(oid, op->get_offset()); t.truncate(oid, op->get_length() ); } break; case OSD_OP_DELETE: { // delete - //r = store->remove(oid); t.remove(oid); } break; @@ -732,6 +677,39 @@ void ReplicatedPG::apply_repop(RepGather *repop) repop->op->get_data().clear(); repop->applied = true; + + + // any completion stuff to do here? + object_t oid = repop->op->get_oid(); + + switch (repop->op->get_op()) { + case OSD_OP_UNBALANCEREADS: + dout(-10) << "apply_repop completed unbalance-reads on " << oid << dendl; + unbalancing_reads.erase(oid); + if (waiting_for_unbalanced_reads.count(oid)) { + osd->take_waiters(waiting_for_unbalanced_reads[oid]); + waiting_for_unbalanced_reads.erase(oid); + } + break; + + case OSD_OP_BALANCEREADS: + dout(-10) << "apply_repop completed balance-reads on " << oid << dendl; + if (waiting_for_balanced_reads.count(oid)) { + osd->take_waiters(waiting_for_balanced_reads[oid]); + waiting_for_balanced_reads.erase(oid); + } + break; + + case OSD_OP_WRUNLOCK: + dout(-10) << "apply_repop completed wrunlock on " << oid << dendl; + if (waiting_for_wr_unlock.count(oid)) { + osd->take_waiters(waiting_for_wr_unlock[oid]); + waiting_for_wr_unlock.erase(oid); + } + break; + } + + } void ReplicatedPG::put_rep_gather(RepGather *repop) @@ -742,9 +720,11 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) if (repop->can_send_commit() && repop->op->wants_commit()) { // send commit. - MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true); - dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl; - osd->messenger->send_message(reply, repop->op->get_client_inst()); + if (repop->op->wants_reply()) { + MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true); + dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl; + osd->messenger->send_message(reply, repop->op->get_client_inst()); + } repop->sent_commit = true; } @@ -755,9 +735,11 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) apply_repop(repop); // send ack - MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false); - dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl; - osd->messenger->send_message(reply, repop->op->get_client_inst()); + if (repop->op->wants_reply()) { + MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false); + dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl; + osd->messenger->send_message(reply, repop->op->get_client_inst()); + } repop->sent_ack = true; utime_t now = g_clock.now(); @@ -1049,12 +1031,38 @@ void ReplicatedPG::op_modify(MOSDOp *op) object_t oid = op->get_oid(); const char *opname = MOSDOp::get_opname(op->get_op()); - // locked by someone else? - // for _any_ op type -- eg only the locker can unlock! + // --- locking --- + + // wrlock? if (op->get_op() != OSD_OP_WRNOOP && // except WRNOOP; we just want to flush - osd->block_if_wrlocked(op)) + block_if_wrlocked(op)) return; // op will be handled later, after the object unlocks + // balance-reads set? + char v; + if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) >= 0 || + balancing_reads.count(op->get_oid())) { + + if (!unbalancing_reads.count(op->get_oid())) { + // unbalance + dout(-10) << "preprocess_op unbalancing-reads on " << op->get_oid() << dendl; + unbalancing_reads.insert(op->get_oid()); + + MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(), + op->get_oid(), + ObjectLayout(info.pgid), + osd->osdmap->get_epoch(), + OSD_OP_UNBALANCEREADS); + do_op(pop); + } + + // add to wait queue + dout(-10) << "preprocess_op waiting for unbalance-reads on " << op->get_oid() << dendl; + waiting_for_unbalanced_reads[op->get_oid()].push_back(op); + return; + } + + // share latest osd map with rest of pg? osd->osd_lock.Lock(); { @@ -1571,10 +1579,6 @@ void ReplicatedPG::on_role_change() it++) osd->take_waiters(it->second); waiting_for_missing_object.clear(); - - // clear object replica list? - if (get_role() < 0) - replicated_objects.clear(); // hmm, should i be less sloppy about this? FIXME. } diff --git a/branches/sage/pgs/osd/ReplicatedPG.h b/branches/sage/pgs/osd/ReplicatedPG.h index 49529e7a49914..703fb2d7f2bae 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.h +++ b/branches/sage/pgs/osd/ReplicatedPG.h @@ -73,7 +73,10 @@ protected: map > waiting_for_repop; // load balancing - set replicated_objects; + set balancing_reads; + set unbalancing_reads; + hash_map > waiting_for_balanced_reads; + hash_map > waiting_for_unbalanced_reads; // i.e. primary-lock void get_rep_gather(RepGather*); void apply_repop(RepGather *repop); @@ -117,8 +120,7 @@ protected: void clean_replicas(); - void op_stat(MOSDOp *op); - int op_read(MOSDOp *op); + void op_read(MOSDOp *op); void op_modify(MOSDOp *op); void op_rep_modify(MOSDOp *op); void op_push(MOSDOp *op); -- 2.39.5