*state = s;
- int r = io_ctx.getxattrs(actual_obj, s->attrset);
+ ObjectReadOperation op;
+ op.getxattrs();
+ op.stat();
+ bufferlist outbl;
+ int r = io_ctx.operate(actual_obj, &op, &outbl);
if (r < 0)
return r;
+ RGW_LOG(0) << "outbl.length()=" << outbl.length() << dendl;
+
+ /* argh for this whole block */
+ bufferlist::iterator oiter = outbl.begin();
+ bufferlist bl;
+ ::decode(bl, oiter);
+ bufferlist::iterator bliter = bl.begin();
+ ::decode(s->attrset, bliter);
+ ::decode(s->size, bliter);
+ ::decode(bl, oiter);
+ bliter = bl.begin();
+ ::decode(s->mtime, oiter);
+
s->has_attrs = true;
- s->ver = io_ctx.get_last_version();
+ map<string, bufferlist>::iterator iter = s->attrset.find(RGW_ATTR_SHADOW_OBJ);
+ if (iter != s->attrset.end()) {
+ bufferlist bl = iter->second;
+ bufferlist::iterator it = bl.begin();
+ ::decode(s->shadow_obj, it);
+ }
+ s->obj_tag = s->attrset[RGW_ATTR_ID_TAG];
return 0;
}
return 0;
}
+int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+ string& actual_obj, ObjectOperation& op, RGWObjState **pstate)
+{
+ if (!rctx)
+ return 0;
+
+ int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
+ if (r < 0)
+ return r;
+
+ RGWObjState *state = *pstate;
+
+ if (!state->is_atomic)
+ return 0;
+
+ if (state->obj_tag.length() > 0) {// check for backward compatibility
+ op.cmpxattr(RGW_ATTR_ID_TAG, LIBRADOS_CMPXATTR_OP_EQ, state->obj_tag);
+ }
+ return 0;
+}
+
+int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+ string& actual_obj, ObjectOperation& op, RGWObjState **pstate)
+{
+ if (!rctx)
+ return 0;
+
+ int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
+ if (r < 0)
+ return r;
+
+ RGWObjState *state = *pstate;
+
+ if (!state->is_atomic)
+ return 0;
+
+ if (state->obj_tag.length() == 0 ||
+ state->shadow_obj.size() == 0) {
+ RGW_LOG(0) << "can't clone object " << obj << " to shadow object, tag/shadow_obj haven't been set" << dendl;
+ } else {
+ RGW_LOG(0) << "cloning object " << obj << " to name=" << state->shadow_obj << dendl;
+ rgw_obj dest_obj(state->shadow_obj, obj.bucket);
+ if (obj.key.size())
+ dest_obj.set_key(obj.key);
+ else
+ dest_obj.set_key(obj.object);
+
+ /* FIXME: clone obj should be conditional, should check src object id-tag */
+ r = clone_obj(NULL, dest_obj, 0, obj, 0, state->size, state->attrset);
+ if (r < 0) {
+ RGW_LOG(0) << "ERROR: failed to clone object r=" << r << dendl;
+ return r;
+ }
+ }
+
+ if (state->obj_tag.length() > 0) {// check for backward compatibility
+ string tag;
+ append_rand_alpha(tag, tag, 32);
+ bufferlist bl;
+ bl.append(tag);
+
+ op.setxattr(RGW_ATTR_ID_TAG, bl);
+
+ string shadow = obj.object;
+ shadow.append(".");
+ shadow.append(tag);
+
+ bufferlist shadow_bl;
+ shadow_bl.append(shadow);
+ op.setxattr(RGW_ATTR_SHADOW_OBJ, bl);
+ }
+ return 0;
+}
+
/**
* Set an attr on an object.
* bucket: name of the bucket holding the object
io_ctx.locator_set_key(obj.key);
- if (rctx) {
- RGWObjState *state;
- r = get_obj_state(rctx, obj, io_ctx, actual_obj, &state);
- if (r < 0)
- return r;
- ObjectOperation op;
- if (state->obj_tag.length() > 0) // check for backward compatibility
- op.cmpxattr(RGW_ATTR_ID_TAG, state->obj_tag);
- op.setxattr(name, bl);
+ ObjectOperation op;
+ RGWObjState *state = NULL;
- bufferlist outbl;
- r = io_ctx.operate(actual_obj, &op, &outbl);
+ string shadow_name;
- if (r >= 0)
- state->attrset[name] = bl;
- } else {
- r = io_ctx.setxattr(actual_obj, name, bl);
+ r = append_atomic_test(rctx, obj, io_ctx, actual_obj, op, &state);
+ if (r < 0)
+ return r;
+
+ bufferlist outbl;
+ op.setxattr(name, bl);
+ r = io_ctx.operate(actual_obj, &op, &outbl);
+
+ if (state && r >= 0)
+ state->attrset[name] = bl;
+
+ if (r == -ECANCELED) {
+ /* a race! object was replaced, we need to set attr on the original obj */
+ dout(0) << "RGWRados::set_attr: raced with another process, going to the shadow obj instead" << dendl;
+ rgw_obj shadow(state->shadow_obj, obj.bucket, obj.key);
+ r = set_attr(NULL, shadow, name, bl);
}
if (r < 0)
std::string& bucket = dst_obj.bucket;
std::string& dst_oid = dst_obj.object;
librados::IoCtx io_ctx;
+ RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
int r = open_bucket_ctx(bucket, io_ctx);
if (r < 0)
bufferlist& bl = iter->second;
op.setxattr(name.c_str(), bl);
}
+ RGWObjState *state;
+ r = prepare_atomic_for_write(rctx, dst_obj, io_ctx, dst_oid, op, &state);
+ if (r < 0)
+ return r;
+
vector<RGWCloneRangeInfo>::iterator range_iter;
for (range_iter = ranges.begin(); range_iter != ranges.end(); ++range_iter) {
RGWCloneRangeInfo range = *range_iter;
struct RGWObjState {
bool is_atomic;
- uint64_t ver;
bool has_attrs;
+ uint64_t size;
+ time_t mtime;
bufferlist obj_tag;
+ string shadow_obj;
map<string, bufferlist> attrset;
- RGWObjState() : is_atomic(false), ver(0), has_attrs(0) {}
+ RGWObjState() : is_atomic(false), has_attrs(0) {}
bool get_attr(string name, bufferlist& dest) {
map<string, bufferlist>::iterator iter = attrset.find(name);
RGWObjState *get_state(rgw_obj& obj) {
return &objs_state[obj];
}
+ void set_atomic(rgw_obj& obj) {
+ objs_state[obj].is_atomic = true;
+ }
};
class RGWRados : public RGWAccess
librados::IoCtx control_pool_ctx;
int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state);
+ int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+ string& actual_obj, librados::ObjectOperation& op, RGWObjState **state);
+ int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
+ string& actual_obj, librados::ObjectOperation& op, RGWObjState **pstate);
public:
RGWRados() : watcher(NULL), watch_handle(0) {}
void destroy_context(void *ctx) {
delete (RGWRadosCtx *)ctx;
}
+ void set_atomic(void *ctx, rgw_obj& obj) {
+ RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
+ rctx->set_atomic(obj);
+ }
- void set_atomic_ops(void *ctx, rgw_obj& obj);
};
#endif