]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: inc_lock
authorSage Weil <sage@newdream.net>
Mon, 24 Mar 2008 17:45:49 +0000 (10:45 -0700)
committerSage Weil <sage@newdream.net>
Mon, 24 Mar 2008 17:45:49 +0000 (10:45 -0700)
src/osdc/Filer.h
src/osdc/ObjectCacher.h
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 4744e2276d1622b806246c8bb41ff97fc2a02c1b..86fb663fa2991a0610526e4d07c9251cbabec91c 100644 (file)
@@ -85,7 +85,7 @@ class Filer {
                                  size_t len, 
                                  bufferlist *bl,
                                  int flags) {
-    Objecter::OSDRead *rd = new Objecter::OSDRead(bl, flags);
+    Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
     file_to_extents(inode, offset, len, rd->extents);
     return rd;
   }
@@ -107,7 +107,7 @@ class Filer {
             Context *onack,
             Context *oncommit,
            objectrev_t rev=0) {
-    Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl, flags);
+    Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
     file_to_extents(inode, offset, len, wr->extents, rev);
     return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
   }
@@ -118,7 +118,7 @@ class Filer {
           int flags,
            Context *onack,
            Context *oncommit) {
-    Objecter::OSDModify *z = new Objecter::OSDModify(CEPH_OSD_OP_ZERO, flags);
+    Objecter::OSDModify *z = objecter->prepare_modify(CEPH_OSD_OP_ZERO, flags);
     file_to_extents(inode, offset, len, z->extents);
     return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
   }
@@ -129,7 +129,7 @@ class Filer {
             int flags,
             Context *onack,
             Context *oncommit) {
-    Objecter::OSDModify *z = new Objecter::OSDModify(CEPH_OSD_OP_DELETE, flags);
+    Objecter::OSDModify *z = objecter->prepare_modify(CEPH_OSD_OP_DELETE, flags);
     file_to_extents(inode, offset, len, z->extents);
     return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
   }
index 1559491680d364edde4ae4c0513d1e2d14fac56f..170b2e0e9b683b6ff457aa4e543de9ad0540f2cc 100644 (file)
@@ -489,7 +489,7 @@ class ObjectCacher {
                 bufferlist *bl,
                int flags,
                 Context *onfinish) {
-    Objecter::OSDRead *rd = new Objecter::OSDRead(bl, flags);
+    Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
     filer.file_to_extents(inode, offset, len, rd->extents);
     return readx(rd, inode.ino, onfinish);
   }
@@ -498,7 +498,7 @@ class ObjectCacher {
                  off_t offset, size_t len, 
                  bufferlist& bl, int flags,
                 objectrev_t rev=0) {
-    Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl, flags);
+    Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
     filer.file_to_extents(inode, offset, len, wr->extents);
     return writex(wr, inode.ino);
   }
@@ -511,7 +511,7 @@ class ObjectCacher {
                             off_t offset, size_t len, 
                             bufferlist *bl, int flags,
                             Mutex &lock) {
-    Objecter::OSDRead *rd = new Objecter::OSDRead(bl, flags);
+    Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
     filer.file_to_extents(inode, offset, len, rd->extents);
     return atomic_sync_readx(rd, inode.ino, lock);
   }
@@ -521,7 +521,7 @@ class ObjectCacher {
                              bufferlist& bl, int flags,
                              Mutex &lock,
                             objectrev_t rev=0) {
-    Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl, flags);
+    Objecter::OSDWrite *wr = objecter->prepare_write(bl, flags);
     filer.file_to_extents(inode, offset, len, wr->extents);
     return atomic_sync_writex(wr, inode.ino, lock);
   }
index 31d660c16b9c39cf8d1baf94cbf37abbdb9625ea..1acf5eb41b9d51a4e0a93201c621355c15adf378 100644 (file)
@@ -334,7 +334,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
 tid_t Objecter::stat(object_t oid, off_t *size, ceph_object_layout ol, int flags, Context *onfinish)
 {
-  OSDStat *st = new OSDStat(size, flags);
+  OSDStat *st = prepare_stat(size, flags);
   st->extents.push_back(ObjectExtent(oid, 0, 0));
   st->extents.front().layout = ol;
   st->onfinish = onfinish;
@@ -373,6 +373,10 @@ tid_t Objecter::stat_submit(OSDStat *st)
     MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
                           ex.oid, ex.layout, osdmap->get_epoch(), 
                           CEPH_OSD_OP_STAT, flags);
+    if (inc_lock >= 0) {
+      st->inc_lock = inc_lock;
+      m->set_inc_lock(inc_lock);
+    }
     
     messenger->send_message(m, osdmap->get_inst(pg.acker()));
   }
@@ -440,7 +444,7 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
 tid_t Objecter::read(object_t oid, off_t off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags, 
                      Context *onfinish)
 {
-  OSDRead *rd = new OSDRead(bl, flags);
+  OSDRead *rd = prepare_read(bl, flags);
   rd->extents.push_back(ObjectExtent(oid, off, len));
   rd->extents.front().layout = ol;
   readx(rd, onfinish);
@@ -491,6 +495,10 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry)
     MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
                           ex.oid, ex.layout, osdmap->get_epoch(), 
                           CEPH_OSD_OP_READ, flags);
+    if (inc_lock >= 0) {
+      rd->inc_lock = inc_lock;
+      m->set_inc_lock(inc_lock);
+    }
     m->set_length(ex.length);
     m->set_offset(ex.start);
     m->set_retry_attempt(retry);
