]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: make image contexts threadsafe
authorJosh Durgin <josh.durgin@dreamhost.com>
Wed, 25 May 2011 21:15:36 +0000 (14:15 -0700)
committerJosh Durgin <josh.durgin@dreamhost.com>
Wed, 25 May 2011 21:34:49 +0000 (14:34 -0700)
Use refresh_lock to protect the needs_refresh member, and
ImageContext::lock for the header and snapshot metadata.

Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
src/librbd.cc

index 0de136b189fbe94ef9e1a44a66c5a9f82f442229..36da1ade9b76da2b02fde7f88036cf7200ff9c10 100644 (file)
@@ -50,11 +50,13 @@ namespace librbd {
     IoCtx data_ctx, md_ctx;
     WatchCtx *wctx;
     bool needs_refresh;
-    Mutex lock;
+    Mutex refresh_lock;
+    Mutex lock; // protects access to snapshot and header information
 
     ImageCtx(std::string imgname, IoCtx& p) : snapid(CEPH_NOSNAP),
                                              name(imgname),
                                              needs_refresh(true),
+                                             refresh_lock("librbd::ImageCtx::refresh_lock"),
                                              lock("librbd::ImageCtx::lock") {
       md_ctx.dup(p);
       data_ctx.dup(p);
@@ -283,7 +285,7 @@ void WatchCtx::notify(uint8_t opcode, uint64_t ver)
   Mutex::Locker l(lock);
   dout(1) <<  " got notification opcode=" << (int)opcode << " ver=" << ver << " cookie=" << cookie << dendl;
   if (valid) {
-    Mutex::Locker lictx(ictx->lock);
+    Mutex::Locker lictx(ictx->refresh_lock);
     ictx->needs_refresh = true;
   }
 }
@@ -454,9 +456,10 @@ int notify_change(IoCtx& io_ctx, const string& oid, uint64_t *pver, ImageCtx *ic
   uint64_t ver;
 
   if (ictx) {
-    ictx->lock.Lock();
+    assert(ictx->lock.is_locked());
+    ictx->refresh_lock.Lock();
     ictx->needs_refresh = true;
-    ictx->lock.Unlock();
+    ictx->refresh_lock.Unlock();
   }
 
   if (pver)
@@ -511,6 +514,7 @@ int tmap_rm(IoCtx& io_ctx, const string& imgname)
 
 int rollback_image(ImageCtx *ictx, uint64_t snapid)
 {
+  assert(ictx->lock.is_locked());
   uint64_t numseg = get_max_block(ictx->header);
 
   for (uint64_t i = 0; i < numseg; i++) {
@@ -551,7 +555,9 @@ int snap_create(ImageCtx *ictx, const char *snap_name)
   if (r < 0)
     return r;
 
+  Mutex::Locker l(ictx->lock);
   r = add_snap(ictx, snap_name);
+
   if (r < 0)
     return r;
 
@@ -568,6 +574,7 @@ int snap_remove(ImageCtx *ictx, const char *snap_name)
   if (r < 0)
     return r;
 
+  Mutex::Locker l(ictx->lock);
   snap_t snapid = ictx->get_snapid(snap_name);
   if (snapid == CEPH_NOSNAP)
     return -ENOENT;
@@ -577,6 +584,7 @@ int snap_remove(ImageCtx *ictx, const char *snap_name)
     return r;
 
   r = ictx->data_ctx.selfmanaged_snap_remove(snapid);
+
   if (r < 0)
     return r;
 
@@ -689,6 +697,8 @@ int info(ImageCtx *ictx, image_info_t& info, size_t infosize)
   int r = ictx_check(ictx);
   if (r < 0)
     return r;
+
+  Mutex::Locker l(ictx->lock);
   image_info(ictx->header, info, infosize);
   return 0;
 }
@@ -731,6 +741,7 @@ int resize(ImageCtx *ictx, uint64_t size)
   if (r < 0)
     return r;
 
+  Mutex::Locker l(ictx->lock);
   // trim
   if (size == ictx->header.image_size) {
     dout(2) << "no change in size (" << size << " -> " << ictx->header.image_size << ")" << dendl;
@@ -750,6 +761,7 @@ int resize(ImageCtx *ictx, uint64_t size)
   bufferlist bl;
   bl.append((const char *)&(ictx->header), sizeof(ictx->header));
   r = ictx->md_ctx.write(ictx->md_oid(), bl, bl.length(), 0);
+
   if (r == -ERANGE)
     derr << "operation might have conflicted with another client!" << dendl;
   if (r < 0) {
@@ -760,6 +772,7 @@ int resize(ImageCtx *ictx, uint64_t size)
   }
 
   dout(2) << "done." << dendl;
+
   return 0;
 }
 
@@ -772,6 +785,7 @@ int snap_list(ImageCtx *ictx, std::vector<snap_info_t>& snaps)
     return r;
   bufferlist bl, bl2;
 
+  Mutex::Locker l(ictx->lock);
   for (std::map<std::string, struct SnapInfo>::iterator it = ictx->snaps_by_name.begin();
        it != ictx->snaps_by_name.end(); ++it) {
     snap_info_t info;
@@ -786,6 +800,8 @@ int snap_list(ImageCtx *ictx, std::vector<snap_info_t>& snaps)
 
 int add_snap(ImageCtx *ictx, const char *snap_name)
 {
+  assert(ictx->lock.is_locked());
+
   bufferlist bl, bl2;
   uint64_t snap_id;
 
@@ -810,6 +826,8 @@ int add_snap(ImageCtx *ictx, const char *snap_name)
 
 int rm_snap(ImageCtx *ictx, const char *snap_name)
 {
+  assert(ictx->lock.is_locked());
+
   bufferlist bl, bl2;
   ::encode(snap_name, bl);
 
@@ -825,11 +843,12 @@ int rm_snap(ImageCtx *ictx, const char *snap_name)
 int ictx_check(ImageCtx *ictx)
 {
   dout(20) << "ictx_check " << ictx << dendl;
-  ictx->lock.Lock();
+  ictx->refresh_lock.Lock();
   bool needs_refresh = ictx->needs_refresh;
-  ictx->lock.Unlock();
+  ictx->refresh_lock.Unlock();
 
   if (needs_refresh) {
+    Mutex::Locker l(ictx->lock);
     const char *snap = NULL;
     if (ictx->snapid != CEPH_NOSNAP)
       snap = ictx->snapname.c_str();
@@ -851,6 +870,7 @@ int ictx_check(ImageCtx *ictx)
 
 int ictx_refresh(ImageCtx *ictx, const char *snap_name)
 {
+  assert(ictx->lock.is_locked());
   bufferlist bl, bl2;
 
   if (snap_name) {
@@ -903,9 +923,9 @@ int ictx_refresh(ImageCtx *ictx, const char *snap_name)
 
   ictx->data_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps);
 
-  ictx->lock.Lock();
+  ictx->refresh_lock.Lock();
   ictx->needs_refresh = false;
-  ictx->lock.Unlock();
+  ictx->refresh_lock.Unlock();
 
   return 0;
 }
@@ -918,6 +938,7 @@ int snap_rollback(ImageCtx *ictx, const char *snap_name)
   if (r < 0)
     return r;
 
+  Mutex::Locker l(ictx->lock);
   snap_t snapid = ictx->get_snapid(snap_name);
   if (snapid == CEPH_NOSNAP) {
     derr << "No such snapshot found." << dendl;
@@ -1015,6 +1036,7 @@ int snap_set(ImageCtx *ictx, const char *snap_name)
   if (r < 0)
     return r;
 
+  Mutex::Locker l(ictx->lock);
   if (snap_name)
     ictx->snap_set(snap_name);
   else
@@ -1031,7 +1053,9 @@ int open_image(IoCtx& io_ctx, ImageCtx *ictx, const char *name, const char *snap
   dout(20) << "open_image " << &io_ctx << " ictx =  " << ictx
           << " name =  " << name << " snap_name = " << (snap_name ? snap_name : "NULL") << dendl;
 
+  ictx->lock.Lock();
   int r = ictx_refresh(ictx, snap_name);
+  ictx->lock.Unlock();
   if (r < 0)
     return r;
 
@@ -1047,12 +1071,12 @@ int open_image(IoCtx& io_ctx, ImageCtx *ictx, const char *name, const char *snap
 void close_image(ImageCtx *ictx)
 {
   dout(20) << "close_image " << ictx << dendl;
-
+  ictx->lock.Lock();
   ictx->wctx->invalidate();
   ictx->md_ctx.unwatch(ictx->md_oid(), ictx->wctx->cookie);
   delete ictx->wctx;
+  ictx->lock.Unlock();
   delete ictx;
-  ictx = NULL;
 }
 
 int64_t read_iterate(ImageCtx *ictx, uint64_t off, size_t len,
@@ -1071,15 +1095,19 @@ int64_t read_iterate(ImageCtx *ictx, uint64_t off, size_t len,
 
   int64_t ret;
   int64_t total_read = 0;
+  ictx->lock.Lock();
   uint64_t start_block = get_block_num(ictx->header, off);
   uint64_t end_block = get_block_num(ictx->header, off + len);
   uint64_t block_size = get_block_size(ictx->header);
+  ictx->lock.Unlock();
   uint64_t left = len;
 
   for (uint64_t i = start_block; i <= end_block; i++) {
     bufferlist bl;
+    ictx->lock.Lock();
     string oid = get_block_oid(ictx->header, i);
     uint64_t block_ofs = get_block_ofs(ictx->header, off + total_read);
+    ictx->lock.Unlock();
     uint64_t read_len = min(block_size - block_ofs, left);
 
     map<uint64_t, uint64_t> m;
@@ -1162,15 +1190,19 @@ ssize_t write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf)
     return r;
 
   size_t total_write = 0;
+  ictx->lock.Lock();
   uint64_t start_block = get_block_num(ictx->header, off);
   uint64_t end_block = get_block_num(ictx->header, off + len - 1);
   uint64_t block_size = get_block_size(ictx->header);
+  ictx->lock.Unlock();
   uint64_t left = len;
 
   for (uint64_t i = start_block; i <= end_block; i++) {
     bufferlist bl;
+    ictx->lock.Lock();
     string oid = get_block_oid(ictx->header, i);
     uint64_t block_ofs = get_block_ofs(ictx->header, off + total_write);
+    ictx->lock.Unlock();
     uint64_t write_len = min(block_size - block_ofs, left);
     bl.append(buf + total_write, write_len);
     r = ictx->data_ctx.write(oid, bl, write_len, block_ofs);
@@ -1261,7 +1293,11 @@ void rados_cb(rados_completion_t c, void *arg)
 
 int check_io(ImageCtx *ictx, uint64_t off, uint64_t len)
 {
-  if ((uint64_t)(off + len) > (uint64_t)ictx->header.image_size)
+  ictx->lock.Lock();
+  uint64_t image_size = ictx->header.image_size;
+  ictx->lock.Unlock();
+
+  if ((uint64_t)(off + len) > image_size)
     return -EINVAL;
   return 0;
 }
@@ -1279,9 +1315,11 @@ int aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
     return r;
 
   size_t total_write = 0;
+  ictx->lock.Lock();
   uint64_t start_block = get_block_num(ictx->header, off);
   uint64_t end_block = get_block_num(ictx->header, off + len - 1);
   uint64_t block_size = get_block_size(ictx->header);
+  ictx->lock.Unlock();
   uint64_t left = len;
 
   r = check_io(ictx, off, len);
@@ -1290,8 +1328,10 @@ int aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
 
   for (uint64_t i = start_block; i <= end_block; i++) {
     bufferlist bl;
+    ictx->lock.Lock();
     string oid = get_block_oid(ictx->header, i);
     uint64_t block_ofs = get_block_ofs(ictx->header, off + total_write);
+    ictx->lock.Unlock();
     uint64_t write_len = min(block_size - block_ofs, left);
     bl.append(buf + total_write, write_len);
     AioBlockCompletion *block_completion = new AioBlockCompletion(c, off, len, NULL);
@@ -1335,15 +1375,19 @@ int aio_read(ImageCtx *ictx, uint64_t off, size_t len,
 
   int64_t ret;
   int total_read = 0;
+  ictx->lock.Lock();
   uint64_t start_block = get_block_num(ictx->header, off);
   uint64_t end_block = get_block_num(ictx->header, off + len);
   uint64_t block_size = get_block_size(ictx->header);
+  ictx->lock.Unlock();
   uint64_t left = len;
 
   for (uint64_t i = start_block; i <= end_block; i++) {
     bufferlist bl;
+    ictx->lock.Lock();
     string oid = get_block_oid(ictx->header, i);
     uint64_t block_ofs = get_block_ofs(ictx->header, off + total_read);
+    ictx->lock.Unlock();
     uint64_t read_len = min(block_size - block_ofs, left);
 
     map<uint64_t,uint64_t> m;