]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objclass, cls_rgw: update to use omap
authorYehuda Sadeh <yehuda.sadeh@dreamhost.com>
Wed, 7 Mar 2012 01:17:03 +0000 (17:17 -0800)
committerYehuda Sadeh <yehuda.sadeh@dreamhost.com>
Wed, 7 Mar 2012 01:17:03 +0000 (17:17 -0800)
Signed-off-by: Yehuda Sadeh <yehuda.sadeh@dreamhost.com>
src/cls_rgw.cc
src/objclass/class_api.cc
src/objclass/objclass.h

index a6b3e736693ccda2b59d161fb04c9759985941fe..dbf383246c22621d9b7f56bef83c46b5d9cae525 100644 (file)
@@ -30,56 +30,11 @@ static uint64_t get_rounded_size(uint64_t size)
   return (size + ROUND_BLOCK_SIZE - 1) & ~(ROUND_BLOCK_SIZE - 1);
 }
 
-static int read_bucket_dir(cls_method_context_t hctx, struct rgw_bucket_dir& dir)
-{
-  bufferlist bl;
-
-  uint64_t size;
-  int rc = cls_cxx_stat(hctx, &size, NULL);
-  if (rc < 0)
-    return rc;
-
-  rc = cls_cxx_map_read_full(hctx, &bl);
-  if (rc < 0)
-    return rc;
-
-  try {
-    bufferlist::iterator iter = bl.begin();
-    bufferlist header_bl;
-    ::decode(header_bl, iter);
-    bufferlist::iterator header_iter = header_bl.begin();
-    ::decode(dir.header, header_iter);
-    __u32 nkeys = 0;
-    ::decode(nkeys, iter);
-    while (nkeys) {
-      string key;
-      bufferlist value;
-      ::decode(key, iter);
-      ::decode(value, iter);
-      bufferlist::iterator val_iter = value.begin();
-      ::decode(dir.m[key], val_iter);
-      --nkeys;
-    }
-  } catch (buffer::error& err) {
-    CLS_LOG("ERROR: read_bucket_dir(): failed to decode buffer\n");
-    return -EIO;
-  }
-
-  return 0;
-}
-
 int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
-  bufferlist bl;
-  struct rgw_bucket_dir dir;
-  int rc = read_bucket_dir(hctx, dir);
-  if (rc < 0)
-    return rc;
-
   bufferlist::iterator iter = in->begin();
 
   struct rgw_cls_list_op op;
