// --- osd ---
osd_rep: OSD_REP_PRIMARY,
- osd_balance_reads: false,
- osd_immediate_read_from_cache: true, // osds to read from the cache immediately?
- osd_exclusive_caching: true, // replicas evict replicated writes
- osd_load_diff_percent: 20, // load diff for read forwarding
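+ // two read-balancing mechanisms, configured below:
+ //  - "balance": clients send reads directly to replicas
+ //  - "shed": an overloaded primary forwards reads to less loaded replicas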
+ osd_balance_reads: false, // client sends reads directly to a replica
osd_flash_crowd_iat_threshold: 100,
osd_flash_crowd_iat_alpha: 0.125,
+
+ osd_shed_reads: false, // primary forwards reads to a less loaded replica
+ osd_shed_reads_min_latency: .001, // don't shed below this read latency (seconds)
+ osd_shed_reads_min_load_diff: .2, // peer must be this fraction less loaded
+
+ osd_immediate_read_from_cache: true, // may replicas serve cached reads immediately?
+ osd_exclusive_caching: true, // replicas evict replicated writes
osd_pg_bits: 0, // 0 == let osdmonitor decide
osd_object_layout: OBJECT_LAYOUT_HASHINO,
else if (strcmp(args[i], "--osd_balance_reads") == 0)
g_conf.osd_balance_reads = atoi(args[++i]);
- else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0)
- g_conf.osd_immediate_read_from_cache = atoi(args[++i]);
- else if ( strcmp(args[i],"--osd_exclusive_caching" ) == 0)
- g_conf.osd_exclusive_caching = atoi(args[++i]);
- else if (strcmp(args[i], "--osd_load_diff_percent") == 0)
- g_conf.osd_load_diff_percent = atoi(args[++i]);
else if (strcmp(args[i], "--osd_flash_crowd_iat_threshold") == 0)
g_conf.osd_flash_crowd_iat_threshold = atoi(args[++i]);
else if (strcmp(args[i], "--osd_flash_crowd_iat_alpha") == 0)
g_conf.osd_flash_crowd_iat_alpha = atoi(args[++i]);
+ else if (strcmp(args[i], "--osd_shed_reads") == 0)
+ g_conf.osd_shed_reads = atoi(args[++i]);
+ else if (strcmp(args[i], "--osd_shed_reads_min_latency") == 0)
+ g_conf.osd_shed_reads_min_latency = atof(args[++i]);
+ else if (strcmp(args[i], "--osd_shed_reads_min_load_diff") == 0)
+ g_conf.osd_shed_reads_min_load_diff = atof(args[++i]);
+
+ else if (strcmp(args[i], "--osd_immediate_read_from_cache") == 0)
+   g_conf.osd_immediate_read_from_cache = atoi(args[++i]);
+ else if (strcmp(args[i], "--osd_exclusive_caching") == 0)
+   g_conf.osd_exclusive_caching = atoi(args[++i]);
+
else if (strcmp(args[i], "--osd_rep") == 0)
g_conf.osd_rep = atoi(args[++i]);
else if (strcmp(args[i], "--osd_rep_chain") == 0)
// osd
int osd_rep;
- bool osd_balance_reads;
- bool osd_immediate_read_from_cache;
- bool osd_exclusive_caching;
- int osd_load_diff_percent;
+ bool osd_balance_reads;
int osd_flash_crowd_iat_threshold; // flash crowd interarrival time threshold in ms
double osd_flash_crowd_iat_alpha;
+ bool osd_shed_reads;
+ double osd_shed_reads_min_latency;
+ double osd_shed_reads_min_load_diff; // .5 == 50%
+
+ bool osd_immediate_read_from_cache;
+ bool osd_exclusive_caching;
+
int osd_pg_bits;
int osd_object_layout;
int osd_pg_layout;
bool want_ack;
bool want_commit;
bool retry_attempt;
+
+ osd_peer_stat_t peer_stat;
} st;
bufferlist data;
return request_received_time;
}
+ void set_peer_stat(osd_peer_stat_t& stat) { st.peer_stat = stat; }
+ const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; }
void set_data(bufferlist &d) {
data.claim(d);
eversion_t pg_complete_thru;
epoch_t map_epoch;
+
+ osd_peer_stat_t peer_stat;
} st;
bufferlist data;
void set_op(int op) { st.op = op; }
void set_rep_tid(tid_t t) { st.rep_tid = t; }
+ void set_peer_stat(osd_peer_stat_t& stat) { st.peer_stat = stat; }
+ const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; }
+
// data payload
void set_data(bufferlist &d) {
data.claim(d);
#include "common/Clock.h"
#include "msg/Message.h"
+#include "osd/osd_types.h"
class MOSDPing : public Message {
public:
epoch_t map_epoch;
bool ack;
- float avg_qlen;
- double read_mean_time;
+ osd_peer_stat_t peer_stat;
- MOSDPing(epoch_t e,
- float aq,
- double _read_mean_time,
- bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq), read_mean_time(_read_mean_time) {
- }
+ MOSDPing(epoch_t e, osd_peer_stat_t& ps, bool a=false) :
+ Message(MSG_OSD_PING), map_epoch(e), ack(a), peer_stat(ps) { }
MOSDPing() {}
virtual void decode_payload() {
int off = 0;
::_decode(map_epoch, payload, off);
::_decode(ack, payload, off);
- ::_decode(avg_qlen, payload, off);
- ::_decode(read_mean_time, payload, off);
+ ::_decode(peer_stat, payload, off);
}
virtual void encode_payload() {
::_encode(map_epoch, payload);
::_encode(ack, payload);
- ::_encode(avg_qlen, payload);
- ::_encode(read_mean_time, payload);
+ ::_encode(peer_stat, payload);
}
virtual char *get_type_name() { return "osd_ping"; }
OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) :
timer(osd_lock),
- load_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq),
+ stat_oprate(5.0),                                              // decaying op rate
+ read_latency_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq),  // window = max op queue depth
+ qlen_calc(3),
iat_averager(g_conf.osd_flash_crowd_iat_alpha)
{
whoami = id;
state = STATE_BOOTING;
- hb_stat_ops = 0;
- hb_stat_qlen = 0;
+ stat_ops = 0;
+ stat_qlen = 0;
pending_ops = 0;
waiting_for_no_ops = false;
// -------------------------------------
+void OSD::_refresh_my_stat(utime_t now)
+{
+ assert(peer_stat_lock.is_locked());
+
+ // refresh?
+ if (now - my_stat.stamp > .5) {
+ my_stat.stamp = now;
+
+ my_stat.oprate = stat_oprate.get(now);
+
+    // mean queue length since the last refresh; guard against stat_ops == 0
+    if (stat_ops)
+      my_stat.qlen = (float)stat_qlen / (float)stat_ops;
+    else
+      my_stat.qlen = 0;
+    stat_ops = 0;
+    stat_qlen = 0;
+
+ my_stat.read_latency = read_latency_calc.get_average();
+ if (my_stat.read_latency < 0) my_stat.read_latency = 0;
+
+ dout(-10) << "_refresh_my_stat " << my_stat << dendl;
+ }
+}
+
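+// heartbeat: refresh my_stat and send it to peers on each ping, so that
+// everyone has a reasonably fresh view of this OSD's load.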
void OSD::heartbeat()
{
utime_t now = g_clock.now();
since.sec_ref() -= g_conf.osd_heartbeat_interval;
// calc my stats
- float avg_qlen = 0;
- if (hb_stat_ops) avg_qlen = (float)hb_stat_qlen / (float)hb_stat_ops;
+ peer_stat_lock.Lock();
+ _refresh_my_stat(now);
+ dout(-5) << "heartbeat: " << my_stat << dendl;
+ peer_stat_lock.Unlock();
- double read_mean_time = load_calc.get_average();
-
- dout(5) << "heartbeat " << now
- << ": ops " << hb_stat_ops
- << ", avg qlen " << avg_qlen
- << ", mean read time " << read_mean_time
- << dendl;
- // reset until next time around
- hb_stat_ops = 0;
- hb_stat_qlen = 0;
-
// send pings
set<int> pingset;
for (set<int>::iterator i = pingset.begin();
     i != pingset.end();
i++) {
_share_map_outgoing( osdmap->get_inst(*i) );
- messenger->send_message(new MOSDPing(osdmap->get_epoch(),
- avg_qlen,
- read_mean_time ),
- osdmap->get_inst(*i));
+ messenger->send_message(new MOSDPing(osdmap->get_epoch(), my_stat),
+ osdmap->get_inst(*i));
}
if (logger) logger->set("pingset", pingset.size());
_share_map_incoming(m->get_source_inst(), ((MOSDPing*)m)->map_epoch);
int from = m->get_source().num();
- peer_qlen[from] = m->avg_qlen;
- peer_read_time[from] = m->read_mean_time;
+ peer_stat_lock.Lock();
+ peer_stat[from] = m->peer_stat;
+ peer_stat_lock.Unlock();
delete m;
}
// record when this read request was received, so we can measure
// read latency and throughput load.
- op->set_received_time(g_clock.now());
+ utime_t now = g_clock.now();
+ op->set_received_time(now);
// update load stats (op rate, op count, cumulative queue length)
- hb_stat_ops++;
- hb_stat_qlen += pending_ops;
+ stat_oprate.hit(now);
+ stat_ops++;
+ stat_qlen += pending_ops;
// require same or newer map
if (!require_same_or_newer_map(op, op->get_map_epoch())) {
#include "ObjectStore.h"
#include "PG.h"
+#include "common/DecayCounter.h"
+
#include <map>
using namespace std;
static const int STATE_STOPPING = 3;
+
+ /** OSD **/
+protected:
+ Mutex osd_lock; // global lock
+ SafeTimer timer; // safe timer
+
+ Messenger *messenger;
+ Logger *logger;
+ ObjectStore *store;
+ MonMap *monmap;
+
+ int whoami;
+ char dev_path[100];
+
+public:
+ int get_nodeid() { return whoami; }
+
+private:
+ /** superblock **/
+ OSDSuperblock superblock;
+ epoch_t boot_epoch;
+
+ object_t get_osdmap_object_name(epoch_t epoch) { return object_t(0,epoch << 1); }
+ object_t get_inc_osdmap_object_name(epoch_t epoch) { return object_t(0, (epoch << 1) + 1); }
+
+ void write_superblock();
+ void write_superblock(ObjectStore::Transaction& t);
+ int read_superblock();
+
+
+ // -- state --
+ int state;
+
+public:
+ bool is_booting() { return state == STATE_BOOTING; }
+ bool is_active() { return state == STATE_ACTIVE; }
+ bool is_stopping() { return state == STATE_STOPPING; }
+
+private:
+
+ // heartbeat
+ void heartbeat();
+
+ class C_Heartbeat : public Context {
+ OSD *osd;
+ public:
+ C_Heartbeat(OSD *o) : osd(o) {}
+ void finish(int r) {
+ osd->heartbeat();
+ }
+ };
+
+
+ // -- stats --
+ DecayCounter stat_oprate;
+  int stat_ops;   // ops since last refresh
+ int stat_qlen; // cumulative queue length since last refresh
+
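+  // a snapshot of my own load, and the freshest stat received from each
+  // peer (via pings and replicated writes); guarded by peer_stat_lock.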
+ Mutex peer_stat_lock;
+ osd_peer_stat_t my_stat;
+ hash_map<int, osd_peer_stat_t> peer_stat;
+
+  void _refresh_my_stat(utime_t now);   // caller must hold peer_stat_lock
+
// load calculation
// current implementation is a moving average.
- class LoadCalculator {
+ class MovingAverager {
private:
Mutex lock;
- deque<double> m_Data ;
- unsigned m_Size ;
- double m_Total ;
+ deque<double> m_Data;
+ unsigned m_Size;
+ double m_Total;
public:
- LoadCalculator( unsigned size ) : m_Size(0), m_Total(0) { }
+ MovingAverager(unsigned size) : m_Size(size), m_Total(0) { }
+
+ void set_size(unsigned size) {
+ m_Size = size;
+ }
- void add( double element ) {
+ void add(double value) {
Mutex::Locker locker(lock);
// add item
- m_Data.push_back(element);
- m_Total += element;
+ m_Data.push_back(value);
+ m_Total += value;
// trim
while (m_Data.size() > m_Size) {
double get_average() {
Mutex::Locker locker(lock);
-
- if (m_Data.empty())
- return -1;
+ if (m_Data.empty()) return -1;
return m_Total / (double)m_Data.size();
}
- };
+ } read_latency_calc, qlen_calc;
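+  // read_latency_calc tracks per-op read latency over a window of
+  // osd_max_opq samples; qlen_calc is currently unused (qlen is computed
+  // directly in _refresh_my_stat).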
class IATAverager {
public:
}
};
-
- /** OSD **/
-protected:
- Mutex osd_lock; // global lock
- SafeTimer timer; // safe timer
-
- Messenger *messenger;
- Logger *logger;
- ObjectStore *store;
- MonMap *monmap;
-
- LoadCalculator load_calc;
IATAverager iat_averager;
-
- int whoami;
- char dev_path[100];
-
-public:
- int get_nodeid() { return whoami; }
-
-private:
- /** superblock **/
- OSDSuperblock superblock;
- epoch_t boot_epoch;
-
- object_t get_osdmap_object_name(epoch_t epoch) { return object_t(0,epoch << 1); }
- object_t get_inc_osdmap_object_name(epoch_t epoch) { return object_t(0, (epoch << 1) + 1); }
-
- void write_superblock();
- void write_superblock(ObjectStore::Transaction& t);
- int read_superblock();
-
-
- // -- state --
- int state;
-
-public:
- bool is_booting() { return state == STATE_BOOTING; }
- bool is_active() { return state == STATE_ACTIVE; }
- bool is_stopping() { return state == STATE_STOPPING; }
-
-private:
-
- // heartbeat
- void heartbeat();
-
- class C_Heartbeat : public Context {
- OSD *osd;
- public:
- C_Heartbeat(OSD *o) : osd(o) {}
- void finish(int r) {
- osd->heartbeat();
- }
- };
-
-
- // -- stats --
- int hb_stat_ops; // ops since last heartbeat
- int hb_stat_qlen; // cumulative queue length since last hb
-
- hash_map<int, float> peer_qlen;
- hash_map<int, double> peer_read_time;
-
-
+
// -- waiters --
list<class Message*> finished;
// -- load balance reads --
- if (g_conf.osd_balance_reads &&
+ if (g_conf.osd_shed_reads &&
is_primary() &&
g_conf.osd_rep == OSD_REP_PRIMARY) {
// -- read on primary+acker ---
}
// -- flash crowd?
- if (!op->get_source().is_osd() &&
+ if (g_conf.osd_balance_reads &&
+ !op->get_source().is_osd() &&
is_primary()) {
// add sample
osd->iat_averager.add_sample( op->get_oid(), (double)g_clock.now() );
}
}
-
+ // -- read shedding
+
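+  // shedding policy is selected by the value of osd_shed_reads:
+  //   LOAD_LATENCY    - compare mean read latencies
+  //   LOAD_QUEUE_SIZE - compare op queue lengths
+  //   LOAD_HYBRID     - queue length triggers, latency decides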
// check my load against each replica's last reported load; if a replica
// is sufficiently less loaded, redirect the read to it.
- if ( g_conf.osd_balance_reads == LOAD_LATENCY) {
- double mean_read_time = osd->load_calc.get_average();
-
- if ( mean_read_time != -1 ) {
-
- for (unsigned i=1;
- i<acting.size();
- ++i) {
+ if (g_conf.osd_shed_reads == LOAD_LATENCY) {
+ Mutex::Locker lock(osd->peer_stat_lock);
+
+      // above the minimum latency threshold?
+      if (osd->my_stat.read_latency >= g_conf.osd_shed_reads_min_latency) {
+ for (unsigned i=1; i<acting.size(); ++i) {
int peer = acting[i];
+ if (osd->peer_stat.count(peer) == 0) continue;
- dout(10) << "my read time " << mean_read_time
- << "peer_readtime" << osd->peer_read_time[peer]
+ dout(-10) << "preprocess_op my read latency " << osd->my_stat.read_latency
+ << ", peer " << peer << " is " << osd->peer_stat[peer].read_latency
<< " of peer" << peer << dendl;
- if ( osd->peer_read_time.count(peer) &&
- ( (osd->peer_read_time[peer]*100/mean_read_time) <
- (100 - g_conf.osd_load_diff_percent))) {
+ if ((osd->peer_stat[peer].read_latency/osd->my_stat.read_latency) <
+ (1.0 - g_conf.osd_shed_reads_min_load_diff)) {
dout(10) << " forwarding to peer osd" << peer << dendl;
-
osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
return true;
}
}
}
}
- else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) {
+ else if (g_conf.osd_shed_reads == LOAD_QUEUE_SIZE && osd->stat_ops) {
+ Mutex::Locker lock(osd->peer_stat_lock);
+
// am i above my average?
-      float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
+      float my_avg = (float)osd->stat_qlen / (float)osd->stat_ops;
if (osd->pending_ops > my_avg) {
// is there a peer who is below my average?
for (unsigned i=1; i<acting.size(); ++i) {
int peer = acting[i];
- if (osd->peer_qlen.count(peer) &&
- osd->peer_qlen[peer] < my_avg) {
+ if (osd->peer_stat.count(peer) &&
+ osd->peer_stat[peer].qlen < my_avg) {
// calculate a probability that we should redirect
- float p = (my_avg - osd->peer_qlen[peer]) / my_avg; // this is dumb.
+ float p = (my_avg - osd->peer_stat[peer].qlen) / my_avg; // this is dumb.
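+         // e.g. my_avg=10, peer qlen=4 -> p = (10-4)/10 = 0.6: redirect ~60% of the time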
if (drand48() <= p) {
// take the first one
dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg
<< ", p=" << p
- << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
+ << ", fwd to peer w/ qlen " << osd->peer_stat[peer].qlen
<< " osd" << peer
<< dendl;
osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
}
}
-    else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) {
+    else if (g_conf.osd_shed_reads == LOAD_HYBRID && osd->stat_ops) {
      // am i above my average?
-      float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
+      float my_avg = (float)osd->stat_qlen / (float)osd->stat_ops;
if (osd->pending_ops > my_avg) {
// is there a peer who is below my average?
for (unsigned i=1; i<acting.size(); ++i) {
int peer = acting[i];
- if (osd->peer_qlen.count(peer) &&
- osd->peer_qlen[peer] < my_avg) {
+ Mutex::Locker lock(osd->peer_stat_lock);
+ if (osd->peer_stat.count(peer) &&
+ osd->peer_stat[peer].qlen < my_avg) {
// calculate a probability that we should redirect
- //float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb.
+ //float p = (my_avg - peer_stat[peer].qlen) / my_avg; // this is dumb.
- double mean_read_time = osd->load_calc.get_average();
+ double mean_read_time = osd->my_stat.read_latency;
-         if ( mean_read_time != -1 &&
+         if (mean_read_time > 0 &&
- osd->peer_read_time.count(peer) &&
- ( (osd->peer_read_time[peer]*100/mean_read_time) <
- ( 100 - g_conf.osd_load_diff_percent) ) )
+ osd->peer_stat.count(peer) &&
+ ( (osd->peer_stat[peer].read_latency/mean_read_time) <
+ ( 1.0 - g_conf.osd_shed_reads_min_load_diff) ) )
//if (drand48() <= p) {
// take the first one
dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg
<< "my read time "<< mean_read_time
- << "peer read time " << osd->peer_read_time[peer]
- << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
+ << "peer read time " << osd->peer_stat[peer].read_latency
+ << ", fwd to peer w/ qlen " << osd->peer_stat[peer].read_latency
<< " osd" << peer
<< dendl;
osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
if (osd->store->is_cached( op->get_oid() ,
op->get_offset(),
op->get_length() ) == 0) {
- if (!is_primary()) {
+ if (!is_primary() && !op->get_source().is_osd()) {
// am i allowed?
bool v;
if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) < 0) {
- dout(10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid()
+ dout(-10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid()
<< ", fwd to primary" << dendl;
osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
return true;
if (r >= 0) {
reply->set_result(0);
- dout(10) << "READ TIME DIFF"
- << (double)g_clock.now()-op->get_received_time()
- << dendl;
- osd->load_calc.add((double)g_clock.now() - op->get_received_time());
+ utime_t diff = g_clock.now();
+ diff -= op->get_received_time();
+ dout(10) << "op_read total op latency " << diff << dendl;
+ osd->read_latency_calc.add(diff);
} else {
reply->set_result(r); // error
}
-
// send it
osd->messenger->send_message(reply, op->get_client_inst());
}
-void ReplicatedPG::issue_repop(MOSDOp *op, int dest)
+void ReplicatedPG::issue_repop(MOSDOp *op, int dest, osd_peer_stat_t& stat)
{
object_t oid = op->get_oid();
wr->set_rep_tid(op->get_rep_tid());
wr->set_pg_trim_to(peers_complete_thru);
+
+ wr->set_peer_stat(stat);
osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
}
osd->logger->inc("c_wrb", op->get_length());
}
+  // refresh and snapshot my stats, to piggyback on the replica writes
+ osd->peer_stat_lock.Lock();
+ osd->_refresh_my_stat(g_clock.now());
+ osd_peer_stat_t stat = osd->my_stat;
+ osd->peer_stat_lock.Unlock();
+
// issue replica writes
RepGather *repop = 0;
bool alone = (acting.size() == 1);
int next = acting[1];
if (acting.size() > 2)
next = acting[2];
- issue_repop(op, next);
+ issue_repop(op, next, stat);
}
else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) {
// splay rep. send to rest.
for (unsigned i=1; i<acting.size(); ++i)
//for (unsigned i=acting.size()-1; i>=1; --i)
- issue_repop(op, acting[i]);
+ issue_repop(op, acting[i], stat);
} else {
// primary rep, or alone.
repop = new_rep_gather(op);
// send to rest.
if (!alone)
for (unsigned i=1; i<acting.size(); i++)
- issue_repop(op, acting[i]);
+ issue_repop(op, acting[i], stat);
}
if (repop) {
<< " " << op->get_offset() << "~" << op->get_length()
<< dendl;
+ // note peer's stat
+ int fromosd = op->get_source().num();
+ osd->peer_stat_lock.Lock();
+ osd->peer_stat[fromosd] = op->get_peer_stat();
+ dout(20) << "op_rep_modify got peer " << fromosd << " stat " << op->get_peer_stat() << dendl;
+ osd->peer_stat_lock.Unlock();
+
// we better not be missing this.
assert(!missing.is_missing(oid));
}
// infer ack from source
- int fromosd = op->get_source().num();
get_rep_gather(repop);
{
//assert(repop->waitfor_ack.count(fromosd)); // no, we may come thru here twice.
// chain? forward?
if (g_conf.osd_rep == OSD_REP_CHAIN && !is_acker()) {
+    // snapshot my stats under the lock (as op_modify does) to pass down the chain
+    osd->peer_stat_lock.Lock();
+    osd->_refresh_my_stat(g_clock.now());
+    osd_peer_stat_t stat = osd->my_stat;
+    osd->peer_stat_lock.Unlock();
+
// chain rep, not at the tail yet.
int myrank = osd->osdmap->calc_pg_rank(osd->get_nodeid(), acting);
int next = myrank+1;
if (next == (int)acting.size())
next = 1;
- issue_repop(op, acting[next]);
+ issue_repop(op, acting[next], stat);
}
}
void get_rep_gather(RepGather*);
void apply_repop(RepGather *repop);
void put_rep_gather(RepGather*);
- void issue_repop(MOSDOp *op, int osd);
+ void issue_repop(MOSDOp *op, int osd, osd_peer_stat_t& stat);
RepGather *new_rep_gather(MOSDOp *op);
void repop_ack(RepGather *repop,
int result, bool commit,
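+// per-OSD load summary, exchanged via heartbeat pings and piggybacked on
+// replicated writes so each OSD keeps a recent view of its peers' load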
+struct osd_peer_stat_t {
+ utime_t stamp;
+ double oprate;
+ double qlen;
+ double read_latency;
+ osd_peer_stat_t() : oprate(0), qlen(0), read_latency(0) {}
+};
+
+inline ostream& operator<<(ostream& out, const osd_peer_stat_t &stat) {
+ return out << "stat(" << stat.stamp
+ << " oprate=" << stat.oprate
+ << " qlen=" << stat.qlen
+ << " read_latency=" << stat.read_latency
+ << ")";
+}
+
// -----------------------------------------
class ObjectExtent {