@@ -680,7 +688,7 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
 tid_t Objecter::write(object_t oid, off_t off, size_t len, ceph_object_layout ol, bufferlist &bl, int flags,
                       Context *onack, Context *oncommit)
 {
-  OSDWrite *wr = new OSDWrite(bl, flags);
+  OSDWrite *wr = prepare_write(bl, flags);
   wr->extents.push_back(ObjectExtent(oid, off, len));
   wr->extents.front().layout = ol;
   wr->extents.front().buffer_extents[0] = len;
@@ -694,7 +702,7 @@ tid_t Objecter::write(object_t oid, off_t off, size_t len, ceph_object_layout ol
 tid_t Objecter::zero(object_t oid, off_t off, size_t len, ceph_object_layout ol, int flags, 
                      Context *onack, Context *oncommit)
 {
-  OSDModify *z = new OSDModify(CEPH_OSD_OP_ZERO, flags);
+  OSDModify *z = prepare_modify(CEPH_OSD_OP_ZERO, flags);
   z->extents.push_back(ObjectExtent(oid, off, len));
   z->extents.front().layout = ol;
   modifyx(z, onack, oncommit);
@@ -707,7 +715,7 @@ tid_t Objecter::zero(object_t oid, off_t off, size_t len, ceph_object_layout ol,
 tid_t Objecter::lock(int op, object_t oid, int flags, ceph_object_layout ol, 
                      Context *onack, Context *oncommit)
 {
-  OSDModify *l = new OSDModify(op, flags);
+  OSDModify *l = prepare_modify(op, flags);
   l->extents.push_back(ObjectExtent(oid, 0, 0));
   l->extents.front().layout = ol;
   modifyx(l, onack, oncommit);
@@ -777,6 +785,10 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
     MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
                           ex.oid, ex.layout, osdmap->get_epoch(),
                           wr->op, flags);
+    if (inc_lock >= 0) {
+      wr->inc_lock = inc_lock;
+      m->set_inc_lock(inc_lock);
+    }
     m->set_length(ex.length);
     m->set_offset(ex.start);
     if (usetid > 0)
@@ -843,6 +855,18 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
     delete m;
     return;
   }
+  if (m->get_result() == -EAGAIN) {
+    dout(7) << " got -EAGAIN, resubmitting" << dendl;
+    if (wr->onack) num_unacked--;
+    if (wr->oncommit) num_uncommitted--;
+    if (wr->waitfor_ack.count(tid)) 
+      modifyx_submit(wr, wr->waitfor_ack[tid]);
+    else if (wr->waitfor_commit.count(tid)) 
+      modifyx_submit(wr, wr->waitfor_commit[tid]);
+    else assert(0);
+    delete m;
+    return;
+  }
 
   assert(m->get_result() >= 0);
 
index 8aefd0559be05bba6368b3249b6006dded4b79a9..b1cb68f91400dceac87ee7003b428dea8a43ecd6 100644 (file)
@@ -44,6 +44,7 @@ class Objecter {
  private:
   tid_t last_tid;
   int client_inc;
+  int inc_lock;       // optional
   int num_unacked;
   int num_uncommitted;
 
@@ -70,6 +71,8 @@ class Objecter {
   class OSDOp {
   public:
     list<ObjectExtent> extents;
+    int inc_lock;
+    OSDOp() : inc_lock(-1) {}
     virtual ~OSDOp() {}
   };
 
@@ -86,6 +89,10 @@ class Objecter {
     }
   };
 
+  OSDRead *prepare_read(bufferlist *b, int f) {
+    return new OSDRead(b, f);
+  }
+
   class OSDStat : public OSDOp {
   public:
     tid_t tid;
@@ -95,6 +102,10 @@ class Objecter {
     OSDStat(off_t *s, int f) : tid(0), size(s), flags(f), onfinish(0) { }
   };
 
+  OSDStat *prepare_stat(off_t *s, int f) {
+    return new OSDStat(s, f);
+  }
+
   // generic modify
   class OSDModify : public OSDOp {
   public:
@@ -109,6 +120,10 @@ class Objecter {
 
     OSDModify(int o, int f) : op(o), flags(f), onack(0), oncommit(0) {}
   };
+
+  OSDModify *prepare_modify(int o, int f) { 
+    return new OSDModify(o, f); 
+  }
   
   // write (includes the bufferlist)
   class OSDWrite : public OSDModify {
@@ -117,6 +132,10 @@ class Objecter {
     OSDWrite(bufferlist &b, int f) : OSDModify(CEPH_OSD_OP_WRITE, f), bl(b) {}
   };
 
+  OSDWrite *prepare_write(bufferlist &b, int f) { 
+    return new OSDWrite(b, f); 
+  }
+
   
 
  private:
@@ -172,7 +191,7 @@ class Objecter {
  public:
   Objecter(Messenger *m, MonMap *mm, OSDMap *om, Mutex& l) : 
     messenger(m), monmap(mm), osdmap(om), 
-    last_tid(0), client_inc(-1),
+    last_tid(0), client_inc(-1), inc_lock(-1),
     num_unacked(0), num_uncommitted(0),
     last_epoch_requested(0),
     client_lock(l), timer(l)
@@ -204,10 +223,12 @@ class Objecter {
   }
   void dump_active();
 
-  int get_client_incarnation() { return client_inc; }
-  void set_client_incarnation(int inc) {
-       client_inc = inc;
-  }
+  int get_client_incarnation() const { return client_inc; }
+  void set_client_incarnation(int inc) { client_inc = inc; }
+
+  //int get_inc_lock() const { return inc_lock; }
+  void set_inc_lock(int l) { inc_lock = l; }
+    
 
   // med level
   tid_t readx(OSDRead *read, Context *onfinish);