LDINC = ar -rc
else
# For linux
-CFLAGS = -g -fPIC -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
+CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
LDINC = ld -i -o
endif
if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
// we're doing buffered i/o. make sure we're inside the file.
// we can trust size info bc we get accurate info when buffering/caching caps are issued.
- dout(-10) << "file size: " << in->inode.size << endl;
+ dout(10) << "file size: " << in->inode.size << endl;
if (offset > 0 && offset >= in->inode.size) {
client_lock.Unlock();
return 0;
syn_modes.push_back( SYNCLIENT_MODE_READFILE );
syn_iargs.push_back( atoi(args[++i]) );
syn_iargs.push_back( atoi(args[++i]) );
+ } else if (strcmp(args[i],"readwriterandom") == 0) {
+ syn_modes.push_back( SYNCLIENT_MODE_RDWRRANDOM );
+ syn_iargs.push_back( atoi(args[++i]) );
+ syn_iargs.push_back( atoi(args[++i]) );
+ } else if (strcmp(args[i],"readwriterandom_ex") == 0) {
+ syn_modes.push_back( SYNCLIENT_MODE_RDWRRANDOM_EX );
+ syn_iargs.push_back( atoi(args[++i]) );
+ syn_iargs.push_back( atoi(args[++i]) );
+ } else if (strcmp(args[i],"readshared") == 0) {
+ syn_modes.push_back( SYNCLIENT_MODE_READSHARED );
+ syn_iargs.push_back( atoi(args[++i]) );
+ syn_iargs.push_back( atoi(args[++i]) );
} else if (strcmp(args[i],"rw") == 0) {
int a = atoi(args[++i]);
int b = atoi(args[++i]);
string sarg1 = get_sarg(0);
int iarg1 = iargs.front(); iargs.pop_front();
int iarg2 = iargs.front(); iargs.pop_front();
+ cout << "WRITING SYN CLIENT" << endl;
if (run_me())
write_file(sarg1, iarg1, iarg2);
}
write_file(sarg1, iarg1, iarg2);
}
break;
+ case SYNCLIENT_MODE_READSHARED:
+ {
+ string sarg1 = "shared";
+ int iarg1 = iargs.front(); iargs.pop_front();
+ int iarg2 = iargs.front(); iargs.pop_front();
+ if (run_me())
+ read_file(sarg1, iarg1, iarg2);
+ }
+ break;
case SYNCLIENT_MODE_WRITEBATCH:
{
int iarg1 = iargs.front(); iargs.pop_front();
string sarg1 = get_sarg(0);
int iarg1 = iargs.front(); iargs.pop_front();
int iarg2 = iargs.front(); iargs.pop_front();
+
+ cout << "READING SYN CLIENT" << endl;
if (run_me())
read_file(sarg1, iarg1, iarg2);
}
break;
+ case SYNCLIENT_MODE_RDWRRANDOM:
+ {
+ string sarg1 = get_sarg(0);
+ int iarg1 = iargs.front(); iargs.pop_front();
+ int iarg2 = iargs.front(); iargs.pop_front();
+
+ cout << "RANDOM READ WRITE SYN CLIENT" << endl;
+ if (run_me())
+ read_random(sarg1, iarg1, iarg2);
+ }
+ break;
+
+ case SYNCLIENT_MODE_RDWRRANDOM_EX:
+ {
+ string sarg1 = get_sarg(0);
+ int iarg1 = iargs.front(); iargs.pop_front();
+ int iarg2 = iargs.front(); iargs.pop_front();
+
+ cout << "RANDOM READ WRITE SYN CLIENT" << endl;
+ if (run_me())
+ read_random_ex(sarg1, iarg1, iarg2);
+ }
+ break;
case SYNCLIENT_MODE_TRACE:
{
string tfile = get_sarg(0);
dout(1) << "read_file got r = " << r << ", probably end of file" << endl;
break;
}
-
+
// verify fingerprint
int bad = 0;
__int64_t *p = (__int64_t*)buf;
return 0;
}
+int SyntheticClient::read_random(string& fn, int size, int rdsize) // size is in MB, wrsize in bytes
+{
+ __uint64_t chunks = (__uint64_t)size * (__uint64_t)(1024*1024) / (__uint64_t)rdsize;
+
+ int fd = client->open(fn.c_str(), O_RDWR);
+ dout(5) << "reading from " << fn << " fd " << fd << endl;
+
+ // cout << "READING FROM " << fn << " fd " << fd << endl;
+
+ // cout << "filename " << fn << " size:" << size << " read size|" << rdsize << "|" << "\ chunks: |" << chunks <<"|" << endl;
+
+ if (fd < 0) return fd;
+ int offset;
+ char * buf = NULL;
+
+ for (unsigned i=0; i<2000; i++) {
+ if (time_to_stop()) break;
+
+ bool read=false;
+
+ time_t seconds;
+ time( &seconds);
+ srand(seconds);
+
+ // use rand instead ??
+ double x = drand48();
+
+ //cout << "RANDOM NUMBER RETURN |" << x << "|" << endl;
+
+ if ( x < 0.5)
+ {
+ //cout << "DECIDED TO READ " << x << endl;
+ buf = new char[rdsize];
+ memset(buf, 1, rdsize);
+ read=true;
+ }
+ else
+ {
+ // cout << "DECIDED TO WRITE " << x << endl;
+ buf = new char[rdsize+100]; // 1 MB
+ memset(buf, 7, rdsize);
+ }
+
+ //double y = drand48() ;
+
+ //cout << "OFFSET is |" << offset << "| chunks |" << chunks<< endl;
+
+ if ( read)
+ {
+ offset=(rand())%(chunks+1);
+ dout(2) << "reading block " << offset << "/" << chunks << endl;
+
+ int r = client->read(fd, buf, rdsize,
+ offset*rdsize);
+ if (r < rdsize) {
+ dout(1) << "read_file got r = " << r << ", probably end of file" << endl;
+ }
+ }
+ else
+ {
+ dout(2) << "writing block " << offset << "/" << chunks << endl;
+
+ // fill buf with a 16 byte fingerprint
+ // 64 bits : file offset
+ // 64 bits : client id
+ // = 128 bits (16 bytes)
+
+ //if (true )
+ //{
+ //int count = rand()%10;
+
+ //for ( int j=0;j<count; j++ )
+ //{
+
+ offset=(rand())%(chunks+1);
+ __uint64_t *p = (__uint64_t*)buf;
+ while ((char*)p < buf + rdsize) {
+ *p = offset*rdsize + (char*)p - buf;
+ p++;
+ *p = client->get_nodeid();
+ p++;
+ }
+
+ client->write(fd, buf, rdsize,
+ offset*rdsize);
+ //}
+ //}
+ }
+
+ // verify fingerprint
+ if ( read )
+ {
+ int bad = 0;
+ __int64_t *p = (__int64_t*)buf;
+ __int64_t readoff, readclient;
+ while ((char*)p + 32 < buf + rdsize) {
+ readoff = *p;
+ __int64_t wantoff = offset*rdsize + (__int64_t)((char*)p - buf);
+ p++;
+ readclient = *p;
+ p++;
+ if (readoff != wantoff ||
+ readclient != client->get_nodeid()) {
+ if (!bad)
+ dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
+ << ", should be offset " << wantoff << " clietn " << client->get_nodeid()
+ << endl;
+ bad++;
+ }
+ }
+ if (bad)
+ dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl;
+ }
+ }
+
+ client->close(fd);
+ delete[] buf;
+
+ return 0;
+}
+
+
+//#include<stdio.h>
+//#include<stdlib.h>
+
+int normdist(int min, int max, int stdev) /* specifies input values */;
+//main()
+//{
+ // for ( int i=0; i < 10; i++ )
+ // normdist ( 0 , 10, 1 );
+
+//}
+
+
+int normdist(int min, int max, int stdev) /* specifies input values */
+{
+/* min: Minimum value; max: Maximum value; stdev: degree of deviation */
+
+//int min, max, stdev; {
+ time_t seconds;
+ time( &seconds);
+ srand(seconds);
+
+ int range, iterate, result;
+/* declare range, iterate and result as integers, to avoid the need for
+floating point math*/
+
+ result = 0;
+/* ensure result is initialized to 0 */
+
+ range = max -min;
+/* calculate range of possible values between the max and min values */
+
+ iterate = range / stdev;
+/* this number of iterations ensures the proper shape of the resulting
+curve */
+
+ stdev += 1; /* compensation for integer vs. floating point math */
+ for (int c = iterate; c != 0; c--) /* loop through iterations */
+ {
+ // result += (uniform (1, 100) * stdev) / 100; /* calculate and
+ result += ( (rand()%100 + 1) * stdev) / 100;
+ // printf("result=%d\n", result );
+ }
+ printf("\n final result=%d\n", result );
+ return result + min; /* send final result back */
+}
+int SyntheticClient::read_random_ex(string& fn, int size, int rdsize) // size is in MB, wrsize in bytes
+{
+ __uint64_t chunks = (__uint64_t)size * (__uint64_t)(1024*1024) / (__uint64_t)rdsize;
+
+ int fd = client->open(fn.c_str(), O_RDWR);
+ dout(5) << "reading from " << fn << " fd " << fd << endl;
+
+ // cout << "READING FROM " << fn << " fd " << fd << endl;
+
+ // cout << "filename " << fn << " size:" << size << " read size|" << rdsize << "|" << "\ chunks: |" << chunks <<"|" << endl;
+
+ if (fd < 0) return fd;
+ int offset;
+ char * buf = NULL;
+
+ for (unsigned i=0; i<2000; i++) {
+ if (time_to_stop()) break;
+
+ bool read=false;
+
+ time_t seconds;
+ time( &seconds);
+ srand(seconds);
+
+ // use rand instead ??
+ double x = drand48();
+
+ //cout << "RANDOM NUMBER RETURN |" << x << "|" << endl;
+
+ if ( x < 0.5)
+ {
+ //cout << "DECIDED TO READ " << x << endl;
+ buf = new char[rdsize];
+ memset(buf, 1, rdsize);
+ read=true;
+ }
+ else
+ {
+ // cout << "DECIDED TO WRITE " << x << endl;
+ buf = new char[rdsize+100]; // 1 MB
+ memset(buf, 7, rdsize);
+ }
+
+ //double y = drand48() ;
+
+ //cout << "OFFSET is |" << offset << "| chunks |" << chunks<< endl;
+
+ if ( read)
+ {
+ //offset=(rand())%(chunks+1);
+
+ /* if ( chunks > 10000 )
+ offset= normdist( 0 , chunks/1000 , 5 )*1000;
+ else if ( chunks > 1000 )
+ offset= normdist( 0 , chunks/100 , 5 )*100;
+ else if ( chunks > 100 )
+ offset= normdist( 0 , chunks/20 , 5 )*20;*/
+
+
+ dout(2) << "reading block " << offset << "/" << chunks << endl;
+
+ int r = client->read(fd, buf, rdsize,
+ offset*rdsize);
+ if (r < rdsize) {
+ dout(1) << "read_file got r = " << r << ", probably end of file" << endl;
+ }
+ }
+ else
+ {
+ dout(2) << "writing block " << offset << "/" << chunks << endl;
+
+ // fill buf with a 16 byte fingerprint
+ // 64 bits : file offset
+ // 64 bits : client id
+ // = 128 bits (16 bytes)
+
+ //if (true )
+ //{
+ int count = rand()%10;
+
+ for ( int j=0;j<count; j++ )
+ {
+
+ offset=(rand())%(chunks+1);
+ __uint64_t *p = (__uint64_t*)buf;
+ while ((char*)p < buf + rdsize) {
+ *p = offset*rdsize + (char*)p - buf;
+ p++;
+ *p = client->get_nodeid();
+ p++;
+ }
+
+ client->write(fd, buf, rdsize,
+ offset*rdsize);
+ }
+ //}
+ }
+
+ // verify fingerprint
+ if ( read )
+ {
+ int bad = 0;
+ __int64_t *p = (__int64_t*)buf;
+ __int64_t readoff, readclient;
+ while ((char*)p + 32 < buf + rdsize) {
+ readoff = *p;
+ __int64_t wantoff = offset*rdsize + (__int64_t)((char*)p - buf);
+ p++;
+ readclient = *p;
+ p++;
+ if (readoff != wantoff ||
+ readclient != client->get_nodeid()) {
+ if (!bad)
+ dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
+ << ", should be offset " << wantoff << " clietn " << client->get_nodeid()
+ << endl;
+ bad++;
+ }
+ }
+ if (bad)
+ dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl;
+ }
+ }
+
+ client->close(fd);
+ delete[] buf;
+
+ return 0;
+}
int SyntheticClient::random_walk(int num_req)
#define SYNCLIENT_MODE_READFILE 21
#define SYNCLIENT_MODE_WRITEBATCH 22
#define SYNCLIENT_MODE_WRSHARED 23
+#define SYNCLIENT_MODE_READSHARED 24
+#define SYNCLIENT_MODE_RDWRRANDOM 25
+#define SYNCLIENT_MODE_RDWRRANDOM_EX 26
#define SYNCLIENT_MODE_TRACE 30
int write_file(string& fn, int mb, int chunk);
int write_batch(int nfile, int mb, int chunk);
int read_file(string& fn, int mb, int chunk);
+ int read_random(string& fn, int mb, int chunk);
+ int read_random_ex(string& fn, int mb, int chunk);
int clean_dir(string& basedir);
// --- osd ---
osd_rep: OSD_REP_PRIMARY,
osd_balance_reads: false,
+ osd_immediate_read_from_cache: false, //osds to read from the cache immediately?
+ osd_exclusive_caching: false,
+ osd_load_diff_percent: 20, // load diff for read forwarding
+ osd_load_balance_scheme: 1,
+
osd_pg_bits: 0, // 0 == let osdmonitor decide
osd_object_layout: OBJECT_LAYOUT_HASHINO,
osd_pg_layout: PG_LAYOUT_CRUSH,
else if (strcmp(args[i], "--osd_balance_reads") == 0)
g_conf.osd_balance_reads = atoi(args[++i]);
+ else if (strcmp(args[i], "--osd_load_diff_percent") == 0)
+ g_conf.osd_load_diff_percent = atoi(args[++i]);
+ else if (strcmp(args[i], "--osd_load_balance_scheme") == 0)
+ g_conf.osd_load_balance_scheme = atoi(args[++i]);
+ else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0)
+ g_conf.osd_immediate_read_from_cache = atoi(args[++i]);
+ else if ( strcmp(args[i],"--osd_exclusive_caching" ) == 0)
+ g_conf.osd_exclusive_caching = atoi(args[++i]);
+
else if (strcmp(args[i], "--osd_rep") == 0)
g_conf.osd_rep = atoi(args[++i]);
else if (strcmp(args[i], "--osd_rep_chain") == 0)
// osd
int osd_rep;
bool osd_balance_reads;
+ bool osd_immediate_read_from_cache;
+ bool osd_exclusive_caching;
+ int osd_load_diff_percent;
+ int osd_load_balance_scheme;
int osd_pg_bits;
int osd_object_layout;
int osd_pg_layout;
if (!rx.empty()) {
BufferHead *wait_on = rx.begin()->second;
Context *c = new C_Cond(will_wait_on, will_wait_on_bool);
- dout(1) << "attempt_read waiting for read to finish on " << *wait_on << " c " << c << endl;
+ dout(20) << "attempt_read waiting for read to finish on " << *wait_on << " c " << c << endl;
block_t b = MAX(wait_on->start(), bstart);
wait_on->waitfor_read[b].push_back(c);
return false;
#define __UTIME_H
#include <math.h>
-
+#include <sys/time.h>
// --------
// utime_t
// ---------------------------------------------
// locking
// noop unless overloaded.
- virtual SimpleLock* get_lock(int type) { assert(0); }
+ virtual SimpleLock* get_lock(int type) { assert(0); return 0; }
virtual void set_mlock_info(MLock *m) { assert(0); }
virtual void encode_lock_state(int type, bufferlist& bl) { assert(0); }
virtual void decode_lock_state(int type, bufferlist& bl) { assert(0); }
bufferlist data;
map<string,bufferptr> attrset;
+ double request_received_time;
+
friend class MOSDOpReply;
const bool wants_ack() { return st.want_ack; }
const bool wants_commit() { return st.want_commit; }
+ void set_received_time(double time) {
+ request_received_time = time;
+ }
+ double get_received_time() {
+ return request_received_time;
+ }
+
void set_data(bufferlist &d) {
data.claim(d);
#ifndef __MOSDPING_H
#define __MOSDPING_H
+#include "common/Clock.h"
+
#include "msg/Message.h"
epoch_t map_epoch;
bool ack;
float avg_qlen;
+ double read_mean_time;
MOSDPing(epoch_t e,
float aq,
- bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq) {
+ double _read_mean_time,
+ bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq), read_mean_time(_read_mean_time) {
}
MOSDPing() {}
off += sizeof(ack);
payload.copy(off, sizeof(avg_qlen), (char*)&avg_qlen);
off += sizeof(avg_qlen);
+ payload.copy(off, sizeof(read_mean_time), (char*)&read_mean_time);
+ off += sizeof(read_mean_time);
}
virtual void encode_payload() {
payload.append((char*)&map_epoch, sizeof(map_epoch));
payload.append((char*)&ack, sizeof(ack));
payload.append((char*)&avg_qlen, sizeof(avg_qlen));
+ payload.append((char*)&read_mean_time, sizeof(read_mean_time));
}
virtual char *get_type_name() { return "oping"; }
char *osd_base_path = "./osddata";
char *ebofs_base_path = "./dev";
+const int LOAD_LATENCY =1;
+const int LOAD_QUEUE_SIZE=2;
+const int LOAD_HYBRID =3;
object_t SUPERBLOCK_OBJECT(0,0);
LogType osd_logtype;
-OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : timer(osd_lock)
+OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) :
+ timer(osd_lock),
+ load_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq)
{
whoami = id;
messenger = m;
float avg_qlen = 0;
if (hb_stat_ops) avg_qlen = (float)hb_stat_qlen / (float)hb_stat_ops;
+ double read_mean_time = load_calc.get_average();
+
dout(5) << "heartbeat " << now
<< ": ops " << hb_stat_ops
<< ", avg qlen " << avg_qlen
+ << ", mean read time " << read_mean_time
<< dendl;
// reset until next time around
i != pingset.end();
i++) {
_share_map_outgoing( osdmap->get_inst(*i) );
- messenger->send_message(new MOSDPing(osdmap->get_epoch(), avg_qlen),
- osdmap->get_inst(*i));
+ messenger->send_message(new MOSDPing(osdmap->get_epoch(),
+ avg_qlen,
+ read_mean_time ),
+ osdmap->get_inst(*i));
}
if (logger) logger->set("pingset", pingset.size());
void OSD::handle_osd_ping(MOSDPing *m)
{
dout(20) << "osdping from " << m->get_source() << dendl;
+
_share_map_incoming(m->get_source_inst(), ((MOSDPing*)m)->map_epoch);
int from = m->get_source().num();
peer_qlen[from] = m->avg_qlen;
+ peer_read_time[from] = m->read_mean_time;
//if (!m->ack)
//messenger->send_message(new MOSDPing(osdmap->get_epoch(), true),
const pg_t pgid = op->get_pg();
PG *pg = _have_pg(pgid) ? _lock_pg(pgid):0;
-
logger->set("buf", buffer_total_alloc);
+ // mark the read request received time for finding the
+ // read througput load.
+ op->set_received_time(g_clock.now());
+
// update qlen stats
hb_stat_ops++;
hb_stat_qlen += pending_ops;
// require same or newer map
if (!require_same_or_newer_map(op, op->get_map_epoch())) {
- _unlock_pg(pgid);
+ if (pg) _unlock_pg(pgid);
return;
}
<< pgid
<< ", waiting" << dendl;
waiting_for_pg[pgid].push_back(op);
- _unlock_pg(pgid);
return;
}
}
*/
+
+ dout(10) << "handle_op " << *op << " in " << *pg << endl;
+
+ // if this is a read and the data is in the cache ,do an immediate read..
+ if ( read && g_conf.osd_immediate_read_from_cache ) {
+ dout(10) << "trying to see whether data is in cache" << *op << endl;
+ if ( store->is_cached( op->get_oid() ,
+ op->get_offset(),
+ op->get_length() ) == 0 )
+ {
+ dout(10) << "data is in cache, reading from cache" << *op << endl;
+ pg->do_op(op); // do it now
+ _unlock_pg(pgid);
+ return;
+ }
+ }
+
dout(7) << "handle_op " << *op << " in " << *pg << dendl;
-
// balance reads?
if (read &&
g_conf.osd_balance_reads &&
- pg->get_acker() == whoami) {
- // test
- if (false) {
- if (pg->acting.size() > 1) {
- int peer = pg->acting[1];
- dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl;
- messenger->send_message(op, osdmap->get_inst(peer));
- _unlock_pg(pgid);
- return;
+ pg->get_acker() == whoami)
+ {
+ // test
+ if (false) {
+ if (pg->acting.size() > 1) {
+ int peer = pg->acting[1];
+ dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl;
+ messenger->send_message(op, osdmap->get_inst(peer));
+ _unlock_pg(pgid);
+ return;
+ }
}
- }
-
- // am i above my average?
- float my_avg = hb_stat_qlen / hb_stat_ops;
- if (pending_ops > my_avg) {
- // is there a peer who is below my average?
- for (unsigned i=1; i<pg->acting.size(); ++i) {
- int peer = pg->acting[i];
- if (peer_qlen.count(peer) &&
- peer_qlen[peer] < my_avg) {
- // calculate a probability that we should redirect
- float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb.
+
+
+ // check my load.
+ // TODO xxx we must also compare with our own load
+ // if i am x percentage higher than replica ,
+ // redirect the read
+
+ //if ( g_conf.osd_load_balance_scheme == LOAD_LATENCY)
+ if ( g_conf.osd_balance_reads == LOAD_LATENCY)
+ {
+
+ double mean_read_time = load_calc.get_average();
- if (drand48() <= p) {
- // take the first one
- dout(-10) << "my qlen " << pending_ops << " > my_avg " << my_avg
- << ", p=" << p
- << ", fwd to peer w/ qlen " << peer_qlen[peer]
- << " osd" << peer
- << dendl;
- messenger->send_message(op, osdmap->get_inst(peer));
- _unlock_pg(pgid);
- return;
+ if ( mean_read_time != -1 )
+ {
+
+ for (unsigned i=1;
+ i<pg->acting.size();
+ ++i)
+ {
+ int peer = pg->acting[i];
+
+ dout(10) << "my read time " << mean_read_time
+ << "peer_readtime" << peer_read_time[peer]
+ << " of peer" << peer << endl;
+
+ if ( peer_read_time.count(peer) &&
+ ( (peer_read_time[peer]*100/mean_read_time) <
+ ( 100 - g_conf.osd_load_diff_percent)))
+ {
+ dout(10) << " forwarding to peer osd" << peer << endl;
+
+ messenger->send_message(op, osdmap->get_inst(peer));
+ _unlock_pg(pgid);
+ return;
+ }
+ }
+ }
+ }
+ //else if ( g_conf.osd_load_balance_scheme == LOAD_QUEUE_SIZE )
+ else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE )
+ {
+
+
+ // am i above my average?
+ float my_avg = hb_stat_qlen / hb_stat_ops;
+
+ if (pending_ops > my_avg) {
+ // is there a peer who is below my average?
+ for (unsigned i=1; i<pg->acting.size(); ++i) {
+ int peer = pg->acting[i];
+ if (peer_qlen.count(peer) &&
+ peer_qlen[peer] < my_avg) {
+ // calculate a probability that we should redirect
+ float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb.
+
+ if (drand48() <= p) {
+ // take the first one
+ dout(10) << "my qlen " << pending_ops << " > my_avg " << my_avg
+ << ", p=" << p
+ << ", fwd to peer w/ qlen " << peer_qlen[peer]
+ << " osd" << peer
+ << dendl;
+ messenger->send_message(op, osdmap->get_inst(peer));
+ _unlock_pg(pgid);
+ return;
+ }
+ }
+ }
}
+
}
- }
+ //else if ( g_conf.osd_load_balance_scheme == LOAD_HYBRID )
+ else if ( g_conf.osd_balance_reads == LOAD_HYBRID )
+ {
+
+ // am i above my average?
+ float my_avg = hb_stat_qlen / hb_stat_ops;
+
+ if (pending_ops > my_avg) {
+ // is there a peer who is below my average?
+ for (unsigned i=1; i<pg->acting.size(); ++i) {
+ int peer = pg->acting[i];
+ if (peer_qlen.count(peer) &&
+ peer_qlen[peer] < my_avg) {
+ // calculate a probability that we should redirect
+ //float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb.
+
+ double mean_read_time = load_calc.get_average();
+
+ if ( mean_read_time != -1 &&
+ peer_read_time.count(peer) &&
+ ( (peer_read_time[peer]*100/mean_read_time) <
+ ( 100 - g_conf.osd_load_diff_percent) ) )
+ //if (drand48() <= p) {
+ // take the first one
+ dout(10) << "using hybrid :my qlen " << pending_ops << " > my_avg " << my_avg
+ << "my read time "<< mean_read_time
+ << "peer read time " << peer_read_time[peer]
+ << ", fwd to peer w/ qlen " << peer_qlen[peer]
+ << " osd" << peer
+ << endl;
+ messenger->send_message(op, osdmap->get_inst(peer));
+ _unlock_pg(pgid);
+ return;
+ //}
+ }
+ }
+ }
+ }
+
}
- }
-
} else {
// REPLICATION OP (it's from another OSD)
assert(pg->get_role() >= 0);
dout(7) << "handle_rep_op " << op << " in " << *pg << dendl;
+
+ // a redirected read...handle this differently ..
+ // if the data is in cache ( a rare case? ), return the data immediately
+ if ( read && g_conf.osd_immediate_read_from_cache )
+ {
+ dout(10) << "redirected read, trying to see whether data is in cache " << *op << endl;
+ if ( store->is_cached( op->get_oid() ,
+ op->get_offset(),
+ op->get_length()) == 0 )
+ {
+ dout(10) << "redirected read, data is in cache, reading from cache " << *op << endl;
+ pg->do_op(op); // do it now
+ _unlock_pg(pgid);
+ return;
+ }
+ }
}
+
if (g_conf.osd_maxthreads < 1) {
_unlock_pg(pgid);
} else {
_unlock_pg(pgid);
- // queue for worker threads
- /*if (read)
- enqueue_op(0, op); // no locking needed for reads
- else
- */
- enqueue_op(pgid, op);
+ enqueue_op(pgid, op); // queue for worker threads
}
}
+
void OSD::handle_op_reply(MOSDOpReply *op)
{
if (op->get_map_epoch() < boot_epoch) {
static const int STATE_STOPPING = 3;
+ // load calculation
+ //current implementation is moving averges.
+ class LoadCalculator {
+ private:
+ deque<double> m_Data ;
+ unsigned m_Size ;
+ double m_Total ;
+
+ public:
+ LoadCalculator( unsigned size ) : m_Size(0), m_Total(0) { }
+
+ void add( double element ) {
+ // add item
+ m_Data.push_back(element);
+ m_Total += element;
+
+ // trim
+ while (m_Data.size() > m_Size) {
+ m_Total -= m_Data.front();
+ m_Data.pop_front();
+ }
+ }
+
+ double get_average() {
+ if (m_Data.empty())
+ return -1;
+ return m_Total / (double)m_Data.size();
+ }
+ };
+
+
/** OSD **/
protected:
Mutex osd_lock; // global lock
Logger *logger;
ObjectStore *store;
MonMap *monmap;
+ LoadCalculator load_calc;
int whoami;
char dev_path[100];
int hb_stat_qlen; // cumulative queue length since last hb
hash_map<int, float> peer_qlen;
+ hash_map<int, double> peer_read_time;
// -- waiters --
Context *onsafe) = 0;//{ return -1; }
virtual void trim_from_cache(object_t oid,
off_t offset, size_t len) { }
+ virtual int is_cached(object_t oid,
+ off_t offset,
+ size_t len) { return -1; }
virtual int setattr(object_t oid, const char *name,
const void *value, size_t size,
reply->set_result(0);
reply->set_data(bl);
reply->set_length(r);
+
+ dout(10) << "READ TIME DIFF"
+ << (double)g_clock.now()-op->get_received_time()
+ << endl;
+ osd->load_calc.add((double)g_clock.now() - op->get_received_time());
+
} else {
reply->set_result(r); // error
reply->set_length(0);
put_rep_gather(repop);
} else {
+ // not acker.
// chain or splay. apply.
ObjectStore::Transaction t;
prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru);
assert(r == 0);
}
+ // lets evict the data from our cache to maintain a total large cache size
+ if (g_conf.osd_exclusive_caching)
+ osd->store->trim_from_cache(op->get_oid() , op->get_offset(), op->get_length());
+
oncommit->ack();
}