]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* merged suresh's read balancing changes
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 22 May 2007 17:55:16 +0000 (17:55 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 22 May 2007 17:55:16 +0000 (17:55 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1363 29311d96-e01e-0410-9327-a35deaab8ce9

15 files changed:
branches/sage/pgs/Makefile
branches/sage/pgs/client/Client.cc
branches/sage/pgs/client/SyntheticClient.cc
branches/sage/pgs/client/SyntheticClient.h
branches/sage/pgs/config.cc
branches/sage/pgs/config.h
branches/sage/pgs/ebofs/Ebofs.cc
branches/sage/pgs/include/utime.h
branches/sage/pgs/mds/mdstypes.h
branches/sage/pgs/messages/MOSDOp.h
branches/sage/pgs/messages/MOSDPing.h
branches/sage/pgs/osd/OSD.cc
branches/sage/pgs/osd/OSD.h
branches/sage/pgs/osd/ObjectStore.h
branches/sage/pgs/osd/ReplicatedPG.cc

index 07657214a59847c27f4847edbf32718587994098..97246c8d618d991f0521658ee4b0b350ed705743 100644 (file)
@@ -12,7 +12,7 @@ CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE -DDARWI
 LDINC = ar -rc
 else
 # For linux
-CFLAGS = -g -fPIC -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE 
+CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE 
 LDINC = ld -i -o
 endif
 
index fc1546ff9ca0779711331d0704ddb3498e859c74..a6621b7731c6387127fe5355032e682de6450f1c 100644 (file)
@@ -2526,7 +2526,7 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
   if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
     // we're doing buffered i/o.  make sure we're inside the file.
     // we can trust size info bc we get accurate info when buffering/caching caps are issued.
-    dout(-10) << "file size: " << in->inode.size << endl;
+    dout(10) << "file size: " << in->inode.size << endl;
     if (offset > 0 && offset >= in->inode.size) {
       client_lock.Unlock();
       return 0;
index d6adf65cbdcf7bbab63a9efc55bbd95fa14c6cc2..7e7f95f71b1f39f1997e44797cf8cec1b1ac05da 100644 (file)
@@ -68,6 +68,18 @@ void parse_syn_options(vector<char*>& args)
         syn_modes.push_back( SYNCLIENT_MODE_READFILE );
         syn_iargs.push_back( atoi(args[++i]) );
         syn_iargs.push_back( atoi(args[++i]) );
+      } else if (strcmp(args[i],"readwriterandom") == 0) {
+        syn_modes.push_back( SYNCLIENT_MODE_RDWRRANDOM );
+        syn_iargs.push_back( atoi(args[++i]) );
+        syn_iargs.push_back( atoi(args[++i]) );
+      } else if (strcmp(args[i],"readwriterandom_ex") == 0) {
+        syn_modes.push_back( SYNCLIENT_MODE_RDWRRANDOM_EX );
+        syn_iargs.push_back( atoi(args[++i]) );
+        syn_iargs.push_back( atoi(args[++i]) );
+      } else if (strcmp(args[i],"readshared") == 0) {
+        syn_modes.push_back( SYNCLIENT_MODE_READSHARED );
+        syn_iargs.push_back( atoi(args[++i]) );
+        syn_iargs.push_back( atoi(args[++i]) );
       } else if (strcmp(args[i],"rw") == 0) {
         int a = atoi(args[++i]);
         int b = atoi(args[++i]);
@@ -439,6 +451,7 @@ int SyntheticClient::run()
         string sarg1 = get_sarg(0);
         int iarg1 = iargs.front();  iargs.pop_front();
         int iarg2 = iargs.front();  iargs.pop_front();
+        cout << "WRITING SYN CLIENT" << endl;
         if (run_me())
           write_file(sarg1, iarg1, iarg2);
       }
@@ -452,6 +465,15 @@ int SyntheticClient::run()
           write_file(sarg1, iarg1, iarg2);
       }
       break;
+    case SYNCLIENT_MODE_READSHARED:
+      {
+        string sarg1 = "shared";
+        int iarg1 = iargs.front();  iargs.pop_front();
+        int iarg2 = iargs.front();  iargs.pop_front();
+        if (run_me())
+          read_file(sarg1, iarg1, iarg2);
+      }
+      break;
     case SYNCLIENT_MODE_WRITEBATCH:
       {
           int iarg1 = iargs.front(); iargs.pop_front();
@@ -468,11 +490,36 @@ int SyntheticClient::run()
         string sarg1 = get_sarg(0);
         int iarg1 = iargs.front();  iargs.pop_front();
         int iarg2 = iargs.front();  iargs.pop_front();
+
+        cout << "READING SYN CLIENT" << endl;
         if (run_me())
           read_file(sarg1, iarg1, iarg2);
       }
       break;
 
+    case SYNCLIENT_MODE_RDWRRANDOM:
+      {
+        string sarg1 = get_sarg(0);
+        int iarg1 = iargs.front();  iargs.pop_front();
+        int iarg2 = iargs.front();  iargs.pop_front();
+
+        cout << "RANDOM READ WRITE SYN CLIENT" << endl;
+        if (run_me())
+          read_random(sarg1, iarg1, iarg2);
+      }
+      break;
+
+    case SYNCLIENT_MODE_RDWRRANDOM_EX:
+      {
+        string sarg1 = get_sarg(0);
+        int iarg1 = iargs.front();  iargs.pop_front();
+        int iarg2 = iargs.front();  iargs.pop_front();
+
+        cout << "RANDOM READ WRITE SYN CLIENT" << endl;
+        if (run_me())
+          read_random_ex(sarg1, iarg1, iarg2);
+      }
+      break;
     case SYNCLIENT_MODE_TRACE:
       {
         string tfile = get_sarg(0);
@@ -1098,7 +1145,7 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize)   // size is in
       dout(1) << "read_file got r = " << r << ", probably end of file" << endl;
       break;
     }
