]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter periodically requests new map if it hits an empty pg
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 25 Aug 2007 19:14:14 +0000 (19:14 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 25 Aug 2007 19:14:14 +0000 (19:14 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1691 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/config.cc
trunk/ceph/config.h
trunk/ceph/messages/MOSDGetMap.h
trunk/ceph/mon/OSDMonitor.cc
trunk/ceph/mon/OSDMonitor.h
trunk/ceph/osdc/Objecter.cc
trunk/ceph/osdc/Objecter.h

index c3fbfc5efad405d46dcc5fc285045e44fe753c51..0ef744597bc1f69cacb98d807ed45f11c1c8a437 100644 (file)
@@ -167,6 +167,7 @@ md_config_t g_conf = {
   
   // --- objecter ---
   objecter_buffer_uncommitted: true,  // this must be true for proper failure handling
+  objecter_map_request_interval: 15.0, // request a new map every N seconds, if we have pending io
 
   // --- journaler ---
   journaler_allow_split_entries: true,
index 5d7eff5d232042e63bea8ecb8f6719a7913f4a04..65025b10f1c5dea3a58a150cc90db2fe33f223b5 100644 (file)
@@ -164,6 +164,7 @@ struct md_config_t {
 
   // objecter
   bool  objecter_buffer_uncommitted;
+  double objecter_map_request_interval;
 
   // journaler
   bool  journaler_allow_split_entries;
index 3ced814886eafb697ca02f960e84acca8aa9e6d6..25f94ef3bcc92fb665f86c92a73260dd1b060e11 100644 (file)
 
 class MOSDGetMap : public Message {
  public:
-  epoch_t start;
+  epoch_t start, want;
 
-  MOSDGetMap(epoch_t s=0) : 
+  MOSDGetMap(epoch_t s=0, epoch_t w=0) : 
     Message(MSG_OSD_GETMAP),
-    start(s) {
-  }
+    start(s), want(w) { }
 
   epoch_t get_start_epoch() { return start; }
+  epoch_t get_want_epoch() { return want; }
 
   char *get_type_name() { return "get_osd_map"; }
   void print(ostream& out) {
-    out << "get_osd_map(" << start << ")";
+    out << "get_osd_map(have " << start;
+    if (want) out << " want " << want;
+    out << ")";
   }
   
   void encode_payload() {
     ::_encode(start, payload);
+    ::_encode(want, payload);
   }
   void decode_payload() {
     int off = 0;
     ::_decode(start, payload, off);
+    ::_decode(want, payload, off);
   }
 };
 
index 94ffedae881f778dc0e9cbbe8e18de6221fdb499..c8f8f6da481a2a5e71b67c1f84d0e3e159d4de4c 100644 (file)
@@ -284,7 +284,8 @@ bool OSDMonitor::update_from_paxos()
 
   // new map!
   bcast_latest_mds();
-  
+  send_to_waiting();
+    
   return true;
 }
 
@@ -416,9 +417,13 @@ void OSDMonitor::handle_osd_getmap(MOSDGetMap *m)
 {
   dout(7) << "handle_osd_getmap from " << m->get_source() << " from " << m->get_start_epoch() << dendl;
   
-  if (m->get_start_epoch())
-    send_incremental(m->get_source_inst(), m->get_start_epoch());
-  else
+  if (m->get_start_epoch()) {
+    if (m->get_want_epoch() <= osdmap.get_epoch())
+       send_incremental(m->get_source_inst(), m->get_start_epoch());
+    else
+      waiting_for_map[m->get_source_inst()] = pair<epoch_t,epoch_t>(m->get_start_epoch(),
+                                                                   m->get_want_epoch());
+  } else
     send_full(m->get_source_inst());
   
   delete m;
@@ -562,13 +567,23 @@ void OSDMonitor::send_to_waiting()
 {
   dout(10) << "send_to_waiting " << osdmap.get_epoch() << dendl;
 
-  for (map<entity_name_t,pair<entity_inst_t,epoch_t> >::iterator i = awaiting_map.begin();
-       i != awaiting_map.end();
-       i++) {
-    if (i->second.second)
-      send_incremental(i->second.first, i->second.second);
-    else
-      send_full(i->second.first);
+  map<entity_inst_t,pair<epoch_t,epoch_t> >::iterator i = waiting_for_map.begin();
+  while (i != waiting_for_map.end()) {
+    if (i->second.first) {
+      if (i->second.second <= osdmap.get_epoch())
+       send_incremental(i->first, i->second.first);
+      else {
+       dout(10) << "send_to_waiting skipping " << i->first
+                << " has " << i->second.first
+                << " wants " << i->second.second
+                << endl;
+       i++;
+       continue;
+      }
+    } else
+      send_full(i->first);
+
+    waiting_for_map.erase(i++);
   }
 }
 
@@ -582,8 +597,7 @@ void OSDMonitor::send_latest(entity_inst_t who, epoch_t start)
       send_incremental(who, start);
   } else {
     dout(5) << "send_latest to " << who << " later" << dendl;
-    awaiting_map[who.name].first = who;
-    awaiting_map[who.name].second = start;
+    waiting_for_map[who] = pair<epoch_t,epoch_t>(start, 0);
   }
 }
 
index 2ee86c8fe61e882cb092a4cda2714804c4c933a6..d4fb9270882c261455d88ad4ad0213c1156e7267 100644 (file)
@@ -35,7 +35,7 @@ public:
   OSDMap osdmap;
 
 private:
-  map<entity_name_t, pair<entity_inst_t, epoch_t> > awaiting_map;
+  map<entity_inst_t, pair<epoch_t,epoch_t> > waiting_for_map;  // who -> (has, wants)
 
   // [leader]
   OSDMap::Incremental pending_inc;
index 3603d102a3636f6c58ab4037dcfda198b260f037..6894f5eea4d2c8a1fb1efbbbb734bd7add491f74 100644 (file)
@@ -1,5 +1,16 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
 
 #include "Objecter.h"
 #include "osd/OSDMap.h"
@@ -102,6 +113,23 @@ void Objecter::handle_osd_map(MOSDMap *m)
   delete m;
 }
 
+
+void Objecter::maybe_request_map()
+{
+  utime_t now;
+
+  if (last_epoch_requested <= osdmap->get_epoch() ||
+      (now = g_clock.now()) - last_epoch_requested_stamp > g_conf.objecter_map_request_interval) {
+    dout(10) << "maybe_request_map requesting next osd map" << dendl;
+    last_epoch_requested_stamp = now;
+    last_epoch_requested = osdmap->get_epoch()+1;
+    messenger->send_message(new MOSDGetMap(osdmap->get_epoch(), last_epoch_requested),
+                           monmap->get_inst(monmap->pick_mon()));
+  }
+}
+
+
+
 void Objecter::scan_pgs(set<pg_t>& changed_pgs)
 {
   dout(10) << "scan_pgs" << dendl;
@@ -422,7 +450,8 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry)
                << " = osd" << who <<  dendl;
     }
     messenger->send_message(m, osdmap->get_inst(who));
-  }
+  } else 
+    maybe_request_map();
     
   return last_tid;
 }
@@ -718,7 +747,8 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
     }
     
     messenger->send_message(m, osdmap->get_inst(pg.primary()));
-  }
+  } else 
+    maybe_request_map();
   
   dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
   
index db8f30e5c85732540087b38336dcd57fd7cca3c4..2da3107566fd43e87a42185e301c2261295d40dc 100644 (file)
@@ -1,5 +1,17 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 // vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
 #ifndef __OBJECTER_H
 #define __OBJECTER_H
 
@@ -33,6 +45,11 @@ class Objecter {
   int num_unacked;
   int num_uncommitted;
 
+  epoch_t last_epoch_requested;
+  utime_t last_epoch_requested_stamp;
+
+  void maybe_request_map();
+
   /*** track pending operations ***/
   // read
  public: