return store->ctx();
}
-RGWPutObjProcessor::~RGWPutObjProcessor()
+RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
{
+ drain_pending();
+
if (is_complete)
return;
list<rgw_obj>::iterator iter;
- for (iter = objs.begin(); iter != objs.end(); ++iter) {
+ for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) {
rgw_obj& obj = *iter;
int r = store->delete_obj(obj_ctx, bucket_owner, obj);
if (r < 0 && r != -ENOENT) {
if ((uint64_t)abs_ofs + bl.length() > obj_len)
obj_len = abs_ofs + bl.length();
+ if (!(obj == last_written_obj)) {
+ add_written_obj(obj);
+ last_written_obj = obj;
+ }
+
// For the first call pass -1 as the offset to
// do a write_full.
int r = store->aio_put_obj_data(NULL, obj,
cur_part_ofs = ofs;
next_part_ofs = ofs + manifest_gen.cur_stripe_max_size();
cur_obj = manifest_gen.get_cur_obj();
- add_obj(cur_obj);
return 0;
}
map<string, bufferlist>& attrs,
const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
- list<rgw_obj> objs;
-
- void add_obj(const rgw_obj& obj) {
- objs.push_back(obj);
- }
public:
RGWPutObjProcessor(const string& _bo) : store(NULL), obj_ctx(NULL), is_complete(false), bucket_owner(_bo) {}
- virtual ~RGWPutObjProcessor();
+ virtual ~RGWPutObjProcessor() {}
virtual int prepare(RGWRados *_store, void *_o, string *oid_rand) {
store = _store;
obj_ctx = _o;
int wait_pending_front();
bool pending_has_completed();
+ rgw_obj last_written_obj;
+
protected:
uint64_t obj_len;
+ list<rgw_obj> written_objs;
+
+ void add_written_obj(const rgw_obj& obj) {
+ written_objs.push_back(obj);
+ }
+
int drain_pending();
int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
int throttle_data(void *handle, bool need_to_wait);
RGWPutObjProcessor_Aio(const string& bucket_owner) : RGWPutObjProcessor(bucket_owner), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
- virtual ~RGWPutObjProcessor_Aio() {
- drain_pending();
- }
+ virtual ~RGWPutObjProcessor_Aio();
};
class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio