From b0469bdd2c54fff4989f23545ee6babbf1a09b1b Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 22 May 2007 17:55:16 +0000 Subject: [PATCH] * merged suresh's read balancing changes git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1363 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/pgs/Makefile | 2 +- branches/sage/pgs/client/Client.cc | 2 +- branches/sage/pgs/client/SyntheticClient.cc | 345 +++++++++++++++++++- branches/sage/pgs/client/SyntheticClient.h | 5 + branches/sage/pgs/config.cc | 14 + branches/sage/pgs/config.h | 4 + branches/sage/pgs/ebofs/Ebofs.cc | 2 +- branches/sage/pgs/include/utime.h | 2 +- branches/sage/pgs/mds/mdstypes.h | 2 +- branches/sage/pgs/messages/MOSDOp.h | 9 + branches/sage/pgs/messages/MOSDPing.h | 9 +- branches/sage/pgs/osd/OSD.cc | 222 ++++++++++--- branches/sage/pgs/osd/OSD.h | 33 ++ branches/sage/pgs/osd/ObjectStore.h | 3 + branches/sage/pgs/osd/ReplicatedPG.cc | 11 + 15 files changed, 611 insertions(+), 54 deletions(-) diff --git a/branches/sage/pgs/Makefile b/branches/sage/pgs/Makefile index 07657214a5984..97246c8d618d9 100644 --- a/branches/sage/pgs/Makefile +++ b/branches/sage/pgs/Makefile @@ -12,7 +12,7 @@ CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE -DDARWI LDINC = ar -rc else # For linux -CFLAGS = -g -fPIC -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE +CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE LDINC = ld -i -o endif diff --git a/branches/sage/pgs/client/Client.cc b/branches/sage/pgs/client/Client.cc index fc1546ff9ca07..a6621b7731c63 100644 --- a/branches/sage/pgs/client/Client.cc +++ b/branches/sage/pgs/client/Client.cc @@ -2526,7 +2526,7 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) { // we're doing buffered i/o. make sure we're inside the file. // we can trust size info bc we get accurate info when buffering/caching caps are issued. - dout(-10) << "file size: " << in->inode.size << endl; + dout(10) << "file size: " << in->inode.size << endl; if (offset > 0 && offset >= in->inode.size) { client_lock.Unlock(); return 0; diff --git a/branches/sage/pgs/client/SyntheticClient.cc b/branches/sage/pgs/client/SyntheticClient.cc index d6adf65cbdcf7..7e7f95f71b1f3 100644 --- a/branches/sage/pgs/client/SyntheticClient.cc +++ b/branches/sage/pgs/client/SyntheticClient.cc @@ -68,6 +68,18 @@ void parse_syn_options(vector& args) syn_modes.push_back( SYNCLIENT_MODE_READFILE ); syn_iargs.push_back( atoi(args[++i]) ); syn_iargs.push_back( atoi(args[++i]) ); + } else if (strcmp(args[i],"readwriterandom") == 0) { + syn_modes.push_back( SYNCLIENT_MODE_RDWRRANDOM ); + syn_iargs.push_back( atoi(args[++i]) ); + syn_iargs.push_back( atoi(args[++i]) ); + } else if (strcmp(args[i],"readwriterandom_ex") == 0) { + syn_modes.push_back( SYNCLIENT_MODE_RDWRRANDOM_EX ); + syn_iargs.push_back( atoi(args[++i]) ); + syn_iargs.push_back( atoi(args[++i]) ); + } else if (strcmp(args[i],"readshared") == 0) { + syn_modes.push_back( SYNCLIENT_MODE_READSHARED ); + syn_iargs.push_back( atoi(args[++i]) ); + syn_iargs.push_back( atoi(args[++i]) ); } else if (strcmp(args[i],"rw") == 0) { int a = atoi(args[++i]); int b = atoi(args[++i]); @@ -439,6 +451,7 @@ int SyntheticClient::run() string sarg1 = get_sarg(0); int iarg1 = iargs.front(); iargs.pop_front(); int iarg2 = iargs.front(); iargs.pop_front(); + cout << "WRITING SYN CLIENT" << endl; if (run_me()) write_file(sarg1, iarg1, iarg2); } @@ -452,6 +465,15 @@ int SyntheticClient::run() write_file(sarg1, iarg1, iarg2); } break; + case SYNCLIENT_MODE_READSHARED: + { + string sarg1 = "shared"; + int iarg1 = iargs.front(); iargs.pop_front(); + int iarg2 = iargs.front(); iargs.pop_front(); + if (run_me()) + read_file(sarg1, iarg1, iarg2); + } + break; case SYNCLIENT_MODE_WRITEBATCH: { int iarg1 = iargs.front(); iargs.pop_front(); @@ -468,11 +490,36 @@ int SyntheticClient::run() string sarg1 = get_sarg(0); int iarg1 = iargs.front(); iargs.pop_front(); int iarg2 = iargs.front(); iargs.pop_front(); + + cout << "READING SYN CLIENT" << endl; if (run_me()) read_file(sarg1, iarg1, iarg2); } break; + case SYNCLIENT_MODE_RDWRRANDOM: + { + string sarg1 = get_sarg(0); + int iarg1 = iargs.front(); iargs.pop_front(); + int iarg2 = iargs.front(); iargs.pop_front(); + + cout << "RANDOM READ WRITE SYN CLIENT" << endl; + if (run_me()) + read_random(sarg1, iarg1, iarg2); + } + break; + + case SYNCLIENT_MODE_RDWRRANDOM_EX: + { + string sarg1 = get_sarg(0); + int iarg1 = iargs.front(); iargs.pop_front(); + int iarg2 = iargs.front(); iargs.pop_front(); + + cout << "RANDOM READ WRITE SYN CLIENT" << endl; + if (run_me()) + read_random_ex(sarg1, iarg1, iarg2); + } + break; case SYNCLIENT_MODE_TRACE: { string tfile = get_sarg(0); @@ -1098,7 +1145,7 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize) // size is in dout(1) << "read_file got r = " << r << ", probably end of file" << endl; break; } - + // verify fingerprint int bad = 0; __int64_t *p = (__int64_t*)buf; @@ -1128,6 +1175,302 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize) // size is in return 0; } +int SyntheticClient::read_random(string& fn, int size, int rdsize) // size is in MB, wrsize in bytes +{ + __uint64_t chunks = (__uint64_t)size * (__uint64_t)(1024*1024) / (__uint64_t)rdsize; + + int fd = client->open(fn.c_str(), O_RDWR); + dout(5) << "reading from " << fn << " fd " << fd << endl; + + // cout << "READING FROM " << fn << " fd " << fd << endl; + + // cout << "filename " << fn << " size:" << size << " read size|" << rdsize << "|" << "\ chunks: |" << chunks <<"|" << endl; + + if (fd < 0) return fd; + int offset; + char * buf = NULL; + + for (unsigned i=0; i<2000; i++) { + if (time_to_stop()) break; + + bool read=false; + + time_t seconds; + time( &seconds); + srand(seconds); + + // use rand instead ?? + double x = drand48(); + + //cout << "RANDOM NUMBER RETURN |" << x << "|" << endl; + + if ( x < 0.5) + { + //cout << "DECIDED TO READ " << x << endl; + buf = new char[rdsize]; + memset(buf, 1, rdsize); + read=true; + } + else + { + // cout << "DECIDED TO WRITE " << x << endl; + buf = new char[rdsize+100]; // 1 MB + memset(buf, 7, rdsize); + } + + //double y = drand48() ; + + //cout << "OFFSET is |" << offset << "| chunks |" << chunks<< endl; + + if ( read) + { + offset=(rand())%(chunks+1); + dout(2) << "reading block " << offset << "/" << chunks << endl; + + int r = client->read(fd, buf, rdsize, + offset*rdsize); + if (r < rdsize) { + dout(1) << "read_file got r = " << r << ", probably end of file" << endl; + } + } + else + { + dout(2) << "writing block " << offset << "/" << chunks << endl; + + // fill buf with a 16 byte fingerprint + // 64 bits : file offset + // 64 bits : client id + // = 128 bits (16 bytes) + + //if (true ) + //{ + //int count = rand()%10; + + //for ( int j=0;jget_nodeid(); + p++; + } + + client->write(fd, buf, rdsize, + offset*rdsize); + //} + //} + } + + // verify fingerprint + if ( read ) + { + int bad = 0; + __int64_t *p = (__int64_t*)buf; + __int64_t readoff, readclient; + while ((char*)p + 32 < buf + rdsize) { + readoff = *p; + __int64_t wantoff = offset*rdsize + (__int64_t)((char*)p - buf); + p++; + readclient = *p; + p++; + if (readoff != wantoff || + readclient != client->get_nodeid()) { + if (!bad) + dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient + << ", should be offset " << wantoff << " clietn " << client->get_nodeid() + << endl; + bad++; + } + } + if (bad) + dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl; + } + } + + client->close(fd); + delete[] buf; + + return 0; +} + + +//#include +//#include + +int normdist(int min, int max, int stdev) /* specifies input values */; +//main() +//{ + // for ( int i=0; i < 10; i++ ) + // normdist ( 0 , 10, 1 ); + +//} + + +int normdist(int min, int max, int stdev) /* specifies input values */ +{ +/* min: Minimum value; max: Maximum value; stdev: degree of deviation */ + +//int min, max, stdev; { + time_t seconds; + time( &seconds); + srand(seconds); + + int range, iterate, result; +/* declare range, iterate and result as integers, to avoid the need for +floating point math*/ + + result = 0; +/* ensure result is initialized to 0 */ + + range = max -min; +/* calculate range of possible values between the max and min values */ + + iterate = range / stdev; +/* this number of iterations ensures the proper shape of the resulting +curve */ + + stdev += 1; /* compensation for integer vs. floating point math */ + for (int c = iterate; c != 0; c--) /* loop through iterations */ + { + // result += (uniform (1, 100) * stdev) / 100; /* calculate and + result += ( (rand()%100 + 1) * stdev) / 100; + // printf("result=%d\n", result ); + } + printf("\n final result=%d\n", result ); + return result + min; /* send final result back */ +} +int SyntheticClient::read_random_ex(string& fn, int size, int rdsize) // size is in MB, wrsize in bytes +{ + __uint64_t chunks = (__uint64_t)size * (__uint64_t)(1024*1024) / (__uint64_t)rdsize; + + int fd = client->open(fn.c_str(), O_RDWR); + dout(5) << "reading from " << fn << " fd " << fd << endl; + + // cout << "READING FROM " << fn << " fd " << fd << endl; + + // cout << "filename " << fn << " size:" << size << " read size|" << rdsize << "|" << "\ chunks: |" << chunks <<"|" << endl; + + if (fd < 0) return fd; + int offset; + char * buf = NULL; + + for (unsigned i=0; i<2000; i++) { + if (time_to_stop()) break; + + bool read=false; + + time_t seconds; + time( &seconds); + srand(seconds); + + // use rand instead ?? + double x = drand48(); + + //cout << "RANDOM NUMBER RETURN |" << x << "|" << endl; + + if ( x < 0.5) + { + //cout << "DECIDED TO READ " << x << endl; + buf = new char[rdsize]; + memset(buf, 1, rdsize); + read=true; + } + else + { + // cout << "DECIDED TO WRITE " << x << endl; + buf = new char[rdsize+100]; // 1 MB + memset(buf, 7, rdsize); + } + + //double y = drand48() ; + + //cout << "OFFSET is |" << offset << "| chunks |" << chunks<< endl; + + if ( read) + { + //offset=(rand())%(chunks+1); + + /* if ( chunks > 10000 ) + offset= normdist( 0 , chunks/1000 , 5 )*1000; + else if ( chunks > 1000 ) + offset= normdist( 0 , chunks/100 , 5 )*100; + else if ( chunks > 100 ) + offset= normdist( 0 , chunks/20 , 5 )*20;*/ + + + dout(2) << "reading block " << offset << "/" << chunks << endl; + + int r = client->read(fd, buf, rdsize, + offset*rdsize); + if (r < rdsize) { + dout(1) << "read_file got r = " << r << ", probably end of file" << endl; + } + } + else + { + dout(2) << "writing block " << offset << "/" << chunks << endl; + + // fill buf with a 16 byte fingerprint + // 64 bits : file offset + // 64 bits : client id + // = 128 bits (16 bytes) + + //if (true ) + //{ + int count = rand()%10; + + for ( int j=0;jget_nodeid(); + p++; + } + + client->write(fd, buf, rdsize, + offset*rdsize); + } + //} + } + + // verify fingerprint + if ( read ) + { + int bad = 0; + __int64_t *p = (__int64_t*)buf; + __int64_t readoff, readclient; + while ((char*)p + 32 < buf + rdsize) { + readoff = *p; + __int64_t wantoff = offset*rdsize + (__int64_t)((char*)p - buf); + p++; + readclient = *p; + p++; + if (readoff != wantoff || + readclient != client->get_nodeid()) { + if (!bad) + dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient + << ", should be offset " << wantoff << " clietn " << client->get_nodeid() + << endl; + bad++; + } + } + if (bad) + dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl; + } + } + + client->close(fd); + delete[] buf; + + return 0; +} int SyntheticClient::random_walk(int num_req) diff --git a/branches/sage/pgs/client/SyntheticClient.h b/branches/sage/pgs/client/SyntheticClient.h index 59300ee893dc1..7646b5e4281c2 100644 --- a/branches/sage/pgs/client/SyntheticClient.h +++ b/branches/sage/pgs/client/SyntheticClient.h @@ -40,6 +40,9 @@ #define SYNCLIENT_MODE_READFILE 21 #define SYNCLIENT_MODE_WRITEBATCH 22 #define SYNCLIENT_MODE_WRSHARED 23 +#define SYNCLIENT_MODE_READSHARED 24 +#define SYNCLIENT_MODE_RDWRRANDOM 25 +#define SYNCLIENT_MODE_RDWRRANDOM_EX 26 #define SYNCLIENT_MODE_TRACE 30 @@ -193,6 +196,8 @@ class SyntheticClient { int write_file(string& fn, int mb, int chunk); int write_batch(int nfile, int mb, int chunk); int read_file(string& fn, int mb, int chunk); + int read_random(string& fn, int mb, int chunk); + int read_random_ex(string& fn, int mb, int chunk); int clean_dir(string& basedir); diff --git a/branches/sage/pgs/config.cc b/branches/sage/pgs/config.cc index 5267b6dd5ddec..9b2118c101a14 100644 --- a/branches/sage/pgs/config.cc +++ b/branches/sage/pgs/config.cc @@ -202,6 +202,11 @@ md_config_t g_conf = { // --- osd --- osd_rep: OSD_REP_PRIMARY, osd_balance_reads: false, + osd_immediate_read_from_cache: false, //osds to read from the cache immediately? + osd_exclusive_caching: false, + osd_load_diff_percent: 20, // load diff for read forwarding + osd_load_balance_scheme: 1, + osd_pg_bits: 0, // 0 == let osdmonitor decide osd_object_layout: OBJECT_LAYOUT_HASHINO, osd_pg_layout: PG_LAYOUT_CRUSH, @@ -714,6 +719,15 @@ void parse_config_options(std::vector& args) else if (strcmp(args[i], "--osd_balance_reads") == 0) g_conf.osd_balance_reads = atoi(args[++i]); + else if (strcmp(args[i], "--osd_load_diff_percent") == 0) + g_conf.osd_load_diff_percent = atoi(args[++i]); + else if (strcmp(args[i], "--osd_load_balance_scheme") == 0) + g_conf.osd_load_balance_scheme = atoi(args[++i]); + else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0) + g_conf.osd_immediate_read_from_cache = atoi(args[++i]); + else if ( strcmp(args[i],"--osd_exclusive_caching" ) == 0) + g_conf.osd_exclusive_caching = atoi(args[++i]); + else if (strcmp(args[i], "--osd_rep") == 0) g_conf.osd_rep = atoi(args[++i]); else if (strcmp(args[i], "--osd_rep_chain") == 0) diff --git a/branches/sage/pgs/config.h b/branches/sage/pgs/config.h index 3e6550979cc49..6e64f8c3f57d8 100644 --- a/branches/sage/pgs/config.h +++ b/branches/sage/pgs/config.h @@ -203,6 +203,10 @@ struct md_config_t { // osd int osd_rep; bool osd_balance_reads; + bool osd_immediate_read_from_cache; + bool osd_exclusive_caching; + int osd_load_diff_percent; + int osd_load_balance_scheme; int osd_pg_bits; int osd_object_layout; int osd_pg_layout; diff --git a/branches/sage/pgs/ebofs/Ebofs.cc b/branches/sage/pgs/ebofs/Ebofs.cc index 2008d1961bfae..c4070d71d76cd 100644 --- a/branches/sage/pgs/ebofs/Ebofs.cc +++ b/branches/sage/pgs/ebofs/Ebofs.cc @@ -1764,7 +1764,7 @@ bool Ebofs::attempt_read(Onode *on, off_t off, size_t len, bufferlist& bl, if (!rx.empty()) { BufferHead *wait_on = rx.begin()->second; Context *c = new C_Cond(will_wait_on, will_wait_on_bool); - dout(1) << "attempt_read waiting for read to finish on " << *wait_on << " c " << c << endl; + dout(20) << "attempt_read waiting for read to finish on " << *wait_on << " c " << c << endl; block_t b = MAX(wait_on->start(), bstart); wait_on->waitfor_read[b].push_back(c); return false; diff --git a/branches/sage/pgs/include/utime.h b/branches/sage/pgs/include/utime.h index 88083b13dbf90..8bbd844c95eaa 100644 --- a/branches/sage/pgs/include/utime.h +++ b/branches/sage/pgs/include/utime.h @@ -15,7 +15,7 @@ #define __UTIME_H #include - +#include // -------- // utime_t diff --git a/branches/sage/pgs/mds/mdstypes.h b/branches/sage/pgs/mds/mdstypes.h index 41b7f69e2e51b..26a2dff75e6bd 100644 --- a/branches/sage/pgs/mds/mdstypes.h +++ b/branches/sage/pgs/mds/mdstypes.h @@ -494,7 +494,7 @@ protected: // --------------------------------------------- // locking // noop unless overloaded. - virtual SimpleLock* get_lock(int type) { assert(0); } + virtual SimpleLock* get_lock(int type) { assert(0); return 0; } virtual void set_mlock_info(MLock *m) { assert(0); } virtual void encode_lock_state(int type, bufferlist& bl) { assert(0); } virtual void decode_lock_state(int type, bufferlist& bl) { assert(0); } diff --git a/branches/sage/pgs/messages/MOSDOp.h b/branches/sage/pgs/messages/MOSDOp.h index 033c0ce0bc1a1..50896df1fb1bc 100644 --- a/branches/sage/pgs/messages/MOSDOp.h +++ b/branches/sage/pgs/messages/MOSDOp.h @@ -104,6 +104,8 @@ private: bufferlist data; map attrset; + double request_received_time; + friend class MOSDOpReply; @@ -149,6 +151,13 @@ private: const bool wants_ack() { return st.want_ack; } const bool wants_commit() { return st.want_commit; } + void set_received_time(double time) { + request_received_time = time; + } + double get_received_time() { + return request_received_time; + } + void set_data(bufferlist &d) { data.claim(d); diff --git a/branches/sage/pgs/messages/MOSDPing.h b/branches/sage/pgs/messages/MOSDPing.h index fae80edd91cfc..dd03aa4954415 100644 --- a/branches/sage/pgs/messages/MOSDPing.h +++ b/branches/sage/pgs/messages/MOSDPing.h @@ -14,6 +14,8 @@ #ifndef __MOSDPING_H #define __MOSDPING_H +#include "common/Clock.h" + #include "msg/Message.h" @@ -22,10 +24,12 @@ class MOSDPing : public Message { epoch_t map_epoch; bool ack; float avg_qlen; + double read_mean_time; MOSDPing(epoch_t e, float aq, - bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq) { + double _read_mean_time, + bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq), read_mean_time(_read_mean_time) { } MOSDPing() {} @@ -37,11 +41,14 @@ class MOSDPing : public Message { off += sizeof(ack); payload.copy(off, sizeof(avg_qlen), (char*)&avg_qlen); off += sizeof(avg_qlen); + payload.copy(off, sizeof(read_mean_time), (char*)&read_mean_time); + off += sizeof(read_mean_time); } virtual void encode_payload() { payload.append((char*)&map_epoch, sizeof(map_epoch)); payload.append((char*)&ack, sizeof(ack)); payload.append((char*)&avg_qlen, sizeof(avg_qlen)); + payload.append((char*)&read_mean_time, sizeof(read_mean_time)); } virtual char *get_type_name() { return "oping"; } diff --git a/branches/sage/pgs/osd/OSD.cc b/branches/sage/pgs/osd/OSD.cc index edc25731410c9..d943ee7f9ebbb 100644 --- a/branches/sage/pgs/osd/OSD.cc +++ b/branches/sage/pgs/osd/OSD.cc @@ -77,6 +77,9 @@ char *osd_base_path = "./osddata"; char *ebofs_base_path = "./dev"; +const int LOAD_LATENCY =1; +const int LOAD_QUEUE_SIZE=2; +const int LOAD_HYBRID =3; object_t SUPERBLOCK_OBJECT(0,0); @@ -109,7 +112,9 @@ void OSD::force_remount() LogType osd_logtype; -OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : timer(osd_lock) +OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : + timer(osd_lock), + load_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq) { whoami = id; messenger = m; @@ -642,9 +647,12 @@ void OSD::heartbeat() float avg_qlen = 0; if (hb_stat_ops) avg_qlen = (float)hb_stat_qlen / (float)hb_stat_ops; + double read_mean_time = load_calc.get_average(); + dout(5) << "heartbeat " << now << ": ops " << hb_stat_ops << ", avg qlen " << avg_qlen + << ", mean read time " << read_mean_time << dendl; // reset until next time around @@ -671,8 +679,10 @@ void OSD::heartbeat() i != pingset.end(); i++) { _share_map_outgoing( osdmap->get_inst(*i) ); - messenger->send_message(new MOSDPing(osdmap->get_epoch(), avg_qlen), - osdmap->get_inst(*i)); + messenger->send_message(new MOSDPing(osdmap->get_epoch(), + avg_qlen, + read_mean_time ), + osdmap->get_inst(*i)); } if (logger) logger->set("pingset", pingset.size()); @@ -910,10 +920,12 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst) void OSD::handle_osd_ping(MOSDPing *m) { dout(20) << "osdping from " << m->get_source() << dendl; + _share_map_incoming(m->get_source_inst(), ((MOSDPing*)m)->map_epoch); int from = m->get_source().num(); peer_qlen[from] = m->avg_qlen; + peer_read_time[from] = m->read_mean_time; //if (!m->ack) //messenger->send_message(new MOSDPing(osdmap->get_epoch(), true), @@ -1937,9 +1949,12 @@ void OSD::handle_op(MOSDOp *op) const pg_t pgid = op->get_pg(); PG *pg = _have_pg(pgid) ? _lock_pg(pgid):0; - logger->set("buf", buffer_total_alloc); + // mark the read request received time for finding the + // read througput load. + op->set_received_time(g_clock.now()); + // update qlen stats hb_stat_ops++; hb_stat_qlen += pending_ops; @@ -1947,7 +1962,7 @@ void OSD::handle_op(MOSDOp *op) // require same or newer map if (!require_same_or_newer_map(op, op->get_map_epoch())) { - _unlock_pg(pgid); + if (pg) _unlock_pg(pgid); return; } @@ -1970,7 +1985,6 @@ void OSD::handle_op(MOSDOp *op) << pgid << ", waiting" << dendl; waiting_for_pg[pgid].push_back(op); - _unlock_pg(pgid); return; } @@ -2056,51 +2070,152 @@ void OSD::handle_op(MOSDOp *op) } */ + + dout(10) << "handle_op " << *op << " in " << *pg << endl; + + // if this is a read and the data is in the cache ,do an immediate read.. + if ( read && g_conf.osd_immediate_read_from_cache ) { + dout(10) << "trying to see whether data is in cache" << *op << endl; + if ( store->is_cached( op->get_oid() , + op->get_offset(), + op->get_length() ) == 0 ) + { + dout(10) << "data is in cache, reading from cache" << *op << endl; + pg->do_op(op); // do it now + _unlock_pg(pgid); + return; + } + } + dout(7) << "handle_op " << *op << " in " << *pg << dendl; - // balance reads? if (read && g_conf.osd_balance_reads && - pg->get_acker() == whoami) { - // test - if (false) { - if (pg->acting.size() > 1) { - int peer = pg->acting[1]; - dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl; - messenger->send_message(op, osdmap->get_inst(peer)); - _unlock_pg(pgid); - return; + pg->get_acker() == whoami) + { + // test + if (false) { + if (pg->acting.size() > 1) { + int peer = pg->acting[1]; + dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl; + messenger->send_message(op, osdmap->get_inst(peer)); + _unlock_pg(pgid); + return; + } } - } - - // am i above my average? - float my_avg = hb_stat_qlen / hb_stat_ops; - if (pending_ops > my_avg) { - // is there a peer who is below my average? - for (unsigned i=1; iacting.size(); ++i) { - int peer = pg->acting[i]; - if (peer_qlen.count(peer) && - peer_qlen[peer] < my_avg) { - // calculate a probability that we should redirect - float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb. + + + // check my load. + // TODO xxx we must also compare with our own load + // if i am x percentage higher than replica , + // redirect the read + + //if ( g_conf.osd_load_balance_scheme == LOAD_LATENCY) + if ( g_conf.osd_balance_reads == LOAD_LATENCY) + { + + double mean_read_time = load_calc.get_average(); - if (drand48() <= p) { - // take the first one - dout(-10) << "my qlen " << pending_ops << " > my_avg " << my_avg - << ", p=" << p - << ", fwd to peer w/ qlen " << peer_qlen[peer] - << " osd" << peer - << dendl; - messenger->send_message(op, osdmap->get_inst(peer)); - _unlock_pg(pgid); - return; + if ( mean_read_time != -1 ) + { + + for (unsigned i=1; + iacting.size(); + ++i) + { + int peer = pg->acting[i]; + + dout(10) << "my read time " << mean_read_time + << "peer_readtime" << peer_read_time[peer] + << " of peer" << peer << endl; + + if ( peer_read_time.count(peer) && + ( (peer_read_time[peer]*100/mean_read_time) < + ( 100 - g_conf.osd_load_diff_percent))) + { + dout(10) << " forwarding to peer osd" << peer << endl; + + messenger->send_message(op, osdmap->get_inst(peer)); + _unlock_pg(pgid); + return; + } + } + } + } + //else if ( g_conf.osd_load_balance_scheme == LOAD_QUEUE_SIZE ) + else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) + { + + + // am i above my average? + float my_avg = hb_stat_qlen / hb_stat_ops; + + if (pending_ops > my_avg) { + // is there a peer who is below my average? + for (unsigned i=1; iacting.size(); ++i) { + int peer = pg->acting[i]; + if (peer_qlen.count(peer) && + peer_qlen[peer] < my_avg) { + // calculate a probability that we should redirect + float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb. + + if (drand48() <= p) { + // take the first one + dout(10) << "my qlen " << pending_ops << " > my_avg " << my_avg + << ", p=" << p + << ", fwd to peer w/ qlen " << peer_qlen[peer] + << " osd" << peer + << dendl; + messenger->send_message(op, osdmap->get_inst(peer)); + _unlock_pg(pgid); + return; + } + } + } } + } - } + //else if ( g_conf.osd_load_balance_scheme == LOAD_HYBRID ) + else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) + { + + // am i above my average? + float my_avg = hb_stat_qlen / hb_stat_ops; + + if (pending_ops > my_avg) { + // is there a peer who is below my average? + for (unsigned i=1; iacting.size(); ++i) { + int peer = pg->acting[i]; + if (peer_qlen.count(peer) && + peer_qlen[peer] < my_avg) { + // calculate a probability that we should redirect + //float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb. + + double mean_read_time = load_calc.get_average(); + + if ( mean_read_time != -1 && + peer_read_time.count(peer) && + ( (peer_read_time[peer]*100/mean_read_time) < + ( 100 - g_conf.osd_load_diff_percent) ) ) + //if (drand48() <= p) { + // take the first one + dout(10) << "using hybrid :my qlen " << pending_ops << " > my_avg " << my_avg + << "my read time "<< mean_read_time + << "peer read time " << peer_read_time[peer] + << ", fwd to peer w/ qlen " << peer_qlen[peer] + << " osd" << peer + << endl; + messenger->send_message(op, osdmap->get_inst(peer)); + _unlock_pg(pgid); + return; + //} + } + } + } + } + } - } - } else { // REPLICATION OP (it's from another OSD) @@ -2125,7 +2240,24 @@ void OSD::handle_op(MOSDOp *op) assert(pg->get_role() >= 0); dout(7) << "handle_rep_op " << op << " in " << *pg << dendl; + + // a redirected read...handle this differently .. + // if the data is in cache ( a rare case? ), return the data immediately + if ( read && g_conf.osd_immediate_read_from_cache ) + { + dout(10) << "redirected read, trying to see whether data is in cache " << *op << endl; + if ( store->is_cached( op->get_oid() , + op->get_offset(), + op->get_length()) == 0 ) + { + dout(10) << "redirected read, data is in cache, reading from cache " << *op << endl; + pg->do_op(op); // do it now + _unlock_pg(pgid); + return; + } + } } + if (g_conf.osd_maxthreads < 1) { @@ -2139,15 +2271,11 @@ void OSD::handle_op(MOSDOp *op) _unlock_pg(pgid); } else { _unlock_pg(pgid); - // queue for worker threads - /*if (read) - enqueue_op(0, op); // no locking needed for reads - else - */ - enqueue_op(pgid, op); + enqueue_op(pgid, op); // queue for worker threads } } + void OSD::handle_op_reply(MOSDOpReply *op) { if (op->get_map_epoch() < boot_epoch) { diff --git a/branches/sage/pgs/osd/OSD.h b/branches/sage/pgs/osd/OSD.h index f1d560d368e48..3f0277ee089bd 100644 --- a/branches/sage/pgs/osd/OSD.h +++ b/branches/sage/pgs/osd/OSD.h @@ -48,6 +48,37 @@ public: static const int STATE_STOPPING = 3; + // load calculation + //current implementation is moving averges. + class LoadCalculator { + private: + deque m_Data ; + unsigned m_Size ; + double m_Total ; + + public: + LoadCalculator( unsigned size ) : m_Size(0), m_Total(0) { } + + void add( double element ) { + // add item + m_Data.push_back(element); + m_Total += element; + + // trim + while (m_Data.size() > m_Size) { + m_Total -= m_Data.front(); + m_Data.pop_front(); + } + } + + double get_average() { + if (m_Data.empty()) + return -1; + return m_Total / (double)m_Data.size(); + } + }; + + /** OSD **/ protected: Mutex osd_lock; // global lock @@ -57,6 +88,7 @@ protected: Logger *logger; ObjectStore *store; MonMap *monmap; + LoadCalculator load_calc; int whoami; char dev_path[100]; @@ -105,6 +137,7 @@ private: int hb_stat_qlen; // cumulative queue length since last hb hash_map peer_qlen; + hash_map peer_read_time; // -- waiters -- diff --git a/branches/sage/pgs/osd/ObjectStore.h b/branches/sage/pgs/osd/ObjectStore.h index 9ff94adfcae99..89e672a10a10a 100644 --- a/branches/sage/pgs/osd/ObjectStore.h +++ b/branches/sage/pgs/osd/ObjectStore.h @@ -449,6 +449,9 @@ public: Context *onsafe) = 0;//{ return -1; } virtual void trim_from_cache(object_t oid, off_t offset, size_t len) { } + virtual int is_cached(object_t oid, + off_t offset, + size_t len) { return -1; } virtual int setattr(object_t oid, const char *name, const void *value, size_t size, diff --git a/branches/sage/pgs/osd/ReplicatedPG.cc b/branches/sage/pgs/osd/ReplicatedPG.cc index b8de7e20afd74..61900fe30d4d0 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.cc +++ b/branches/sage/pgs/osd/ReplicatedPG.cc @@ -210,6 +210,12 @@ int ReplicatedPG::op_read(MOSDOp *op) reply->set_result(0); reply->set_data(bl); reply->set_length(r); + + dout(10) << "READ TIME DIFF" + << (double)g_clock.now()-op->get_received_time() + << endl; + osd->load_calc.add((double)g_clock.now() - op->get_received_time()); + } else { reply->set_result(r); // error reply->set_length(0); @@ -891,6 +897,7 @@ void ReplicatedPG::op_modify(MOSDOp *op) put_rep_gather(repop); } else { + // not acker. // chain or splay. apply. ObjectStore::Transaction t; prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru); @@ -905,6 +912,10 @@ void ReplicatedPG::op_modify(MOSDOp *op) assert(r == 0); } + // lets evict the data from our cache to maintain a total large cache size + if (g_conf.osd_exclusive_caching) + osd->store->trim_from_cache(op->get_oid() , op->get_offset(), op->get_length()); + oncommit->ack(); } -- 2.39.5