-
   try {
     ::decode(op, iter);
   } catch (buffer::error& err) {
@@ -89,15 +44,44 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
   struct rgw_cls_list_ret ret;
   struct rgw_bucket_dir& new_dir = ret.dir;
-  new_dir.header = dir.header;
+  bufferlist header_bl;
+  int rc = cls_cxx_map_read_header(hctx, &header_bl);
+  if (rc < 0)
+    return rc;
+  bufferlist::iterator header_iter = header_bl.begin();
+  try {
+    ::decode(new_dir.header, header_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG("ERROR: rgw_bucket_complete_op(): failed to decode header\n");
+    return -EINVAL;
+  }
+
+  bufferlist bl;
+
+  map<string, bufferlist> keys;
+  rc = cls_cxx_map_read_keys(hctx, op.start_obj, op.num_entries + 1, &keys);
+  if (rc < 0)
+    return rc;
+
   std::map<string, struct rgw_bucket_dir_entry>& m = new_dir.m;
-  std::map<string, struct rgw_bucket_dir_entry>::iterator miter = dir.m.upper_bound(op.start_obj);
+  std::map<string, bufferlist>::iterator kiter = keys.begin();
   uint32_t i;
-  for (i = 0; i != op.num_entries && miter != dir.m.end(); ++i, ++miter) {
-    m[miter->first] = miter->second;
+
+  for (i = 0; i < op.num_entries && kiter != keys.end(); ++i, ++kiter) {
+    struct rgw_bucket_dir_entry entry;
+    bufferlist& entrybl = kiter->second;
+    bufferlist::iterator eiter = entrybl.begin();
+    try {
+      ::decode(entry, eiter);
+    } catch (buffer::error& err) {
+      CLS_LOG("ERROR: rgw_bucket_list(): failed to decode entry, key=%s\n", kiter->first.c_str());
+      return -EINVAL;
+    }
+    
+    m[kiter->first] = entry;
   }
 
-  ret.is_truncated = (miter != dir.m.end());
+  ret.is_truncated = (kiter != keys.end());
 
   ::encode(ret, *out);
 
@@ -109,23 +93,25 @@ int rgw_bucket_init_index(cls_method_context_t hctx, bufferlist *in, bufferlist
   bufferlist bl;
   bufferlist::iterator iter;
 
-  uint64_t size;
-  int rc = cls_cxx_stat(hctx, &size, NULL);
-  if (rc < 0)
-    return rc;
-  if (size != 0) {
+  bufferlist header_bl;
+  int rc = cls_cxx_map_read_header(hctx, &header_bl);
+  if (rc < 0) {
+    switch (rc) {
+    case -ENODATA:
+    case -ENOENT:
+      break;
+    default:
+      return rc;
+    }
+  }
+
+  if (header_bl.length() != 0) {
     CLS_LOG("ERROR: index already initialized\n");
-    return -EINVAL;
   }
 
   rgw_bucket_dir dir;
-  bufferlist map_bl;
-  bufferlist header_bl;
   ::encode(dir.header, header_bl);
-  ::encode(header_bl, map_bl);
-  __u32 num_keys = 0;
-  ::encode(num_keys, map_bl);
-  rc = cls_cxx_map_write_full(hctx, &map_bl);
+  rc = cls_cxx_map_write_header(hctx, &header_bl);
   return rc;
 }
 
@@ -197,7 +183,12 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   if (rc < 0)
     return rc;
   bufferlist::iterator header_iter = header_bl.begin();
-  ::decode(header, header_iter);
+  try {
+    ::decode(header, header_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG("ERROR: rgw_bucket_complete_op(): failed to decode header\n");
+    return -EINVAL;
+  }
 
   bufferlist current_entry;
   struct rgw_bucket_dir_entry entry;
@@ -216,7 +207,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   } else {
     bufferlist::iterator cur_iter = current_entry.begin();
     ::decode(entry, cur_iter);
-    CLS_LOG("rgw_bucket_complete_op(): existing entry: epoch=%lld\n", entry.epoch);
+    CLS_LOG("rgw_bucket_complete_op(): existing entry: epoch=%lld name=%s locator=%s\n", entry.epoch, entry.name.c_str(), entry.locator.c_str());
   }
 
   if (op.tag.size()) {
@@ -246,15 +237,16 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
   case CLS_RGW_OP_DEL:
     if (ondisk) {
       if (!entry.pending_map.size()) {
-        op_bl.append(CEPH_OSD_TMAP_RM);
-        ::encode(op.name, op_bl);
+       int ret = cls_cxx_map_remove_key(hctx, op.name);
+       if (ret < 0)
+         return ret;
       } else {
         entry.exists = false;
         bufferlist new_key_bl;
         ::encode(entry, new_key_bl);
-        op_bl.append(CEPH_OSD_TMAP_SET);
-        ::encode(op.name, op_bl);
-        ::encode(new_key_bl, op_bl);
+       int ret = cls_cxx_map_write_key(hctx, op.name, &new_key_bl);
+       if (ret < 0)
+         return ret;
       }
     } else {
       return -ENOENT;
@@ -273,20 +265,16 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
       stats.total_size_rounded += get_rounded_size(meta.size);
       bufferlist new_key_bl;
       ::encode(entry, new_key_bl);
-      op_bl.append(CEPH_OSD_TMAP_SET);
-      ::encode(op.name, op_bl);
-      ::encode(new_key_bl, op_bl);
+      int ret = cls_cxx_map_write_key(hctx, op.name, &new_key_bl);
+      if (ret < 0)
+       return ret;
     }
     break;
   }
 
-  bufferlist update_bl;
   bufferlist new_header_bl;
   ::encode(header, new_header_bl);
-  update_bl.append(CEPH_OSD_TMAP_HDR);
-  ::encode(new_header_bl, update_bl);
-  update_bl.claim_append(op_bl);
-  return cls_cxx_map_update(hctx, &update_bl);
+  return cls_cxx_map_write_header(hctx, &new_header_bl);
 }
 
 int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
@@ -332,6 +320,8 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
       }
     }
 
+    int ret = 0;
+
     if (cur_disk.pending_map.empty()) {
       struct rgw_bucket_category_stats& stats =
           header.stats[cur_disk.meta.category];
@@ -343,8 +333,9 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
       }
       switch(op) {
       case CEPH_RGW_REMOVE:
-        op_bl.append(CEPH_OSD_TMAP_RM);
-        ::encode(cur_change.name, op_bl);
+       ret = cls_cxx_map_remove_key(hctx, cur_change.name);
+       if (ret < 0)
+         return ret;
         break;
       case CEPH_RGW_UPDATE:
         stats.num_entries++;
@@ -352,8 +343,9 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
         stats.total_size_rounded += get_rounded_size(cur_change.meta.size);
         bufferlist cur_state_bl;
         ::encode(cur_change, cur_state_bl);
-        op_bl.append(CEPH_OSD_TMAP_SET);
-        ::encode(cur_state_bl, op_bl);
+        ret = cls_cxx_map_write_key(hctx, cur_change.name, &cur_state_bl);
+        if (ret < 0)
+         return ret;
         break;
       }
     }
