]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: atomic get, basic flow for severl rados ops
authorYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 3 Aug 2011 23:59:33 +0000 (16:59 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Thu, 4 Aug 2011 22:58:33 +0000 (15:58 -0700)
not working yet

src/rgw/rgw_access.h
src/rgw/rgw_common.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 8b59ce6fcda1be9c36e7003727d714af5f201ed1..33e5866819fbd197f83aa835899fa79532c8eef3 100644 (file)
@@ -244,6 +244,7 @@ public:
 
   virtual void *create_context() { return NULL; }
   virtual void destroy_context(void *ctx) {}
+  virtual void set_atomic(void *ctx, rgw_obj& obj) {}
 };
 
 class RGWStoreManager {
index 677b25ef135392933ac2fec9d3740d398f9c496c..38c6e0ff78323d8970864d19596b88cc0f45ab0b 100644 (file)
@@ -48,6 +48,7 @@ extern string rgw_root_bucket;
 #define RGW_ATTR_META_PREFIX   RGW_ATTR_PREFIX "x-amz-meta-"
 #define RGW_ATTR_CONTENT_TYPE  RGW_ATTR_PREFIX "content_type"
 #define RGW_ATTR_ID_TAG        RGW_ATTR_PREFIX "idtag"
+#define RGW_ATTR_SHADOW_OBJ            RGW_ATTR_PREFIX "shadow_name"
 
 #define RGW_BUCKETS_OBJ_PREFIX ".buckets"
 
index c02d167de02d517131fbd33d7f00a8f5cea7722d..187be0b2231f54d5ff05a0f257424c0983ab9f14 100644 (file)
@@ -689,6 +689,7 @@ void RGWPutObj::execute()
 
     if (!multipart) {
       rgw_obj dst_obj(s->bucket_str, s->object_str);
+      rgwstore->set_atomic(s->obj_ctx, dst_obj);
       ret = rgwstore->clone_obj(s->obj_ctx, dst_obj, 0, obj, 0, s->obj_size, attrs);
       if (ret < 0)
         goto done_err;
index b9a9501aecd9ecc08a175e7ee8ca4b16ea2e8205..2064c729755fcca193fc982d67e878bf1d961654 100644 (file)
@@ -701,12 +701,35 @@ int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io
 
   *state = s;
 
-  int r = io_ctx.getxattrs(actual_obj, s->attrset);
+  ObjectReadOperation op;
+  op.getxattrs();
+  op.stat();
+  bufferlist outbl;
+  int r = io_ctx.operate(actual_obj, &op, &outbl);
   if (r < 0)
     return r;
 
+  RGW_LOG(0) << "outbl.length()=" << outbl.length() << dendl;
+
+  /* argh for this whole block */
+  bufferlist::iterator oiter = outbl.begin();
+  bufferlist bl;
+  ::decode(bl, oiter);
+  bufferlist::iterator bliter = bl.begin();
+  ::decode(s->attrset, bliter);
+  ::decode(s->size, bliter);
+  ::decode(bl, oiter);
+  bliter = bl.begin();
+  ::decode(s->mtime, oiter);
+
   s->has_attrs = true;
-  s->ver = io_ctx.get_last_version();
+  map<string, bufferlist>::iterator iter = s->attrset.find(RGW_ATTR_SHADOW_OBJ);
+  if (iter != s->attrset.end()) {
+    bufferlist bl = iter->second;
+    bufferlist::iterator it = bl.begin();
+    ::decode(s->shadow_obj, it);
+  }
+  s->obj_tag = s->attrset[RGW_ATTR_ID_TAG];
   return 0;
 }
 
@@ -755,6 +778,80 @@ int RGWRados::get_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& de
   return 0;
 }
 