-
     // verify fingerprint
     int bad = 0;
     __int64_t *p = (__int64_t*)buf;
@@ -1128,6 +1175,302 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize)   // size is in
   return 0;
 }
 
+int SyntheticClient::read_random(string& fn, int size, int rdsize)   // size is in MB, wrsize in bytes
+{
+  __uint64_t chunks = (__uint64_t)size * (__uint64_t)(1024*1024) / (__uint64_t)rdsize;
+
+  int fd = client->open(fn.c_str(), O_RDWR);
+  dout(5) << "reading from " << fn << " fd " << fd << endl;
+   
+ // cout << "READING FROM  " << fn << " fd " << fd << endl;
+
+ // cout << "filename " << fn << " size:" << size  << " read size|" << rdsize << "|" <<  "\ chunks: |" << chunks <<"|" <<  endl;
+
+  if (fd < 0) return fd;
+  int offset;
+  char * buf = NULL;
+
+  for (unsigned i=0; i<2000; i++) {
+    if (time_to_stop()) break;
+
+    bool read=false;
+
+    time_t seconds;
+    time( &seconds);
+    srand(seconds);
+
+    // use rand instead ??
+    double x = drand48();
+
+    //cout << "RANDOM NUMBER RETURN |" << x << "|" << endl;
+
+    if ( x < 0.5) 
+    {
+        //cout << "DECIDED TO READ " << x << endl;
+        buf = new char[rdsize]; 
+        memset(buf, 1, rdsize);
+        read=true;
+    }
+    else
+    {
+       // cout << "DECIDED TO WRITE " << x << endl;
+        buf = new char[rdsize+100];   // 1 MB
+        memset(buf, 7, rdsize);
+    }
+
+    //double  y  = drand48() ;
+
+    //cout << "OFFSET is |" << offset << "| chunks |" << chunks<<  endl;
+    
+    if ( read)
+    {
+        offset=(rand())%(chunks+1);
+        dout(2) << "reading block " << offset << "/" << chunks << endl;
+
+        int r = client->read(fd, buf, rdsize,
+                        offset*rdsize);
+        if (r < rdsize) {
+                  dout(1) << "read_file got r = " << r << ", probably end of file" << endl;
+    }
+    }
+    else
+    {
+        dout(2) << "writing block " << offset << "/" << chunks << endl;
+
+    // fill buf with a 16 byte fingerprint
+    // 64 bits : file offset
+    // 64 bits : client id
+    // = 128 bits (16 bytes)
+
+      //if (true )
+      //{
+      //int count = rand()%10;
+
+      //for ( int j=0;j<count; j++ )
+      //{
+
+      offset=(rand())%(chunks+1);
+    __uint64_t *p = (__uint64_t*)buf;
+    while ((char*)p < buf + rdsize) {
+      *p = offset*rdsize + (char*)p - buf;      
+      p++;
+      *p = client->get_nodeid();
+      p++;
+    }
+
+      client->write(fd, buf, rdsize,
+                        offset*rdsize);
+      //}
+      //}
+    }
+
+    // verify fingerprint
+    if ( read )
+    {
+    int bad = 0;
+    __int64_t *p = (__int64_t*)buf;
+    __int64_t readoff, readclient;
+    while ((char*)p + 32 < buf + rdsize) {
+      readoff = *p;
+      __int64_t wantoff = offset*rdsize + (__int64_t)((char*)p - buf);
+      p++;
+      readclient = *p;
+      p++;
+      if (readoff != wantoff ||
+         readclient != client->get_nodeid()) {
+        if (!bad)
+          dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
+                 << ", should be offset " << wantoff << " clietn " << client->get_nodeid()
+                 << endl;
+        bad++;
+      }
+    }
+    if (bad) 
+      dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl;
+  }
+  }
+  
+  client->close(fd);
+  delete[] buf;
+
+  return 0;
+}
+
+
+//#include<stdio.h>
+//#include<stdlib.h>
+
+int normdist(int min, int max, int stdev) /* specifies input values */;
+//main()
+//{
+ // for ( int i=0; i < 10; i++ )
+ //  normdist ( 0 , 10, 1 );
+   
+//}
+
+
+int normdist(int min, int max, int stdev) /* specifies input values */
+{
+/* min: Minimum value; max: Maximum value; stdev: degree of deviation */
+//int min, max, stdev; {
+    time_t seconds;
+    time( &seconds);
+    srand(seconds);
+    int range, iterate, result;
+/* declare range, iterate and result as integers, to avoid the need for
+floating point math*/
+    result = 0;
+/* ensure result is initialized to 0 */
+    range = max -min;
+/* calculate range of possible values between the max and min values */
+    iterate = range / stdev;
+/* this number of iterations ensures the proper shape of the resulting
+curve */
+    stdev += 1; /* compensation for integer vs. floating point math */
+    for (int c = iterate; c != 0; c--) /* loop through iterations */
+    {
+      //  result += (uniform (1, 100) * stdev) / 100; /* calculate and
+        result += ( (rand()%100 + 1)  * stdev) / 100;
+       // printf("result=%d\n", result );
+    }
+        printf("\n final result=%d\n", result );
+    return result + min; /* send final result back */
+}
+int SyntheticClient::read_random_ex(string& fn, int size, int rdsize)   // size is in MB, wrsize in bytes
+{
+  __uint64_t chunks = (__uint64_t)size * (__uint64_t)(1024*1024) / (__uint64_t)rdsize;
+
+  int fd = client->open(fn.c_str(), O_RDWR);
+  dout(5) << "reading from " << fn << " fd " << fd << endl;
+   
+ // cout << "READING FROM  " << fn << " fd " << fd << endl;
+
+ // cout << "filename " << fn << " size:" << size  << " read size|" << rdsize << "|" <<  "\ chunks: |" << chunks <<"|" <<  endl;
+
+  if (fd < 0) return fd;
+  int offset;
+  char * buf = NULL;
+
+  for (unsigned i=0; i<2000; i++) {
+    if (time_to_stop()) break;
+
+    bool read=false;
+
+    time_t seconds;
+    time( &seconds);
+    srand(seconds);
+
+    // use rand instead ??
+    double x = drand48();
+
+    //cout << "RANDOM NUMBER RETURN |" << x << "|" << endl;
+
+    if ( x < 0.5) 
+    {
+        //cout << "DECIDED TO READ " << x << endl;
+        buf = new char[rdsize]; 
+        memset(buf, 1, rdsize);
+        read=true;
+    }
+    else
+    {
+       // cout << "DECIDED TO WRITE " << x << endl;
+        buf = new char[rdsize+100];   // 1 MB
+        memset(buf, 7, rdsize);
+    }
+
+    //double  y  = drand48() ;
+
+    //cout << "OFFSET is |" << offset << "| chunks |" << chunks<<  endl;
+    
+    if ( read)
+    {
+        //offset=(rand())%(chunks+1);
+
+    /*    if ( chunks > 10000 ) 
+        offset= normdist( 0 , chunks/1000 , 5  )*1000;
+        else if ( chunks > 1000 )
+                offset= normdist( 0 , chunks/100 , 5  )*100;
+        else if ( chunks > 100 )
+                offset= normdist( 0 , chunks/20 , 5  )*20;*/
+
+
+        dout(2) << "reading block " << offset << "/" << chunks << endl;
+
+        int r = client->read(fd, buf, rdsize,
+                        offset*rdsize);
+        if (r < rdsize) {
+                  dout(1) << "read_file got r = " << r << ", probably end of file" << endl;
+    }
+    }
+    else
+    {
+        dout(2) << "writing block " << offset << "/" << chunks << endl;
+
+    // fill buf with a 16 byte fingerprint
+    // 64 bits : file offset
+    // 64 bits : client id
+    // = 128 bits (16 bytes)
+
+      //if (true )
+      //{
+      int count = rand()%10;
+
+      for ( int j=0;j<count; j++ )
+      {
+
+      offset=(rand())%(chunks+1);
+    __uint64_t *p = (__uint64_t*)buf;
+    while ((char*)p < buf + rdsize) {
+      *p = offset*rdsize + (char*)p - buf;      
+      p++;
+      *p = client->get_nodeid();
+      p++;
+    }
+
+      client->write(fd, buf, rdsize,
+                        offset*rdsize);
+      }
+      //}
+    }
+
+    // verify fingerprint
+    if ( read )
+    {
+    int bad = 0;
+    __int64_t *p = (__int64_t*)buf;
+    __int64_t readoff, readclient;
+    while ((char*)p + 32 < buf + rdsize) {
+      readoff = *p;
+      __int64_t wantoff = offset*rdsize + (__int64_t)((char*)p - buf);
+      p++;
+      readclient = *p;
+      p++;
+      if (readoff != wantoff ||
+         readclient != client->get_nodeid()) {
+        if (!bad)
+          dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
+                 << ", should be offset " << wantoff << " clietn " << client->get_nodeid()
+                 << endl;
+        bad++;
+      }
+    }
+    if (bad) 
+      dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl;
+  }
+  }
+  
+  client->close(fd);
+  delete[] buf;
+
+  return 0;
+}
 
 
 int SyntheticClient::random_walk(int num_req)
