// -- load balance reads --
- if (g_conf.osd_balance_reads) {
-
- // replicate/unreplicate?
- if (!is_acker()) {
- // -- replica --
- if (op->get_op() == OSD_OP_REPLICATE) {
- dout(-10) << "preprocess_op replicating " << op->get_oid() << endl;
- replicated_objects.insert(op->get_oid());
- delete op;
- return true;
- }
- if (op->get_op() == OSD_OP_UNREPLICATE) {
- dout(-10) << "preprocess_op un-replicating " << op->get_oid() << endl;
- replicated_objects.erase(op->get_oid());
- delete op;
+ if (g_conf.osd_balance_reads &&
+ is_primary() &&
+ g_conf.osd_rep == OSD_REP_PRIMARY) {
+ // -- read on primary+acker ---
+
+ // test
+ if (false) {
+ if (acting.size() > 1) {
+ int peer = acting[1];
+ dout(-10) << "preprocess_op fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl;
+ osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
return true;
}
+ }
- if (!op->get_source().is_osd()) {
- // -- read on replica --
- if (!osd->store->exists(op->get_oid())) {
- // fwd to primary
- dout(-10) << "preprocess_op got read on replica, object dne, fwd to primary" << endl;
- osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
- return true;
- }
-
- // primary lock?
- // FIXME: this may cause a (blocking) stat+disk seek.
- char v;
- if (osd->store->getattr(op->get_oid(), "primary-lock", &v, 1) >= 0) {
- dout(-10) << "preprocess_op primary-lock on " << op->get_oid() << " fwd to primary" << endl;
- osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
- return true;
- }
-
- // in replicate list?
- if (replicated_objects.count(op->get_oid())) {
- // yes. continue.
- // note that we've already failed the fastpath above.
- dout(-10) << "preprocess_op got read on replica, object replicated, processing/queuing as usual" << endl;
- } else {
- // no. forward to primary.
- dout(-10) << "preprocess_op got read on replica, object not replicated, fwd to primary" << endl;
- osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
- return true;
- }
+ // -- flash crowd?
+ if (!op->get_source().is_osd()) {
+ // candidate?
+ bool is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( op->get_oid() );
+ bool is_balanced = false;
+ bool b;
+ if (osd->store->getattr(op->get_oid(), "balance-reads", &b, 1) >= 0)
+ is_balanced = true;
+
+ if (!is_balanced && is_flash_crowd_candidate &&
+ balancing_reads.count(op->get_oid()) == 0) {
+ dout(-10) << "preprocess_op balance-reads on " << op->get_oid() << dendl;
+ balancing_reads.insert(op->get_oid());
+ MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
+ op->get_oid(),
+ ObjectLayout(info.pgid),
+ osd->osdmap->get_epoch(),
+ OSD_OP_BALANCEREADS);
+ do_op(pop);
}
- }
-
- if (is_acker()) {
- // -- read on acker ---
-
- // test
- if (false) {
- if (acting.size() > 1) {
- int peer = acting[1];
- dout(-10) << "preprocess_op fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl;
- osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
- return true;
- }
+ if (is_balanced && !is_flash_crowd_candidate &&
+ !unbalancing_reads.count(op->get_oid()) == 0) {
+ dout(-10) << "preprocess_op unbalance-reads on " << op->get_oid() << dendl;
+ unbalancing_reads.insert(op->get_oid());
+ MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
+ op->get_oid(),
+ ObjectLayout(info.pgid),
+ osd->osdmap->get_epoch(),
+ OSD_OP_UNBALANCEREADS);
+ do_op(pop);
}
+ }
+
+
+ // check my load.
+ // TODO xxx we must also compare with our own load
+ // if i am x percentage higher than replica ,
+ // redirect the read
+
+ if ( g_conf.osd_balance_reads == LOAD_LATENCY) {
+ double mean_read_time = osd->load_calc.get_average();
- // -- flash crowd?
- if (!op->get_source().is_osd()) {
- // candidate?
- bool is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( op->get_oid() );
- bool is_replicated = replicated_objects.count( op->get_oid() );
+ if ( mean_read_time != -1 ) {
- if (!is_replicated && is_flash_crowd_candidate) {
- // replicate
- dout(-10) << "preprocess_op replicating " << op->get_oid() << endl;
- replicated_objects.insert(op->get_oid());
- for (unsigned i=1; i<acting.size(); ++i) {
- osd->messenger->send_message(new MOSDOp(osd->messenger->get_myinst(), 0, 0,
- op->get_oid(), ObjectLayout(info.pgid),
- osd->osdmap->get_epoch(),
- OSD_OP_REPLICATE),
- osd->osdmap->get_inst(acting[i]));
- }
- }
- if (is_replicated && !is_flash_crowd_candidate) {
- // unreplicate
- dout(-10) << "preprocess_op unreplicating " << op->get_oid() << endl;
- replicated_objects.erase(op->get_oid());
- for (unsigned i=1; i<acting.size(); ++i) {
- osd->messenger->send_message(new MOSDOp(osd->messenger->get_myinst(), 0, 0,
- op->get_oid(), ObjectLayout(info.pgid),
- osd->osdmap->get_epoch(),
- OSD_OP_REPLICATE),
- osd->osdmap->get_inst(acting[i]));
+ for (unsigned i=1;
+ i<acting.size();
+ ++i) {
+ int peer = acting[i];
+
+ dout(10) << "my read time " << mean_read_time
+ << "peer_readtime" << osd->peer_read_time[peer]
+ << " of peer" << peer << dendl;
+
+ if ( osd->peer_read_time.count(peer) &&
+ ( (osd->peer_read_time[peer]*100/mean_read_time) <
+ (100 - g_conf.osd_load_diff_percent))) {
+ dout(10) << " forwarding to peer osd" << peer << dendl;
+
+ osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
+ return true;
}
}
}
+ }
+ else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) {
+ // am i above my average?
+ float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
-
- // check my load.
- // TODO xxx we must also compare with our own load
- // if i am x percentage higher than replica ,
- // redirect the read
-
- if ( g_conf.osd_balance_reads == LOAD_LATENCY) {
- double mean_read_time = osd->load_calc.get_average();
-
- if ( mean_read_time != -1 ) {
-
- for (unsigned i=1;
- i<acting.size();
- ++i) {
- int peer = acting[i];
-
- dout(10) << "my read time " << mean_read_time
- << "peer_readtime" << osd->peer_read_time[peer]
- << " of peer" << peer << endl;
+ if (osd->pending_ops > my_avg) {
+ // is there a peer who is below my average?
+ for (unsigned i=1; i<acting.size(); ++i) {
+ int peer = acting[i];
+ if (osd->peer_qlen.count(peer) &&
+ osd->peer_qlen[peer] < my_avg) {
+ // calculate a probability that we should redirect
+ float p = (my_avg - osd->peer_qlen[peer]) / my_avg; // this is dumb.
- if ( osd->peer_read_time.count(peer) &&
- ( (osd->peer_read_time[peer]*100/mean_read_time) <
- (100 - g_conf.osd_load_diff_percent))) {
- dout(10) << " forwarding to peer osd" << peer << endl;
-
+ if (drand48() <= p) {
+ // take the first one
+ dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg
+ << ", p=" << p
+ << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
+ << " osd" << peer
+ << dendl;
osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
return true;
}
}
}
}
- else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) {
- // am i above my average?
- float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
-
- if (osd->pending_ops > my_avg) {
- // is there a peer who is below my average?
- for (unsigned i=1; i<acting.size(); ++i) {
- int peer = acting[i];
- if (osd->peer_qlen.count(peer) &&
- osd->peer_qlen[peer] < my_avg) {
- // calculate a probability that we should redirect
- float p = (my_avg - osd->peer_qlen[peer]) / my_avg; // this is dumb.
-
- if (drand48() <= p) {
- // take the first one
- dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg
- << ", p=" << p
- << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
- << " osd" << peer
- << dendl;
- osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
- return true;
- }
- }
- }
- }
- }
+ }
+
+ else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) {
+ // am i above my average?
+ float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
- else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) {
- // am i above my average?
- float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
-
- if (osd->pending_ops > my_avg) {
- // is there a peer who is below my average?
- for (unsigned i=1; i<acting.size(); ++i) {
- int peer = acting[i];
- if (osd->peer_qlen.count(peer) &&
- osd->peer_qlen[peer] < my_avg) {
- // calculate a probability that we should redirect
- //float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb.
-
- double mean_read_time = osd->load_calc.get_average();
-
- if ( mean_read_time != -1 &&
- osd->peer_read_time.count(peer) &&
- ( (osd->peer_read_time[peer]*100/mean_read_time) <
- ( 100 - g_conf.osd_load_diff_percent) ) )
- //if (drand48() <= p) {
- // take the first one
- dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg
- << "my read time "<< mean_read_time
- << "peer read time " << osd->peer_read_time[peer]
- << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
- << " osd" << peer
- << endl;
- osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
- return true;
- }
+ if (osd->pending_ops > my_avg) {
+ // is there a peer who is below my average?
+ for (unsigned i=1; i<acting.size(); ++i) {
+ int peer = acting[i];
+ if (osd->peer_qlen.count(peer) &&
+ osd->peer_qlen[peer] < my_avg) {
+ // calculate a probability that we should redirect
+ //float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb.
+
+ double mean_read_time = osd->load_calc.get_average();
+
+ if ( mean_read_time != -1 &&
+ osd->peer_read_time.count(peer) &&
+ ( (osd->peer_read_time[peer]*100/mean_read_time) <
+ ( 100 - g_conf.osd_load_diff_percent) ) )
+ //if (drand48() <= p) {
+ // take the first one
+ dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg
+ << "my read time "<< mean_read_time
+ << "peer read time " << osd->peer_read_time[peer]
+ << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
+ << " osd" << peer
+ << dendl;
+ osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
+ return true;
}
}
}
if (osd->store->is_cached( op->get_oid() ,
op->get_offset(),
op->get_length() ) == 0) {
+ if (!is_primary()) {
+ // am i allowed?
+ bool v;
+ if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) < 0) {
+ dout(10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid()
+ << ", fwd to primary" << dendl;
+ osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
+ return true;
+ }
+ }
+
// do it now
- dout(-10) << "preprocess_op data is in cache, reading from cache" << *op << endl;
+ dout(-10) << "preprocess_op data is in cache, reading from cache" << *op << dendl;
do_op(op);
return true;
}
// reads
case OSD_OP_READ:
- if (osd->block_if_wrlocked(op))
- return;
- op_read(op);
- break;
case OSD_OP_STAT:
- if (osd->block_if_wrlocked(op))
- return;
- op_stat(op);
+ op_read(op);
break;
// rep stuff
// ========================================================================
// READS
-int ReplicatedPG::op_read(MOSDOp *op)
+void ReplicatedPG::op_read(MOSDOp *op)
{
object_t oid = op->get_oid();
- dout(10) << "op_read " << oid
+ dout(10) << "op_read " << MOSDOp::get_opname(op->get_op())
+ << " " << oid
<< " " << op->get_offset() << "~" << op->get_length()
- //<< " in " << *pg
<< dendl;
+
+ // wrlocked?
+ if (block_if_wrlocked(op))
+ return;
+
+ // !primary and unbalanced?
+ if (!is_primary()) {
+ // make sure i exist and am balanced, otherwise fw back to acker.
+ bool b;
+ if (!osd->store->exists(oid) ||
+ osd->store->getattr(oid, "balance-reads", &b, 1) < 0) {
+ dout(-10) << "read on replica, object " << oid
+ << " dne or no balance-reads, fw back to primary" << dendl;
+ osd->messenger->send_message(op, osd->osdmap->get_inst(get_acker()));
+ return;
+ }
+ }
+
+ // set up reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true);
long r = 0;
- bufferlist bl;
+ // do it.
if (oid.rev && !pick_object_rev(oid)) {
// we have no revision for this request.
r = -EEXIST;
} else {
- // read into a buffer
- r = osd->store->read(oid,
- op->get_offset(), op->get_length(),
- bl);
+ switch (op->get_op()) {
+ case OSD_OP_READ:
+ {
+ // read into a buffer
+ bufferlist bl;
+ r = osd->store->read(oid,
+ op->get_offset(), op->get_length(),
+ bl);
+ reply->set_data(bl);
+ reply->set_length(r);
+ dout(15) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
+ }
+ break;
+
+ case OSD_OP_STAT:
+ {
+ struct stat st;
+ memset(&st, sizeof(st), 0);
+ r = osd->store->stat(oid, &st);
+ if (r >= 0)
+ reply->set_object_size(st.st_size);
+ }
+ break;
+
+ default:
+ assert(0);
+ }
}
- // set up reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true);
if (r >= 0) {
reply->set_result(0);
- reply->set_data(bl);
- reply->set_length(r);
dout(10) << "READ TIME DIFF"
<< (double)g_clock.now()-op->get_received_time()
- << endl;
+ << dendl;
osd->load_calc.add((double)g_clock.now() - op->get_received_time());
} else {
reply->set_result(r); // error
- reply->set_length(0);
}
- dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
-
+
// send it
osd->messenger->send_message(reply, op->get_client_inst());
delete op;
-
- return r;
}
-void ReplicatedPG::op_stat(MOSDOp *op)
-{
- object_t oid = op->get_oid();
-
- struct stat st;
- memset(&st, sizeof(st), 0);
- int r = 0;
-
- if (oid.rev && !pick_object_rev(oid)) {
- // we have no revision for this request.
- r = -EEXIST;
- } else {
- r = osd->store->stat(oid, &st);
- }
-
- dout(3) << "op_stat on " << oid
- << " r = " << r
- << " size = " << st.st_size
- //<< " in " << *pg
- << dendl;
-
- MOSDOpReply *reply = new MOSDOpReply(op, r, osd->osdmap->get_epoch(), true);
- reply->set_object_size(st.st_size);
- osd->messenger->send_message(reply, op->get_client_inst());
-
- delete op;
-}
case OSD_OP_WRLOCK:
{ // lock object
- //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t));
}
break;
-
case OSD_OP_WRUNLOCK:
{ // unlock objects
- //r = store->rmattr(oid, "wrlock", oncommit);
t.rmattr(oid, "wrlock");
-
- // unblock all operations that were waiting for this object to become unlocked
- if (osd->waiting_for_wr_unlock.count(oid)) {
- osd->take_waiters(osd->waiting_for_wr_unlock[oid]);
- osd->waiting_for_wr_unlock.erase(oid);
- }
}
break;
- case OSD_OP_PRIMARYLOCK:
- { // lock object
- bool locked = true;
- t.setattr(oid, "primary-lock", &locked, sizeof(locked));
+ case OSD_OP_BALANCEREADS:
+ {
+ bool bal = true;
+ t.setattr(oid, "balance-reads", &bal, sizeof(bal));
}
- break;
-
- case OSD_OP_PRIMARYUNLOCK:
- { // unlock object
- t.rmattr(oid, "primary-lock");
-
- // kick waiters? -- only if we make replicas block ops instead of fwd to primary.
- if (osd->waiting_for_primary_unlock.count(oid)) {
- osd->take_waiters(osd->waiting_for_primary_unlock[oid]);
- osd->waiting_for_primary_unlock.erase(oid);
- }
+ case OSD_OP_UNBALANCEREADS:
+ {
+ t.rmattr(oid, "balance-reads");
}
- break;
// -- modify --
case OSD_OP_TRUNCATE:
{ // truncate
- //r = store->truncate(oid, op->get_offset());
t.truncate(oid, op->get_length() );
}
break;
case OSD_OP_DELETE:
{ // delete
- //r = store->remove(oid);
t.remove(oid);
}
break;
repop->op->get_data().clear();
repop->applied = true;
+
+
+ // any completion stuff to do here?
+ object_t oid = repop->op->get_oid();
+
+ switch (repop->op->get_op()) {
+ case OSD_OP_UNBALANCEREADS:
+ dout(-10) << "apply_repop completed unbalance-reads on " << oid << dendl;
+ unbalancing_reads.erase(oid);
+ if (waiting_for_unbalanced_reads.count(oid)) {
+ osd->take_waiters(waiting_for_unbalanced_reads[oid]);
+ waiting_for_unbalanced_reads.erase(oid);
+ }
+ break;
+
+ case OSD_OP_BALANCEREADS:
+ dout(-10) << "apply_repop completed balance-reads on " << oid << dendl;
+ if (waiting_for_balanced_reads.count(oid)) {
+ osd->take_waiters(waiting_for_balanced_reads[oid]);
+ waiting_for_balanced_reads.erase(oid);
+ }
+ break;
+
+ case OSD_OP_WRUNLOCK:
+ dout(-10) << "apply_repop completed wrunlock on " << oid << dendl;
+ if (waiting_for_wr_unlock.count(oid)) {
+ osd->take_waiters(waiting_for_wr_unlock[oid]);
+ waiting_for_wr_unlock.erase(oid);
+ }
+ break;
+ }
+
+
}
void ReplicatedPG::put_rep_gather(RepGather *repop)
if (repop->can_send_commit() &&
repop->op->wants_commit()) {
// send commit.
- MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true);
- dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl;
- osd->messenger->send_message(reply, repop->op->get_client_inst());
+ if (repop->op->wants_reply()) {
+ MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true);
+ dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl;
+ osd->messenger->send_message(reply, repop->op->get_client_inst());
+ }
repop->sent_commit = true;
}
apply_repop(repop);
// send ack
- MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
- dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl;
- osd->messenger->send_message(reply, repop->op->get_client_inst());
+ if (repop->op->wants_reply()) {
+ MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
+ dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl;
+ osd->messenger->send_message(reply, repop->op->get_client_inst());
+ }
repop->sent_ack = true;
utime_t now = g_clock.now();
object_t oid = op->get_oid();
const char *opname = MOSDOp::get_opname(op->get_op());
- // locked by someone else?
- // for _any_ op type -- eg only the locker can unlock!
+ // --- locking ---
+
+ // wrlock?
if (op->get_op() != OSD_OP_WRNOOP && // except WRNOOP; we just want to flush
- osd->block_if_wrlocked(op))
+ block_if_wrlocked(op))
return; // op will be handled later, after the object unlocks
+ // balance-reads set?
+ char v;
+ if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) >= 0 ||
+ balancing_reads.count(op->get_oid())) {
+
+ if (!unbalancing_reads.count(op->get_oid())) {
+ // unbalance
+ dout(-10) << "preprocess_op unbalancing-reads on " << op->get_oid() << dendl;
+ unbalancing_reads.insert(op->get_oid());
+
+ MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
+ op->get_oid(),
+ ObjectLayout(info.pgid),
+ osd->osdmap->get_epoch(),
+ OSD_OP_UNBALANCEREADS);
+ do_op(pop);
+ }
+
+ // add to wait queue
+ dout(-10) << "preprocess_op waiting for unbalance-reads on " << op->get_oid() << dendl;
+ waiting_for_unbalanced_reads[op->get_oid()].push_back(op);
+ return;
+ }
+
+
// share latest osd map with rest of pg?
osd->osd_lock.Lock();
{
it++)
osd->take_waiters(it->second);
waiting_for_missing_object.clear();
-
- // clear object replica list?
- if (get_role() < 0)
- replicated_objects.clear(); // hmm, should i be less sloppy about this? FIXME.
}