if (is_complete)
return;
- list<rgw_obj>::iterator iter;
+ set<rgw_obj>::iterator iter;
bool is_multipart_obj = false;
rgw_obj multipart_obj;
* details is describled on #11749
*/
for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) {
- rgw_obj &obj = *iter;
+ const rgw_obj &obj = *iter;
if (RGW_OBJ_NS_MULTIPART == obj.ns) {
ldout(store->ctx(), 5) << "NOTE: we should not process the multipart object (" << obj << ") here" << dendl;
multipart_obj = *iter;
obj_len = abs_ofs + bl.length();
if (!(obj == last_written_obj)) {
- add_written_obj(obj);
last_written_obj = obj;
}
bl,
((ofs != 0) ? ofs : -1),
exclusive, phandle);
-
return r;
}
}
struct put_obj_aio_info info = pop_pending();
int ret = store->aio_wait(info.handle);
+
+ if (ret >= 0) {
+ add_written_obj(info.obj);
+ }
+
return ret;
}
return ret;
}
-int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
+int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait)
{
if (handle) {
struct put_obj_aio_info info;
info.handle = handle;
+ info.obj = obj;
pending.push_back(info);
}
size_t orig_size = pending.size();
return 0;
}
-int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
+int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive)
{
if (ofs >= next_part_ofs) {
int r = prepare_next_part(ofs);
}
}
+ *pobj = cur_obj;
+
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again)
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again)
{
*again = false;
bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
we could be racing with another upload, to the same
object and cleanup can be messy */
- int ret = write_data(bl, write_ofs, phandle, exclusive);
+ int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
if (hash) {
hash->Update((const byte *)bl.c_str(), bl.length());
}
if (pending_data_bl.length()) {
void *handle;
- int r = write_data(pending_data_bl, data_ofs, &handle, false);
+ rgw_obj obj;
+ int r = write_data(pending_data_bl, data_ofs, &handle, &obj, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
return r;
}
- r = throttle_data(handle, false);
+ r = throttle_data(handle, obj, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
return r;
do {
void *handle;
- int ret = processor->handle_data(bl, ofs, NULL, &handle, &again);
+ rgw_obj obj;
+ int ret = processor->handle_data(bl, ofs, NULL, &handle, &obj, &again);
if (ret < 0)
return ret;
ret = opstate->renew_state();
if (ret < 0) {
ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
- int r = processor->throttle_data(handle, false);
+ int r = processor->throttle_data(handle, obj, false);
if (r < 0) {
ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
}
need_opstate = false;
}
- ret = processor->throttle_data(handle, false);
+ ret = processor->throttle_data(handle, obj, false);
if (ret < 0)
return ret;
} while (again);
do {
void *handle;
+ rgw_obj obj;
- ret = processor.handle_data(bl, ofs, NULL, &handle, &again);
+ ret = processor.handle_data(bl, ofs, NULL, &handle, &obj, &again);
if (ret < 0) {
return ret;
}
- ret = processor.throttle_data(handle, false);
+ ret = processor.throttle_data(handle, obj, false);
if (ret < 0)
return ret;
} while (again);
return 0;
}
-int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& obj,
+int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& obj,
int versioning_status, uint16_t bilog_flags)
{
RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
int complete_atomic_modification();
public:
- Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
+ Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
ctx(_ctx), obj(_obj), bs(store),
state(NULL), versioning_disabled(false),
bs_initialized(false) {}
int bucket_suspended(rgw_bucket& bucket, bool *suspended);
/** Delete an object.*/
- virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, rgw_obj& src_obj,
+ virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, const rgw_obj& src_obj,
int versioning_status, uint16_t bilog_flags = 0);
/* Delete a system object */
store = _store;
return 0;
}
- virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) = 0;
- virtual int throttle_data(void *handle, bool need_to_wait) = 0;
+ virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) = 0;
+ virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
virtual void complete_hash(MD5 *hash) {
assert(0);
}
struct put_obj_aio_info {
void *handle;
+ rgw_obj obj;
};
class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
protected:
uint64_t obj_len;
- list<rgw_obj> written_objs;
+ set<rgw_obj> written_objs;
void add_written_obj(const rgw_obj& obj) {
- written_objs.push_back(obj);
+ written_objs.insert(obj);
}
int drain_pending();
int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
public:
- int throttle_data(void *handle, bool need_to_wait);
+ int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait);
RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
virtual ~RGWPutObjProcessor_Aio();
RGWObjManifest manifest;
RGWObjManifest::generator manifest_gen;
- int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
+ int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive);
virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
map<string, bufferlist>& attrs,
const char *if_match = NULL, const char *if_nomatch = NULL);
void set_extra_data_len(uint64_t len) {
extra_data_len = len;
}
- virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again);
+ virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again);
virtual void complete_hash(MD5 *hash);
bufferlist& get_extra_data() { return extra_data_bl; }