index 59300ee893dc16d750fbb456fe36ab49f517c568..7646b5e4281c2e4164a047331bc00765b63cc4b4 100644 (file)
@@ -40,6 +40,9 @@
 #define SYNCLIENT_MODE_READFILE    21
 #define SYNCLIENT_MODE_WRITEBATCH  22
 #define SYNCLIENT_MODE_WRSHARED    23
+#define SYNCLIENT_MODE_READSHARED    24
+#define SYNCLIENT_MODE_RDWRRANDOM    25
+#define SYNCLIENT_MODE_RDWRRANDOM_EX    26
 
 #define SYNCLIENT_MODE_TRACE       30
 
@@ -193,6 +196,8 @@ class SyntheticClient {
   int write_file(string& fn, int mb, int chunk);
   int write_batch(int nfile, int mb, int chunk);
   int read_file(string& fn, int mb, int chunk);
+  int read_random(string& fn, int mb, int chunk);
+  int read_random_ex(string& fn, int mb, int chunk);
 
   int clean_dir(string& basedir);
 
index 5267b6dd5ddec5e9c276e9d7afb509609203a0c3..9b2118c101a14b0fd51323ed6a563a83a8bb67ac 100644 (file)
@@ -202,6 +202,11 @@ md_config_t g_conf = {
   // --- osd ---
   osd_rep: OSD_REP_PRIMARY,
   osd_balance_reads: false,
+  osd_immediate_read_from_cache: false,     //osds to read from the cache immediately?
+  osd_exclusive_caching: false,
+  osd_load_diff_percent: 20, // load diff for read forwarding
+  osd_load_balance_scheme: 1,
+
   osd_pg_bits: 0,  // 0 == let osdmonitor decide
   osd_object_layout: OBJECT_LAYOUT_HASHINO,
   osd_pg_layout: PG_LAYOUT_CRUSH,
@@ -714,6 +719,15 @@ void parse_config_options(std::vector<char*>& args)
 
     else if (strcmp(args[i], "--osd_balance_reads") == 0) 
       g_conf.osd_balance_reads = atoi(args[++i]);
+    else if (strcmp(args[i], "--osd_load_diff_percent") == 0) 
+      g_conf.osd_load_diff_percent = atoi(args[++i]);
+    else if (strcmp(args[i], "--osd_load_balance_scheme") == 0) 
+      g_conf.osd_load_balance_scheme = atoi(args[++i]);
+    else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0)
+      g_conf.osd_immediate_read_from_cache = atoi(args[++i]);
+    else if ( strcmp(args[i],"--osd_exclusive_caching" ) == 0)
+      g_conf.osd_exclusive_caching = atoi(args[++i]);
+
     else if (strcmp(args[i], "--osd_rep") == 0) 
       g_conf.osd_rep = atoi(args[++i]);
     else if (strcmp(args[i], "--osd_rep_chain") == 0) 
index 3e6550979cc491505b91231a62fe5d089720929f..6e64f8c3f57d89c94aad67ac2050ea90918c0f0c 100644 (file)
@@ -203,6 +203,10 @@ struct md_config_t {
   // osd
   int   osd_rep;
   bool  osd_balance_reads;
+  bool  osd_immediate_read_from_cache;
+  bool  osd_exclusive_caching;
+  int  osd_load_diff_percent;
+  int osd_load_balance_scheme;
   int   osd_pg_bits;
   int   osd_object_layout;
   int   osd_pg_layout;
index 2008d1961bfaed5cceb6b5335d3b91f3f5d07764..c4070d71d76cdd1ffd51dc85eafdfde599208335 100644 (file)
@@ -1764,7 +1764,7 @@ bool Ebofs::attempt_read(Onode *on, off_t off, size_t len, bufferlist& bl,
   if (!rx.empty()) {
     BufferHead *wait_on = rx.begin()->second;
     Context *c = new C_Cond(will_wait_on, will_wait_on_bool);
-    dout(1) << "attempt_read waiting for read to finish on " << *wait_on << " c " << c << endl;
+    dout(20) << "attempt_read waiting for read to finish on " << *wait_on << " c " << c << endl;
     block_t b = MAX(wait_on->start(), bstart);
     wait_on->waitfor_read[b].push_back(c);
     return false;
index 88083b13dbf90c12e57cabf8ca516a84b93e353a..8bbd844c95eaac8f68b7e0b4388525aee8965581 100644 (file)
@@ -15,7 +15,7 @@
 #define __UTIME_H
 
 #include <math.h>
-
+#include <sys/time.h>
 
 // --------
 // utime_t
index 41b7f69e2e51bf641754c7517ac7acc485b2cd50..26a2dff75e6bd806c92e0cb4fbbc261300052ddd 100644 (file)
@@ -494,7 +494,7 @@ protected:
   // ---------------------------------------------
   // locking
   // noop unless overloaded.
-  virtual SimpleLock* get_lock(int type) { assert(0); }
+  virtual SimpleLock* get_lock(int type) { assert(0); return 0; }
   virtual void set_mlock_info(MLock *m) { assert(0); }
   virtual void encode_lock_state(int type, bufferlist& bl) { assert(0); }
   virtual void decode_lock_state(int type, bufferlist& bl) { assert(0); }
index 033c0ce0bc1a138a1eb103425a79439c23683524..50896df1fb1bcaad27f64dda31654ac17b166b91 100644 (file)
@@ -104,6 +104,8 @@ private:
 
   bufferlist data;
   map<string,bufferptr> attrset;
+  double request_received_time;
+  
 
   friend class MOSDOpReply;
 
@@ -149,6 +151,13 @@ private:
   const bool wants_ack() { return st.want_ack; }
   const bool wants_commit() { return st.want_commit; }
 
+  void set_received_time(double time) {
+    request_received_time = time;
+  }
+  double get_received_time() {
+    return request_received_time;
+  }
+
   
   void set_data(bufferlist &d) {
     data.claim(d);
index fae80edd91cfcfec83b9ee3fabd429e45f49514a..dd03aa495441542d063b23cafc810c67bd859330 100644 (file)
@@ -14,6 +14,8 @@
 #ifndef __MOSDPING_H
 #define __MOSDPING_H
 
+#include "common/Clock.h"
+
 #include "msg/Message.h"
 
 
@@ -22,10 +24,12 @@ class MOSDPing : public Message {
   epoch_t map_epoch;
   bool ack;
   float avg_qlen;
+  double read_mean_time;
 
   MOSDPing(epoch_t e, 
           float aq,
-          bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq) {
+          double _read_mean_time,
+          bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq), read_mean_time(_read_mean_time) {
   }
   MOSDPing() {}
 
@@ -37,11 +41,14 @@ class MOSDPing : public Message {
     off += sizeof(ack);
     payload.copy(off, sizeof(avg_qlen), (char*)&avg_qlen);
     off += sizeof(avg_qlen);
+    payload.copy(off, sizeof(read_mean_time), (char*)&read_mean_time);
+    off += sizeof(read_mean_time);
   }
   virtual void encode_payload() {
     payload.append((char*)&map_epoch, sizeof(map_epoch));
     payload.append((char*)&ack, sizeof(ack));
     payload.append((char*)&avg_qlen, sizeof(avg_qlen));
+    payload.append((char*)&read_mean_time, sizeof(read_mean_time));
   }
 
   virtual char *get_type_name() { return "oping"; }
index edc25731410c9a0a25883cd31e9f31c3e8b54a6e..d943ee7f9ebbb5c768c57fc3165b0ce07b1dacd1 100644 (file)
@@ -77,6 +77,9 @@
 char *osd_base_path = "./osddata";
 char *ebofs_base_path = "./dev";
 
+const int LOAD_LATENCY =1;
+const int LOAD_QUEUE_SIZE=2;
+const int LOAD_HYBRID =3;
 
 object_t SUPERBLOCK_OBJECT(0,0);
 
@@ -109,7 +112,9 @@ void OSD::force_remount()
 
 LogType osd_logtype;
 
-OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : timer(osd_lock)
+OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : 
+  timer(osd_lock),
+  load_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq)
 {
   whoami = id;
   messenger = m;
@@ -642,9 +647,12 @@ void OSD::heartbeat()
   float avg_qlen = 0;
   if (hb_stat_ops) avg_qlen = (float)hb_stat_qlen / (float)hb_stat_ops;
 
+  double read_mean_time = load_calc.get_average();
+
   dout(5) << "heartbeat " << now 
          << ": ops " << hb_stat_ops
          << ", avg qlen " << avg_qlen
+         << ", mean read time " << read_mean_time
          << dendl;
   
   // reset until next time around
@@ -671,8 +679,10 @@ void OSD::heartbeat()
        i != pingset.end();
        i++) {
     _share_map_outgoing( osdmap->get_inst(*i) );
-    messenger->send_message(new MOSDPing(osdmap->get_epoch(), avg_qlen), 
-                            osdmap->get_inst(*i));
+    messenger->send_message(new MOSDPing(osdmap->get_epoch(), 
+                               avg_qlen, 
+                               read_mean_time ), 
+                               osdmap->get_inst(*i));
   }
 
   if (logger) logger->set("pingset", pingset.size());
@@ -910,10 +920,12 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
 void OSD::handle_osd_ping(MOSDPing *m)
 {
   dout(20) << "osdping from " << m->get_source() << dendl;
+
   _share_map_incoming(m->get_source_inst(), ((MOSDPing*)m)->map_epoch);
   
   int from = m->get_source().num();
   peer_qlen[from] = m->avg_qlen;
+  peer_read_time[from] = m->read_mean_time;
 
   //if (!m->ack)
   //messenger->send_message(new MOSDPing(osdmap->get_epoch(), true),
@@ -1937,9 +1949,12 @@ void OSD::handle_op(MOSDOp *op)
   const pg_t pgid = op->get_pg();
   PG *pg = _have_pg(pgid) ? _lock_pg(pgid):0;
 
-
   logger->set("buf", buffer_total_alloc);
 
+  // mark the read request received time for finding the 
+  // read througput load.  
+  op->set_received_time(g_clock.now());
+
   // update qlen stats
   hb_stat_ops++;
   hb_stat_qlen += pending_ops;
@@ -1947,7 +1962,7 @@ void OSD::handle_op(MOSDOp *op)
 
   // require same or newer map
   if (!require_same_or_newer_map(op, op->get_map_epoch())) {
-    _unlock_pg(pgid);
+    if (pg) _unlock_pg(pgid);
     return;
   }
 
@@ -1970,7 +1985,6 @@ void OSD::handle_op(MOSDOp *op)
               << pgid 
               << ", waiting" << dendl;
       waiting_for_pg[pgid].push_back(op);
-      _unlock_pg(pgid);
       return;
     }
 
@@ -2056,51 +2070,152 @@ void OSD::handle_op(MOSDOp *op)
     }
     */
 
+
+    dout(10) << "handle_op " << *op << " in " << *pg << endl;
+
+    // if this is a read and the data is in the cache ,do an immediate read.. 
+    if ( read && g_conf.osd_immediate_read_from_cache ) {
+      dout(10) << "trying to see whether data is in cache" << *op <<  endl;
+      if ( store->is_cached( op->get_oid() , 
+                            op->get_offset(), 
+                            op->get_length() ) == 0  )
+       { 
+         dout(10) << "data is in cache, reading from cache" << *op <<  endl;
+         pg->do_op(op); // do it now
+         _unlock_pg(pgid);
+         return;
+        }
+    }
+
     dout(7) << "handle_op " << *op << " in " << *pg << dendl;
     
-    
     // balance reads?
     if (read &&
        g_conf.osd_balance_reads &&
-       pg->get_acker() == whoami) {
-      // test
-      if (false) {
-       if (pg->acting.size() > 1) {
-         int peer = pg->acting[1];
-         dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl;
-         messenger->send_message(op, osdmap->get_inst(peer));
-         _unlock_pg(pgid);
-         return;
+       pg->get_acker() == whoami) 
+      {
+       // test
+       if (false) {
+         if (pg->acting.size() > 1) {
+           int peer = pg->acting[1];
+           dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl;
+           messenger->send_message(op, osdmap->get_inst(peer));
+           _unlock_pg(pgid);
+           return;
+         }
        }
-      }
-      
-      // am i above my average?
-      float my_avg = hb_stat_qlen / hb_stat_ops;
-      if (pending_ops > my_avg) {
-       // is there a peer who is below my average?
-       for (unsigned i=1; i<pg->acting.size(); ++i) {
-         int peer = pg->acting[i];
-         if (peer_qlen.count(peer) &&
-             peer_qlen[peer] < my_avg) {
-           // calculate a probability that we should redirect
-           float p = (my_avg - peer_qlen[peer]) / my_avg;             // this is dumb.
+       
+
+       // check my load. 
+       // TODO xxx we must also compare with our own load
+       // if i am x percentage higher than replica , 
+       // redirect the read 
+       
+       //if ( g_conf.osd_load_balance_scheme == LOAD_LATENCY)
+       if ( g_conf.osd_balance_reads == LOAD_LATENCY)
+         {
+           
+           double mean_read_time = load_calc.get_average();
            
-           if (drand48() <= p) {
-             // take the first one
-             dout(-10) << "my qlen " << pending_ops << " > my_avg " << my_avg
-                       << ", p=" << p 
-                       << ", fwd to peer w/ qlen " << peer_qlen[peer]
-                       << " osd" << peer
-                       << dendl;
-             messenger->send_message(op, osdmap->get_inst(peer));
-             _unlock_pg(pgid);
-             return;
+           if ( mean_read_time != -1 )
+             {
+               
+               for (unsigned i=1; 
+                    i<pg->acting.size(); 
+                    ++i) 
+                 {
+                   int peer = pg->acting[i];
+                   
+                   dout(10) << "my read time " << mean_read_time 
+                            << "peer_readtime" << peer_read_time[peer] 
+                            << " of peer" << peer << endl;
+                   
+                   if ( peer_read_time.count(peer) &&
+                        ( (peer_read_time[peer]*100/mean_read_time) <
+                          ( 100 - g_conf.osd_load_diff_percent)))
+                     {
+                       dout(10) << " forwarding to peer osd" << peer << endl;
+                       
+                       messenger->send_message(op, osdmap->get_inst(peer));
+                       _unlock_pg(pgid);
+                       return;
+                     }
+                 } 
+             }
+         }
+       //else if ( g_conf.osd_load_balance_scheme == LOAD_QUEUE_SIZE )
+       else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE )
+         {
+           
+           
+           // am i above my average?
+           float my_avg = hb_stat_qlen / hb_stat_ops;
+           
+           if (pending_ops > my_avg) {
+             // is there a peer who is below my average?
+             for (unsigned i=1; i<pg->acting.size(); ++i) {
+               int peer = pg->acting[i];
+               if (peer_qlen.count(peer) &&
+                   peer_qlen[peer] < my_avg) {
+                 // calculate a probability that we should redirect
+                 float p = (my_avg - peer_qlen[peer]) / my_avg;             // this is dumb.
+                 
+                 if (drand48() <= p) {
+                   // take the first one
+                   dout(10) << "my qlen " << pending_ops << " > my_avg " << my_avg
+                            << ", p=" << p 
+                            << ", fwd to peer w/ qlen " << peer_qlen[peer]
+                            << " osd" << peer
+                            << dendl;
+                   messenger->send_message(op, osdmap->get_inst(peer));
+                   _unlock_pg(pgid);
+                   return;
+                 }
+               }
+             }
            }
+           
          }
-       }
+       //else if ( g_conf.osd_load_balance_scheme == LOAD_HYBRID )
+       else if ( g_conf.osd_balance_reads == LOAD_HYBRID )
+         {
+           
+           // am i above my average?
+           float my_avg = hb_stat_qlen / hb_stat_ops;
+           
+           if (pending_ops > my_avg) {
+             // is there a peer who is below my average?
+             for (unsigned i=1; i<pg->acting.size(); ++i) {
+               int peer = pg->acting[i];
+               if (peer_qlen.count(peer) &&
+                   peer_qlen[peer] < my_avg) {
+                 // calculate a probability that we should redirect
+                 //float p = (my_avg - peer_qlen[peer]) / my_avg;             // this is dumb.
+                 
+                 double mean_read_time = load_calc.get_average();
+                 
+                 if ( mean_read_time != -1 &&  
+                      peer_read_time.count(peer) &&
+                      ( (peer_read_time[peer]*100/mean_read_time) <
+                        ( 100 - g_conf.osd_load_diff_percent) ) )
+                   //if (drand48() <= p) {
+                   // take the first one
+                   dout(10) << "using hybrid :my qlen " << pending_ops << " > my_avg " << my_avg
+                            << "my read time  "<<  mean_read_time
+                            << "peer read time " << peer_read_time[peer]  
+                            << ", fwd to peer w/ qlen " << peer_qlen[peer]
+                            << " osd" << peer
+                            << endl;
+                 messenger->send_message(op, osdmap->get_inst(peer));
+                 _unlock_pg(pgid);
+                 return;
+                 //}
+               }
+             }
+           }
+         }
+       
       }
-    }
-
   } else {
     // REPLICATION OP (it's from another OSD)
 
@@ -2125,7 +2240,24 @@ void OSD::handle_op(MOSDOp *op)
 
     assert(pg->get_role() >= 0);
     dout(7) << "handle_rep_op " << op << " in " << *pg << dendl;
+
+    // a redirected read...handle this differently ..
+    // if the data is in cache ( a rare case? ), return the data immediately
+    if ( read && g_conf.osd_immediate_read_from_cache )
+    {
+      dout(10) << "redirected read, trying to see whether data is in cache " << *op <<  endl;
+      if ( store->is_cached( op->get_oid() , 
+                            op->get_offset(), 
+                            op->get_length()) == 0   )
+        { 
+         dout(10) << "redirected read, data is in cache, reading from cache " << *op <<  endl;
+         pg->do_op(op); // do it now
+         _unlock_pg(pgid);
+         return;
+        }
+    }
   }
+
   
   if (g_conf.osd_maxthreads < 1) {
 
@@ -2139,15 +2271,11 @@ void OSD::handle_op(MOSDOp *op)
     _unlock_pg(pgid);
   } else {
     _unlock_pg(pgid);
-    // queue for worker threads
-    /*if (read) 
-      enqueue_op(0, op);     // no locking needed for reads
-    else 
-    */
-      enqueue_op(pgid, op);     
+    enqueue_op(pgid, op);         // queue for worker threads
   }
 }
 
+
 void OSD::handle_op_reply(MOSDOpReply *op)
 {
   if (op->get_map_epoch() < boot_epoch) {
index f1d560d368e48643df2ae0c75895a211485f93f6..3f0277ee089bd73e939c713824c4f2f0af2423e0 100644 (file)
@@ -48,6 +48,37 @@ public:
   static const int STATE_STOPPING = 3;
 
 
+  // load calculation
+  //current implementation is moving averges.
+  class LoadCalculator {
+  private:
+    deque<double> m_Data ;
+    unsigned m_Size ;
+    double  m_Total ;
+    
+  public:
+    LoadCalculator( unsigned size ) : m_Size(0), m_Total(0) { }
+
+    void add( double element ) {
+      // add item
+      m_Data.push_back(element);
+      m_Total += element;
+
+      // trim
+      while (m_Data.size() > m_Size) {
+       m_Total -= m_Data.front();
+       m_Data.pop_front();
+      }
+    }
+    
+    double get_average() {
+      if (m_Data.empty())
+       return -1;
+      return m_Total / (double)m_Data.size();
+    }
+  };
+
+
   /** OSD **/
 protected:
   Mutex osd_lock;     // global lock
@@ -57,6 +88,7 @@ protected:
   Logger      *logger;
   ObjectStore *store;
   MonMap      *monmap;
+  LoadCalculator load_calc;
 
   int whoami;
   char dev_path[100];
@@ -105,6 +137,7 @@ private:
   int hb_stat_qlen; // cumulative queue length since last hb
 
   hash_map<int, float> peer_qlen;
+  hash_map<int, double> peer_read_time;
   
 
   // -- waiters --
index 9ff94adfcae995e46271c229192a356703d65d9c..89e672a10a10a3af23e10d933105a41d931ca043 100644 (file)
@@ -449,6 +449,9 @@ public:
                     Context *onsafe) = 0;//{ return -1; }
   virtual void trim_from_cache(object_t oid, 
                               off_t offset, size_t len) { }
+  virtual int is_cached(object_t oid, 
+                            off_t offset, 
+                             size_t len) { return -1; }
 
   virtual int setattr(object_t oid, const char *name,
                       const void *value, size_t size,
index b8de7e20afd743a6b2b8b7d6222fdd64fdb61d53..61900fe30d4d08ca888bcc09d776835ce4145ba2 100644 (file)
@@ -210,6 +210,12 @@ int ReplicatedPG::op_read(MOSDOp *op)
     reply->set_result(0);
     reply->set_data(bl);
     reply->set_length(r);
+
+    dout(10) <<  "READ TIME DIFF"
+            << (double)g_clock.now()-op->get_received_time()
+            << endl;
+    osd->load_calc.add((double)g_clock.now() - op->get_received_time());
+
   } else {
     reply->set_result(r);   // error
     reply->set_length(0);
@@ -891,6 +897,7 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     put_rep_gather(repop);
 
   } else {
+    // not acker.  
     // chain or splay.  apply.
     ObjectStore::Transaction t;
     prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru);
@@ -905,6 +912,10 @@ void ReplicatedPG::op_modify(MOSDOp *op)
       assert(r == 0);
     }
 
+    // lets evict the data from our cache to maintain a total large cache size
+    if (g_conf.osd_exclusive_caching)
+      osd->store->trim_from_cache(op->get_oid() , op->get_offset(), op->get_length());
+
     oncommit->ack();
   }