]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: set pgls start_epoch field
authorSage Weil <sage.weil@dreamhost.com>
Thu, 12 May 2011 00:56:32 +0000 (17:56 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Thu, 12 May 2011 00:56:32 +0000 (17:56 -0700)
For each pg, start out with start_epoch = 0 in the first request.  For
subsequent requests, set it to the first reply's epoch.  This forces the
OSD to ignore our cookie and "restart" if the pg mapping changes and there
is a possibility of incomplete results.

The price we pay is the possibility of duplicate results.

Fixes: #1030
Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 4f28eab700733a8eb778437125016ef979689ea6..0c1d7f0e84063435809afb5b24cc0192fdf925e9 100644 (file)
@@ -751,6 +751,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   if (op->objver)
     *op->objver = m->get_version();
+  if (op->reply_epoch)
+    *op->reply_epoch = m->get_map_epoch();
 
   // got data?
   if (op->outbl) {
@@ -833,6 +835,7 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) {
     dout(10) << "The placement groups have changed, restarting with " << pg_num << dendl;
     list_context->current_pg = 0;
     list_context->cookie = 0;
+    list_context->current_pg_epoch = 0;
     list_context->starting_pg_num = pg_num;
   }
   if (list_context->current_pg == pg_num){ //this context got all the way through
@@ -842,7 +845,8 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) {
   }
 
   ObjectOperation op;
-  op.pg_ls(list_context->max_entries, list_context->filter, list_context->cookie);
+  op.pg_ls(list_context->max_entries, list_context->filter, list_context->cookie,
+          list_context->current_pg_epoch);
 
   bufferlist *bl = new bufferlist();
   C_List *onack = new C_List(list_context, onfinish, bl, this);
@@ -855,13 +859,14 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) {
   o->priority = op.priority;
   o->snapid = list_context->pool_snap_seq;
   o->outbl = bl;
+  o->reply_epoch = &onack->epoch;
 
   o->pgid = pg_t(list_context->current_pg, list_context->pool_id, -1);
 
   op_submit(o);
 }
 
-void Objecter::_list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish)
+void Objecter::_list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish, epoch_t reply_epoch)
 {
   dout(10) << "_list_reply" << dendl;
 
@@ -873,6 +878,11 @@ void Objecter::_list_reply(ListContext *list_context, bufferlist *bl, Context *f
     ::decode(extra_info, iter);
   }
   list_context->cookie = (uint64_t)response.handle;
+  if (!list_context->current_pg_epoch) {
+    // first pgls result, set epoch marker
+    dout(20) << "first pgls piece, reply_epoch is " << reply_epoch << dendl;
+    list_context->current_pg_epoch = reply_epoch;
+  }
 
   int response_size = response.entries.size();
   dout(20) << "response.entries.size " << response_size
@@ -890,17 +900,19 @@ void Objecter::_list_reply(ListContext *list_context, bufferlist *bl, Context *f
       return;
     }
   }
-  //if we make this this far, there are no objects left in the current pg, but we want more!
+
+  // if we make this this far, there are no objects left in the current pg, but we want more!
   ++list_context->current_pg;
+  list_context->current_pg_epoch = 0;
   dout(20) << "emptied current pg, moving on to next one:" << list_context->current_pg << dendl;
-  if(list_context->current_pg < list_context->starting_pg_num){ //we have more pgs to go through
+  if (list_context->current_pg < list_context->starting_pg_num){ // we have more pgs to go through
     list_context->cookie = 0;
     delete bl;
     list_objects(list_context, final_finish);
     return;
   }
   
-  //if we make it this far, there are no more pgs
+  // if we make it this far, there are no more pgs
   dout(20) << "out of pgs, returning to" << final_finish << dendl;
   list_context->at_end = true;
   delete bl;
index 020edbaf17cbb24f2f0462c55ad45ebeef616c08..6290a322cbef517c32d8838b4523e655fdb9963b 100644 (file)
@@ -95,19 +95,21 @@ struct ObjectOperation {
     ops[s].op.watch.flag = flag;
     ops[s].data.append(inbl);
   }
-  void add_pgls(int op, uint64_t count, uint64_t cookie) {
+  void add_pgls(int op, uint64_t count, uint64_t cookie, epoch_t start_epoch) {
     int s = ops.size();
     ops.resize(s+1);
     ops[s].op.op = op;
     ops[s].op.pgls.count = count;
     ops[s].op.pgls.cookie = cookie;
+    ops[s].op.pgls.start_epoch = start_epoch;
   }
-  void add_pgls_filter(int op, uint64_t count, bufferlist& filter, uint64_t cookie) {
+  void add_pgls_filter(int op, uint64_t count, bufferlist& filter, uint64_t cookie, epoch_t start_epoch) {
     int s = ops.size();
     ops.resize(s+1);
     ops[s].op.op = op;
     ops[s].op.pgls.count = count;
     ops[s].op.pgls.cookie = cookie;
+    ops[s].op.pgls.start_epoch = start_epoch;
     string cname = "pg";
     string mname = "filter";
     ::encode(cname, ops[s].data);
@@ -118,11 +120,11 @@ struct ObjectOperation {
   // ------
 
   // pg
-  void pg_ls(uint64_t count, bufferlist& filter, uint64_t cookie) {
+  void pg_ls(uint64_t count, bufferlist& filter, uint64_t cookie, epoch_t start_epoch) {
     if (filter.length() == 0)
-      add_pgls(CEPH_OSD_OP_PGLS, count, cookie);
+      add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
     else
-      add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie);
+      add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie, start_epoch);
     flags |= CEPH_OSD_FLAG_PGOP;
   }
 
@@ -310,6 +312,7 @@ public:
     bool paused;
 
     eversion_t *objver;
+    epoch_t *reply_epoch;
 
     utime_t stamp;
 
@@ -320,7 +323,7 @@ public:
       used_replica(false), con(NULL),
       snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), 
       tid(0), attempts(0),
-      paused(false), objver(ov) {
+      paused(false), objver(ov), reply_epoch(NULL) {
       ops.swap(op);
     }
   };
@@ -369,6 +372,7 @@ public:
   struct ListContext {
     int current_pg;
     uint64_t cookie;
+    epoch_t current_pg_epoch;
     int starting_pg_num;
     bool at_end;
 
@@ -381,7 +385,7 @@ public:
 
     bufferlist extra_info;
 
-    ListContext() : current_pg(0), cookie(0), starting_pg_num(0),
+    ListContext() : current_pg(0), cookie(0), current_pg_epoch(0), starting_pg_num(0),
                    at_end(false), pool_id(0),
                    pool_snap_seq(0), max_entries(0) {}
   };
@@ -391,11 +395,12 @@ public:
     Context *final_finish;
     bufferlist *bl;
     Objecter *objecter;
+    epoch_t epoch;
     C_List(ListContext *lc, Context * finish, bufferlist *b, Objecter *ob) :
-      list_context(lc), final_finish(finish), bl(b), objecter(ob) {}
+      list_context(lc), final_finish(finish), bl(b), objecter(ob), epoch(0) {}
     void finish(int r) {
       if (r >= 0) {
-        objecter->_list_reply(list_context, bl, final_finish);
+        objecter->_list_reply(list_context, bl, final_finish, epoch);
       } else {
         final_finish->finish(r);
         delete final_finish;
@@ -529,7 +534,8 @@ public:
   void reopen_session(OSDSession *session);
   void close_session(OSDSession *session);
   
-  void _list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish);
+  void _list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish,
+                  epoch_t reply_epoch);
 
   void resend_mon_ops();