// ==========================================================
-
-/** preprocess_op - preprocess an op (before it gets queued).
- * fasttrack read
- */
-bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
-{
-#if 0
- // we only care about reads here on out..
- if (op->may_write() ||
- op->ops.size() < 1)
- return false;
-
- ceph_osd_op& readop = op->ops[0];
-
- object_t oid = op->get_oid();
- sobject_t soid(oid, op->get_snapid());
-
- // -- load balance reads --
- if (is_primary()) {
- // -- read on primary+acker ---
-
- // test
- if (false) {
- if (acting.size() > 1) {
- int peer = acting[1];
- dout(-10) << "preprocess_op fwd client read op to osd" << peer
- << " for " << op->get_orig_source() << " " << op->get_orig_source_inst() << dendl;
- osd->messenger->forward_message(op, osd->osdmap->get_inst(peer));
- return true;
- }
- }
-
-#if 0
- // -- balance reads?
- if (g_conf.osd_balance_reads &&
- !op->get_source().is_osd()) {
- // flash crowd?
- bool is_flash_crowd_candidate = false;
- if (g_conf.osd_flash_crowd_iat_threshold > 0) {
- osd->iat_averager.add_sample( oid, (double)g_clock.now() );
- is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( oid );
- }
-
- // hot?
- double temp = 0;
- if (stat_object_temp_rd.count(soid))
- temp = stat_object_temp_rd[soid].get(op->get_recv_stamp());
- bool is_hotly_read = temp > g_conf.osd_balance_reads_temp;
-
- dout(20) << "balance_reads oid " << oid << " temp " << temp
- << (is_hotly_read ? " hotly_read":"")
- << (is_flash_crowd_candidate ? " flash_crowd_candidate":"")
- << dendl;
-
- bool should_balance = is_flash_crowd_candidate || is_hotly_read;
- bool is_balanced = false;
- bool b;
- // *** FIXME *** this may block, and we're in the fast path! ***
- if (g_conf.osd_balance_reads &&
- osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) >= 0)
- is_balanced = true;
-
- if (!is_balanced && should_balance &&
- balancing_reads.count(soid) == 0) {
- dout(-10) << "preprocess_op balance-reads on " << oid << dendl;
- balancing_reads.insert(soid);
- ceph_object_layout layout;
- layout.ol_pgid = info.pgid.u.pg64;
- layout.ol_stripe_unit = 0;
- MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
- oid,
- layout,
- osd->osdmap->get_epoch(),
- CEPH_OSD_FLAG_MODIFY);
- pop->add_simple_op(CEPH_OSD_OP_BALANCEREADS, 0, 0);
- do_op(pop);
- }
- if (is_balanced && !should_balance &&
- !unbalancing_reads.count(soid) == 0) {
- dout(-10) << "preprocess_op unbalance-reads on " << oid << dendl;
- unbalancing_reads.insert(soid);
- ceph_object_layout layout;
- layout.ol_pgid = info.pgid.u.pg64;
- layout.ol_stripe_unit = 0;
- MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
- oid,
- layout,
- osd->osdmap->get_epoch(),
- CEPH_OSD_FLAG_MODIFY);
- pop->add_simple_op(CEPH_OSD_OP_UNBALANCEREADS, 0, 0);
- do_op(pop);
- }
- }
-#endif
-
- // -- read shedding
- if (g_conf.osd_shed_reads &&
- g_conf.osd_stat_refresh_interval > 0 &&
- !op->get_source().is_osd()) { // no re-shedding!
- Mutex::Locker lock(osd->peer_stat_lock);
-
- osd->_refresh_my_stat(now);
-
- // check my load.
- // TODO xxx we must also compare with our own load
- // if i am x percentage higher than replica ,
- // redirect the read
-
- int shedto = -1;
- double bestscore = 0.0; // highest positive score wins
-
- // we calculate score values such that we can interpret them as a probability.
-
- switch (g_conf.osd_shed_reads) {
- case LOAD_LATENCY:
- // above some minimum?
- if (osd->my_stat.read_latency >= g_conf.osd_shed_reads_min_latency) {
- for (unsigned i=1; i<acting.size(); ++i) {
- int peer = acting[i];
- if (osd->peer_stat.count(peer) == 0) continue;
-
- // assume a read_latency of 0 (technically, undefined) is OK, since
- // we'll be corrected soon enough if we're wrong.
-
- double plat = osd->peer_stat[peer].read_latency_mine;
-
- double diff = osd->my_stat.read_latency - plat;
- if (diff < g_conf.osd_shed_reads_min_latency_diff) continue;
-
- double c = .002; // add in a constant to smooth it a bit
- double latratio =
- (c+osd->my_stat.read_latency) /
- (c+plat);
- double p = (latratio - 1.0) / 2.0 / latratio;
- dout(15) << "preprocess_op " << op->get_reqid()
- << " my read latency " << osd->my_stat.read_latency
- << ", peer osd" << peer << " is " << plat << " (" << osd->peer_stat[peer].read_latency << ")"
- << ", latratio " << latratio
- << ", p=" << p
- << dendl;
- if (latratio > g_conf.osd_shed_reads_min_latency_ratio &&
- p > bestscore &&
- drand48() < p) {
- shedto = peer;
- bestscore = p;
- }
- }
- }
- break;
-
- case LOAD_HYBRID:
- // dumb mostly
- if (osd->my_stat.read_latency >= g_conf.osd_shed_reads_min_latency) {
- for (unsigned i=1; i<acting.size(); ++i) {
- int peer = acting[i];
- if (osd->peer_stat.count(peer) == 0/* ||
- osd->peer_stat[peer].read_latency <= 0*/) continue;
-
- if (osd->peer_stat[peer].qlen < osd->my_stat.qlen) {
-
- if (osd->my_stat.read_latency - osd->peer_stat[peer].read_latency >
- g_conf.osd_shed_reads_min_latency_diff) continue;
-
- double qratio = osd->pending_ops / osd->peer_stat[peer].qlen;
-
- double c = .002; // add in a constant to smooth it a bit
- double latratio =
- (c+osd->my_stat.read_latency)/
- (c+osd->peer_stat[peer].read_latency);
- double p = (latratio - 1.0) / 2.0 / latratio;
-
- dout(-15) << "preprocess_op " << op->get_reqid()
- << " my qlen / rdlat "
- << osd->pending_ops << " " << osd->my_stat.read_latency
- << ", peer osd" << peer << " is "
- << osd->peer_stat[peer].qlen << " " << osd->peer_stat[peer].read_latency
- << ", qratio " << qratio
- << ", latratio " << latratio
- << ", p=" << p
- << dendl;
- if (latratio > g_conf.osd_shed_reads_min_latency_ratio &&
- p > bestscore &&
- drand48() < p) {
- shedto = peer;
- bestscore = p;
- }
- }
- }
- }
- break;
-
- /*
- case LOAD_QUEUE_SIZE:
- // am i above my average? -- dumb
- if (osd->pending_ops > osd->my_stat.qlen) {
- // yes. is there a peer who is below my average?
- for (unsigned i=1; i<acting.size(); ++i) {
- int peer = acting[i];
- if (osd->peer_stat.count(peer) == 0) continue;
- if (osd->peer_stat[peer].qlen < osd->my_stat.qlen) {
- // calculate a probability that we should redirect
- float p = (osd->my_stat.qlen - osd->peer_stat[peer].qlen) / osd->my_stat.qlen; // this is dumb.
- float v = 1.0 - p;
-
- dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << osd->my_stat.qlen
- << ", peer osd" << peer << " has qlen " << osd->peer_stat[peer].qlen
- << ", p=" << p
- << ", v= "<< v
- << dendl;
-
- if (v > bestscore) {
- shedto = peer;
- bestscore = v;
- }
- }
- }
- }
- break;*/
-
- }
-
- // shed?
- if (shedto >= 0) {
- dout(10) << "preprocess_op shedding read to peer osd" << shedto
- << " " << op->get_reqid()
- << dendl;
- op->set_peer_stat(osd->my_stat);
- osd->messenger->forward_message(op, osd->osdmap->get_inst(shedto));
- osd->stat_rd_ops_shed_out++;
- osd->logger->inc(l_osd_shdout);
- return true;
- }
- }
- } // endif balance reads
-
-
- // -- fastpath read?
- // if this is a read and the data is in the cache, do an immediate read..
- if ( g_conf.osd_immediate_read_from_cache ) {
- if (osd->store->is_cached(info.pgid.to_coll(), soid,
- readop.extent.offset,
- readop.length) == 0) {
- if (!is_primary() && !op->get_source().is_osd()) {
- // am i allowed?
- bool v;
- if (osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &v, 1) < 0) {
- dout(-10) << "preprocess_op in-cache but no balance-reads on " << oid
- << ", fwd to primary" << dendl;
- osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
- return true;
- }
- }
-
- // do it now
- dout(10) << "preprocess_op data is in cache, reading from cache" << *op << dendl;
- do_op(op);
- return true;
- }
- }
-#endif
- return false;
-}
-
void ReplicatedPG::do_pg_op(MOSDOp *op)
{
- dout(0) << "do_pg_op " << *op << dendl;
+ dout(10) << "do_pg_op " << *op << dendl;
bufferlist outdata;
int result = 0;