+int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+                            string& actual_obj, ObjectOperation& op, RGWObjState **pstate)
+{
+  if (!rctx)
+    return 0;
+
+  int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
+  if (r < 0)
+    return r;
+
+  RGWObjState *state = *pstate;
+
+  if (!state->is_atomic)
+    return 0;
+
+  if (state->obj_tag.length() > 0) {// check for backward compatibility
+    op.cmpxattr(RGW_ATTR_ID_TAG, LIBRADOS_CMPXATTR_OP_EQ, state->obj_tag);
+  }
+  return 0;
+}
+
+int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+                            string& actual_obj, ObjectOperation& op, RGWObjState **pstate)
+{
+  if (!rctx)
+    return 0;
+
+  int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
+  if (r < 0)
+    return r;
+
+  RGWObjState *state = *pstate;
+
+  if (!state->is_atomic)
+    return 0;
+
+  if (state->obj_tag.length() == 0 ||
+      state->shadow_obj.size() == 0) {
+    RGW_LOG(0) << "can't clone object " << obj << " to shadow object, tag/shadow_obj haven't been set" << dendl;
+  } else {
+    RGW_LOG(0) << "cloning object " << obj << " to name=" << state->shadow_obj << dendl;
+    rgw_obj dest_obj(state->shadow_obj, obj.bucket);
+    if (obj.key.size())
+      dest_obj.set_key(obj.key);
+    else
+      dest_obj.set_key(obj.object);
+
+    /* FIXME: clone obj should be conditional, should check src object id-tag */
+    r = clone_obj(NULL, dest_obj, 0, obj, 0, state->size, state->attrset);
+    if (r < 0) {
+      RGW_LOG(0) << "ERROR: failed to clone object r=" << r << dendl;
+      return r;
+    }
+  }
+
+  if (state->obj_tag.length() > 0) {// check for backward compatibility
+    string tag;
+    append_rand_alpha(tag, tag, 32);
+    bufferlist bl;
+    bl.append(tag);
+    
+    op.setxattr(RGW_ATTR_ID_TAG, bl);
+
+    string shadow = obj.object;
+    shadow.append(".");
+    shadow.append(tag);
+
+    bufferlist shadow_bl;
+    shadow_bl.append(shadow);
+    op.setxattr(RGW_ATTR_SHADOW_OBJ, bl);
+  }
+  return 0;
+}
+
 /**
  * Set an attr on an object.
  * bucket: name of the bucket holding the object
@@ -783,23 +880,27 @@ int RGWRados::set_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& bl
 
   io_ctx.locator_set_key(obj.key);
 
-  if (rctx) {
-    RGWObjState *state;
-    r = get_obj_state(rctx, obj, io_ctx, actual_obj, &state);
-    if (r < 0)
-      return r;
-    ObjectOperation op;
-    if (state->obj_tag.length() > 0) // check for backward compatibility
-      op.cmpxattr(RGW_ATTR_ID_TAG, state->obj_tag);
-    op.setxattr(name, bl);
+  ObjectOperation op;
+  RGWObjState *state = NULL;
 
-    bufferlist outbl;
-    r = io_ctx.operate(actual_obj, &op, &outbl);
+  string shadow_name;
 
-    if (r >= 0)
-      state->attrset[name] = bl;
-  } else {
-    r = io_ctx.setxattr(actual_obj, name, bl);
+  r = append_atomic_test(rctx, obj, io_ctx, actual_obj, op, &state);
+  if (r < 0)
+    return r;
+
+  bufferlist outbl;
+  op.setxattr(name, bl);
+  r = io_ctx.operate(actual_obj, &op, &outbl);
+
+  if (state && r >= 0)
+    state->attrset[name] = bl;
+
+  if (r == -ECANCELED) {
+    /* a race! object was replaced, we need to set attr on the original obj */
+    dout(0) << "RGWRados::set_attr: raced with another process, going to the shadow obj instead" << dendl;
+    rgw_obj shadow(state->shadow_obj, obj.bucket, obj.key);
+    r = set_attr(NULL, shadow, name, bl);
   }
 
   if (r < 0)
@@ -975,6 +1076,7 @@ int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj,
   std::string& bucket = dst_obj.bucket;
   std::string& dst_oid = dst_obj.object;
   librados::IoCtx io_ctx;
+  RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
 
   int r = open_bucket_ctx(bucket, io_ctx);
   if (r < 0)
@@ -991,6 +1093,11 @@ int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj,
     bufferlist& bl = iter->second;
     op.setxattr(name.c_str(), bl);
   }
+  RGWObjState *state;
+  r = prepare_atomic_for_write(rctx, dst_obj, io_ctx, dst_oid, op, &state);
+  if (r < 0)
+    return r;
+
   vector<RGWCloneRangeInfo>::iterator range_iter;
   for (range_iter = ranges.begin(); range_iter != ranges.end(); ++range_iter) {
     RGWCloneRangeInfo range = *range_iter;
index 890e3eed74811b69993e2d5f42b52e260b93e788..bf3bfd7c8d02d4ed215295249617a0da847e3b25 100644 (file)
@@ -9,12 +9,14 @@ class RGWWatcher;
 
 struct RGWObjState {
   bool is_atomic;
-  uint64_t ver;
   bool has_attrs;
+  uint64_t size;
+  time_t mtime;
   bufferlist obj_tag;
+  string shadow_obj;
 
   map<string, bufferlist> attrset;
-  RGWObjState() : is_atomic(false), ver(0), has_attrs(0) {}
+  RGWObjState() : is_atomic(false), has_attrs(0) {}
 
   bool get_attr(string name, bufferlist& dest) {
     map<string, bufferlist>::iterator iter = attrset.find(name);
@@ -31,6 +33,9 @@ struct RGWRadosCtx {
   RGWObjState *get_state(rgw_obj& obj) {
     return &objs_state[obj];
   }
+  void set_atomic(rgw_obj& obj) {
+    objs_state[obj].is_atomic = true;
+  }
 };
   
 class RGWRados  : public RGWAccess
@@ -55,6 +60,10 @@ class RGWRados  : public RGWAccess
   librados::IoCtx control_pool_ctx;
 
   int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state);
+  int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+                         string& actual_obj, librados::ObjectOperation& op, RGWObjState **state);
+  int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+                         string& actual_obj, librados::ObjectOperation& op, RGWObjState **pstate);
 public:
   RGWRados() : watcher(NULL), watch_handle(0) {}
 
@@ -164,8 +173,11 @@ public:
   void destroy_context(void *ctx) {
     delete (RGWRadosCtx *)ctx;
   }
+  void set_atomic(void *ctx, rgw_obj& obj) {
+    RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
+    rctx->set_atomic(obj);
+  }
 
-  void set_atomic_ops(void *ctx, rgw_obj& obj);
 };
 
 #endif