@@ -363,11 +355,9 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
   if (header_changed) {
     bufferlist new_header_bl;
     ::encode(header, new_header_bl);
-    update_bl.append(CEPH_OSD_TMAP_HDR);
-    ::encode(new_header_bl, update_bl);
+    return cls_cxx_map_write_header(hctx, &new_header_bl);
   }
-  update_bl.claim_append(op_bl);
-  return cls_cxx_map_update(hctx, &update_bl);
+  return 0;
 }
 
 void __cls_init()
index be65f6d8f1969447ca3e290cc865b3c9a4de4bc5..ba3bb84e7e67181500e4726c0cb0042345d9723a 100644 (file)
@@ -243,119 +243,148 @@ int cls_cxx_snap_revert(cls_method_context_t hctx, snapid_t snapid)
   return (*pctx)->pg->do_osd_ops(*pctx, ops);
 }
 
-int cls_cxx_map_read_full(cls_method_context_t hctx, bufferlist* outbl)
+int cls_cxx_map_read_all_keys(cls_method_context_t hctx, map<string, bufferlist>* vals)
 {
   ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
   vector<OSDOp> ops(1);
+  OSDOp& op = ops[0];
   int ret;
-  ops[0].op.op = CEPH_OSD_OP_TMAPGET;
+
+  string start_after;
+  uint64_t max = (uint64_t)-1;
+  bufferlist inbl;
+
+  ::encode(start_after, op.indata);
+  ::encode(max, op.indata);
+
+  op.op.op = CEPH_OSD_OP_OMAPGETVALS;
+  
   ret = (*pctx)->pg->do_osd_ops(*pctx, ops);
   if (ret < 0)
     return ret;
-  outbl->claim(ops[0].outdata);
-  return outbl->length();
+
+  bufferlist::iterator iter = op.outdata.begin();
+  try {
+    ::decode(*vals, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+  return vals->size();
 }
 
-int cls_cxx_map_read_header(cls_method_context_t hctx, bufferlist *outbl)
+int cls_cxx_map_read_keys(cls_method_context_t hctx, string& start_obj, uint64_t max, map<string, bufferlist>* vals)
 {
   ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
   vector<OSDOp> ops(1);
+  OSDOp& op = ops[0];
   int ret;
-  ops[0].op.op = CEPH_OSD_OP_TMAPGET;
+
+  string start_after;
+  bufferlist inbl;
+
+  ::encode(start_obj, op.indata);
+  ::encode(max, op.indata);
+
+  op.op.op = CEPH_OSD_OP_OMAPGETVALS;
+  
   ret = (*pctx)->pg->do_osd_ops(*pctx, ops);
   if (ret < 0)
     return ret;
 
+  bufferlist::iterator iter = op.outdata.begin();
   try {
-    //decode and return the header
-    bufferlist::iterator map_iter = ops[0].outdata.begin();
-    ::decode(*outbl, map_iter);
-  } catch (buffer::error& e) {
-    return -EINVAL;
+    ::decode(*vals, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
   }
+  return vals->size();
+}
+
+int cls_cxx_map_read_header(cls_method_context_t hctx, bufferlist *outbl)
+{
+  ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
+  vector<OSDOp> ops(1);
+  OSDOp& op = ops[0];
+  int ret;
+  op.op.op = CEPH_OSD_OP_OMAPGETHEADER;
+  ret = (*pctx)->pg->do_osd_ops(*pctx, ops);
+  if (ret < 0)
+    return ret;
+
+  outbl->claim(op.outdata);
+
   return 0;
 }
 int cls_cxx_map_read_key(cls_method_context_t hctx, string key, bufferlist *outbl)
 {
   ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
   vector<OSDOp> ops(1);
+  OSDOp& op = ops[0];
   int ret;
-  ops[0].op.op = CEPH_OSD_OP_TMAPGET;
+
+  set<string> k;
+  k.insert(key);
+  ::encode(k, op.indata);
+
+  op.op.op = CEPH_OSD_OP_OMAPGETVALSBYKEYS;
   ret = (*pctx)->pg->do_osd_ops(*pctx, ops);
   if (ret < 0)
     return ret;
 
-  //find and return just the requested key!
-  bufferlist header;
-  string next_key;
-  bufferlist next_val;
-  __u32 nkeys;
-  bufferlist::iterator map_iter = ops[0].outdata.begin();
+  bufferlist::iterator iter = op.outdata.begin();
   try {
-    ::decode(header, map_iter);
-    ::decode(nkeys, map_iter);
-    while (nkeys) {
-      ::decode(next_key, map_iter);
-      ::decode(next_val, map_iter);
-      if (next_key == key) {
-        *outbl = next_val;
-        return 0;
-      }
-      if (next_key > key)
-        return -ENOENT;
-      --nkeys;
-    }
+    map<string, bufferlist> m;
+
+    ::decode(m, iter);
+    map<string, bufferlist>::iterator iter = m.begin();
+    if (iter == m.end())
+      return -ENOENT;
+
+    *outbl = iter->second;
   } catch (buffer::error& e) {
-    return -EINVAL;
+    return -EIO;
   }
-  return -ENOENT;
+  return 0;
 }
 
-int cls_cxx_map_write_full(cls_method_context_t hctx, bufferlist* inbl)
+int cls_cxx_map_write_key(cls_method_context_t hctx, string key, bufferlist *inbl)
 {
   ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
   vector<OSDOp> ops(1);
-  ops[0].op.op = CEPH_OSD_OP_TMAPPUT;
-  ops[0].indata = *inbl;
-  return (*pctx)->pg->do_osd_ops(*pctx, ops);
-}
+  OSDOp& op = ops[0];
+  bufferlist& update_bl = op.indata;
+  map<string, bufferlist> m;
+  m[key] = *inbl;
+  ::encode(m, update_bl);
 
-int cls_cxx_map_write_key(cls_method_context_t hctx, string key, bufferlist *inbl)
-{
-  bufferlist update_bl;
-  update_bl.append(CEPH_OSD_TMAP_SET);
-  ::encode(key, update_bl);
-  ::encode(*inbl, update_bl);
-  return cls_cxx_map_update(hctx, &update_bl);
+  op.op.op = CEPH_OSD_OP_OMAPSETVALS;
+
+  return (*pctx)->pg->do_osd_ops(*pctx, ops);
 }
 
 int cls_cxx_map_write_header(cls_method_context_t hctx, bufferlist *inbl)
 {
-  ReplicatedPG::OpContext **pctx = static_cast<ReplicatedPG::OpContext **>(hctx);
+  ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
   vector<OSDOp> ops(1);
-  bufferlist update_bl;
-  update_bl.append(CEPH_OSD_TMAP_HDR);
+  OSDOp& op = ops[0];
+  bufferlist& update_bl = op.indata;
   ::encode(*inbl, update_bl);
 
-  ops[0].op.op = CEPH_OSD_OP_TMAPUP;
-  ops[0].indata = update_bl;
+  op.op.op = CEPH_OSD_OP_OMAPSETHEADER;
 
   return (*pctx)->pg->do_osd_ops(*pctx, ops);
 }
 
 int cls_cxx_map_remove_key(cls_method_context_t hctx, string key)
-{
-  bufferlist update_bl;
-  update_bl.append(CEPH_OSD_TMAP_RM);
-  ::encode(key, update_bl);
-  return cls_cxx_map_update(hctx, &update_bl);
-}
-
-int cls_cxx_map_update(cls_method_context_t hctx, bufferlist* inbl)
 {
   ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
   vector<OSDOp> ops(1);
-  ops[0].op.op = CEPH_OSD_OP_TMAPUP;
-  ops[0].indata = *inbl;
+  OSDOp& op = ops[0];
+  bufferlist& update_bl = op.indata;
+  set<string> to_rm;
+  to_rm.insert(key);
+
+  ::encode(to_rm, update_bl);
   return (*pctx)->pg->do_osd_ops(*pctx, ops);
 }
+
index 822558c7d6b7a5d385c5b1cf238832e1c6460311..3b4fb53f8c93ce2145c7be5b6d84d32bcaba068a 100644 (file)
@@ -93,10 +93,10 @@ extern int cls_cxx_write(cls_method_context_t hctx, int ofs, int len, bufferlist
 extern int cls_cxx_write_full(cls_method_context_t hctx, bufferlist *bl);
 extern int cls_cxx_replace(cls_method_context_t hctx, int ofs, int len, bufferlist *bl);
 extern int cls_cxx_snap_revert(cls_method_context_t hctx, snapid_t snapid);
-extern int cls_cxx_map_read_full(cls_method_context_t hctx, bufferlist* outbl);
+extern int cls_cxx_map_read_all_keys(cls_method_context_t hctx, std::map<string, bufferlist> *keys);
+extern int cls_cxx_map_read_keys(cls_method_context_t hctx, string& start_after, uint64_t max, std::map<string, bufferlist> *keys);
 extern int cls_cxx_map_read_header(cls_method_context_t hctx, bufferlist *outbl);
 extern int cls_cxx_map_read_key(cls_method_context_t hctx, string key, bufferlist *outbl);
-extern int cls_cxx_map_write_full(cls_method_context_t hctx, bufferlist* in);
 extern int cls_cxx_map_write_key(cls_method_context_t hctx, string key, bufferlist *inbl);
 extern int cls_cxx_map_write_header(cls_method_context_t hctx, bufferlist *inbl);
 extern int cls_cxx_map_remove_key(cls_method_context_t hctx, string key);