}
}
-struct put_obj_aio_info {
- void *handle;
-};
-
int RGWPutObj::verify_permission()
{
if (!verify_bucket_permission(s, RGW_PERM_WRITE))
return 0;
}
-int RGWPutObjProcessor::complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
-{
- int r = do_complete(etag, mtime, attrs);
- if (r < 0)
- return r;
-
- is_complete = true;
- return 0;
-}
-
-RGWPutObjProcessor::~RGWPutObjProcessor()
-{
- if (is_complete)
- return;
-
- list<rgw_obj>::iterator iter;
- for (iter = objs.begin(); iter != objs.end(); ++iter) {
- rgw_obj& obj = *iter;
- int r = store->delete_obj(obj_ctx, obj);
- if (r < 0 && r != -ENOENT) {
- ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
- }
- }
-}
-
-class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
-{
- rgw_bucket bucket;
- string obj_str;
-
- bufferlist data;
- rgw_obj obj;
- off_t ofs;
-
-protected:
- int prepare(RGWRados *store, void *obj_ctx);
- int handle_data(bufferlist& bl, off_t ofs, void **phandle);
- int throttle_data(void *handle) { return 0; }
- int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
-
-public:
- RGWPutObjProcessor_Plain(rgw_bucket& b, const string& o) : bucket(b), obj_str(o), ofs(0) {}
-};
-
-int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
-{
- RGWPutObjProcessor::prepare(store, obj_ctx);
-
- obj.init(bucket, obj_str);
-
- return 0;
-};
-
-int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
-{
- if (ofs != _ofs)
- return -EINVAL;
-
- data.append(bl);
- ofs += bl.length();
-
- return 0;
-}
-
-int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
-{
- int r = store->put_obj_meta(obj_ctx, obj, data.length(), mtime, attrs,
- RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
- &data);
- return r;
-}
-
-
-class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
-{
- list<struct put_obj_aio_info> pending;
- size_t max_chunks;
-
- struct put_obj_aio_info pop_pending();
- int wait_pending_front();
- bool pending_has_completed();
- int drain_pending();
-
-protected:
- uint64_t obj_len;
-
- int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle);
- int throttle_data(void *handle);
-
- RGWPutObjProcessor_Aio() : max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
- virtual ~RGWPutObjProcessor_Aio() {
- drain_pending();
- }
-};
-
-int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
-{
- if ((uint64_t)abs_ofs + bl.length() > obj_len)
- obj_len = abs_ofs + bl.length();
-
- // For the first call pass -1 as the offset to
- // do a write_full.
- int r = store->aio_put_obj_data(NULL, obj,
- bl,
- ((ofs != 0) ? ofs : -1),
- false, phandle);
-
- return r;
-}
-
-struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending()
-{
- struct put_obj_aio_info info;
- info = pending.front();
- pending.pop_front();
- return info;
-}
-
-int RGWPutObjProcessor_Aio::wait_pending_front()
-{
- struct put_obj_aio_info info = pop_pending();
- int ret = store->aio_wait(info.handle);
- return ret;
-}
-
-bool RGWPutObjProcessor_Aio::pending_has_completed()
-{
- if (pending.empty())
- return false;
-
- struct put_obj_aio_info& info = pending.front();
- return store->aio_completed(info.handle);
-}
-
-int RGWPutObjProcessor_Aio::drain_pending()
-{
- int ret = 0;
- while (!pending.empty()) {
- int r = wait_pending_front();
- if (r < 0)
- ret = r;
- }
- return ret;
-}
-
-int RGWPutObjProcessor_Aio::throttle_data(void *handle)
-{
- if (handle) {
- struct put_obj_aio_info info;
- info.handle = handle;
- pending.push_back(info);
- }
- size_t orig_size = pending.size();
- while (pending_has_completed()) {
- int r = wait_pending_front();
- if (r < 0)
- return r;
- }
-
- /* resize window in case messages are draining too fast */
- if (orig_size - pending.size() >= max_chunks) {
- max_chunks++;
- }
-
- if (pending.size() > max_chunks) {
- int r = wait_pending_front();
- if (r < 0)
- return r;
- }
- return 0;
-}
-
-class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
-{
- bufferlist first_chunk;
- uint64_t part_size;
- off_t cur_part_ofs;
- off_t next_part_ofs;
- int cur_part_id;
-protected:
- rgw_bucket bucket;
- string obj_str;
-
- string unique_tag;
-
- string oid_prefix;
- rgw_obj head_obj;
- rgw_obj cur_obj;
- RGWObjManifest manifest;
-
- virtual bool immutable_head() { return false; }
-
- int prepare(RGWRados *store, void *obj_ctx);
- virtual int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
-
- void prepare_next_part(off_t ofs);
- void complete_parts();
-
-public:
- ~RGWPutObjProcessor_Atomic() {}
- RGWPutObjProcessor_Atomic(rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t) : part_size(_p),
- cur_part_ofs(0),
- next_part_ofs(_p),
- cur_part_id(0),
- bucket(_b),
- obj_str(_o),
- unique_tag(_t) {}
- int handle_data(bufferlist& bl, off_t ofs, void **phandle) {
- if (!ofs && !immutable_head()) {
- first_chunk.claim(bl);
- *phandle = NULL;
- obj_len = (uint64_t)first_chunk.length();
- prepare_next_part(first_chunk.length());
- return 0;
- }
- if (ofs >= next_part_ofs)
- prepare_next_part(ofs);
- int r = RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
-
- return r;
- }
-};
-
-int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
-{
- RGWPutObjProcessor::prepare(store, obj_ctx);
-
- head_obj.init(bucket, obj_str);
-
- char buf[33];
- gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
- oid_prefix.append("_");
- oid_prefix.append(buf);
- oid_prefix.append("_");
-
- return 0;
-}
-
-void RGWPutObjProcessor_Atomic::prepare_next_part(off_t ofs) {
- int num_parts = manifest.objs.size();
- RGWObjManifestPart *part;
-
- /* first update manifest for written data */
- if (!num_parts) {
- part = &manifest.objs[cur_part_ofs];
- part->loc = head_obj;
- } else {
- part = &manifest.objs[cur_part_ofs];
- part->loc = cur_obj;
- }
- part->loc_ofs = 0;
- part->size = ofs - cur_part_ofs;
-
- if ((uint64_t)ofs > manifest.obj_size)
- manifest.obj_size = ofs;
-
- /* now update params for next part */
-
- cur_part_ofs = ofs;
- next_part_ofs = cur_part_ofs + part_size;
- char buf[16];
-
- cur_part_id++;
- snprintf(buf, sizeof(buf), "%d", cur_part_id);
- string cur_oid = oid_prefix;
- cur_oid.append(buf);
- cur_obj.init_ns(bucket, cur_oid, shadow_ns);
-
- add_obj(cur_obj);
-};
-
-void RGWPutObjProcessor_Atomic::complete_parts()
-{
- if (obj_len > (uint64_t)cur_part_ofs)
- prepare_next_part(obj_len);
-}
-
-int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
-{
- complete_parts();
-
- store->set_atomic(obj_ctx, head_obj);
-
- RGWRados::PutObjMetaExtraParams extra_params;
-
- extra_params.data = &first_chunk;
- extra_params.manifest = &manifest;
- extra_params.ptag = &unique_tag; /* use req_id as operation tag */
- extra_params.mtime = mtime;
-
- int r = store->put_obj_meta(obj_ctx, head_obj, obj_len, attrs,
- RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
- extra_params);
- return r;
-}
-
class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
{
string part_num;
virtual const char *name() { return "delete_bucket"; }
};
-class RGWPutObjProcessor
-{
-protected:
- RGWRados *store;
- void *obj_ctx;
- bool is_complete;
-
- virtual int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs) = 0;
-
- list<rgw_obj> objs;
-
- void add_obj(rgw_obj& obj) {
- objs.push_back(obj);
- }
-public:
- RGWPutObjProcessor() : store(NULL), obj_ctx(NULL), is_complete(false) {}
- virtual ~RGWPutObjProcessor();
- virtual int prepare(RGWRados *_store, void *_o) {
- store = _store;
- obj_ctx = _o;
- return 0;
- };
- virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
- virtual int throttle_data(void *handle) = 0;
- virtual int complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
-};
-
class RGWPutObj : public RGWOp {
friend class RGWPutObjProcessor;
append_rand_alpha(cct, write_version.tag, write_version.tag, TAG_LEN);
}
+int RGWPutObjProcessor::complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
+{
+ int r = do_complete(etag, mtime, attrs);
+ if (r < 0)
+ return r;
+
+ is_complete = true;
+ return 0;
+}
+
+RGWPutObjProcessor::~RGWPutObjProcessor()
+{
+ if (is_complete)
+ return;
+
+ list<rgw_obj>::iterator iter;
+ for (iter = objs.begin(); iter != objs.end(); ++iter) {
+ rgw_obj& obj = *iter;
+ int r = store->delete_obj(obj_ctx, obj);
+ if (r < 0 && r != -ENOENT) {
+ ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
+ }
+ }
+}
+
+int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
+{
+ RGWPutObjProcessor::prepare(store, obj_ctx);
+
+ obj.init(bucket, obj_str);
+
+ return 0;
+};
+
+int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
+{
+ if (ofs != _ofs)
+ return -EINVAL;
+
+ data.append(bl);
+ ofs += bl.length();
+
+ return 0;
+}
+
+int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
+{
+ int r = store->put_obj_meta(obj_ctx, obj, data.length(), mtime, attrs,
+ RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
+ &data);
+ return r;
+}
+
+
+int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
+{
+ if ((uint64_t)abs_ofs + bl.length() > obj_len)
+ obj_len = abs_ofs + bl.length();
+
+ // For the first call pass -1 as the offset to
+ // do a write_full.
+ int r = store->aio_put_obj_data(NULL, obj,
+ bl,
+ ((ofs != 0) ? ofs : -1),
+ false, phandle);
+
+ return r;
+}
+
+struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending()
+{
+ struct put_obj_aio_info info;
+ info = pending.front();
+ pending.pop_front();
+ return info;
+}
+
+int RGWPutObjProcessor_Aio::wait_pending_front()
+{
+ struct put_obj_aio_info info = pop_pending();
+ int ret = store->aio_wait(info.handle);
+ return ret;
+}
+
+bool RGWPutObjProcessor_Aio::pending_has_completed()
+{
+ if (pending.empty())
+ return false;
+
+ struct put_obj_aio_info& info = pending.front();
+ return store->aio_completed(info.handle);
+}
+
+int RGWPutObjProcessor_Aio::drain_pending()
+{
+ int ret = 0;
+ while (!pending.empty()) {
+ int r = wait_pending_front();
+ if (r < 0)
+ ret = r;
+ }
+ return ret;
+}
+
+int RGWPutObjProcessor_Aio::throttle_data(void *handle)
+{
+ if (handle) {
+ struct put_obj_aio_info info;
+ info.handle = handle;
+ pending.push_back(info);
+ }
+ size_t orig_size = pending.size();
+ while (pending_has_completed()) {
+ int r = wait_pending_front();
+ if (r < 0)
+ return r;
+ }
+
+ /* resize window in case messages are draining too fast */
+ if (orig_size - pending.size() >= max_chunks) {
+ max_chunks++;
+ }
+
+ if (pending.size() > max_chunks) {
+ int r = wait_pending_front();
+ if (r < 0)
+ return r;
+ }
+ return 0;
+}
+
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) {
+ if (!ofs && !immutable_head()) {
+ first_chunk.claim(bl);
+ *phandle = NULL;
+ obj_len = (uint64_t)first_chunk.length();
+ prepare_next_part(first_chunk.length());
+ return 0;
+ }
+ if (ofs >= next_part_ofs)
+ prepare_next_part(ofs);
+ int r = RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
+
+ return r;
+}
+
+int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
+{
+ RGWPutObjProcessor::prepare(store, obj_ctx);
+
+ head_obj.init(bucket, obj_str);
+
+ char buf[33];
+ gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+ oid_prefix.append("_");
+ oid_prefix.append(buf);
+ oid_prefix.append("_");
+
+ return 0;
+}
+
+void RGWPutObjProcessor_Atomic::prepare_next_part(off_t ofs) {
+ int num_parts = manifest.objs.size();
+ RGWObjManifestPart *part;
+
+ /* first update manifest for written data */
+ if (!num_parts) {
+ part = &manifest.objs[cur_part_ofs];
+ part->loc = head_obj;
+ } else {
+ part = &manifest.objs[cur_part_ofs];
+ part->loc = cur_obj;
+ }
+ part->loc_ofs = 0;
+ part->size = ofs - cur_part_ofs;
+
+ if ((uint64_t)ofs > manifest.obj_size)
+ manifest.obj_size = ofs;
+
+ /* now update params for next part */
+
+ cur_part_ofs = ofs;
+ next_part_ofs = cur_part_ofs + part_size;
+ char buf[16];
+
+ cur_part_id++;
+ snprintf(buf, sizeof(buf), "%d", cur_part_id);
+ string cur_oid = oid_prefix;
+ cur_oid.append(buf);
+ cur_obj.init_ns(bucket, cur_oid, shadow_ns);
+
+ add_obj(cur_obj);
+};
+
+void RGWPutObjProcessor_Atomic::complete_parts()
+{
+ if (obj_len > (uint64_t)cur_part_ofs)
+ prepare_next_part(obj_len);
+}
+
+int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs)
+{
+ complete_parts();
+
+ store->set_atomic(obj_ctx, head_obj);
+
+ RGWRados::PutObjMetaExtraParams extra_params;
+
+ extra_params.data = &first_chunk;
+ extra_params.manifest = &manifest;
+ extra_params.ptag = &unique_tag; /* use req_id as operation tag */
+ extra_params.mtime = mtime;
+
+ int r = store->put_obj_meta(obj_ctx, head_obj, obj_len, attrs,
+ RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
+ extra_params);
+ return r;
+}
+
class RGWWatcher : public librados::WatchCtx {
RGWRados *rados;
public:
};
WRITE_CLASS_ENCODER(RGWUploadPartInfo)
+class RGWPutObjProcessor
+{
+protected:
+ RGWRados *store;
+ void *obj_ctx;
+ bool is_complete;
+
+ virtual int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs) = 0;
+
+ list<rgw_obj> objs;
+
+ void add_obj(rgw_obj& obj) {
+ objs.push_back(obj);
+ }
+public:
+ RGWPutObjProcessor() : store(NULL), obj_ctx(NULL), is_complete(false) {}
+ virtual ~RGWPutObjProcessor();
+ virtual int prepare(RGWRados *_store, void *_o) {
+ store = _store;
+ obj_ctx = _o;
+ return 0;
+ };
+ virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
+ virtual int throttle_data(void *handle) = 0;
+ virtual int complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
+};
+
+class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
+{
+ rgw_bucket bucket;
+ string obj_str;
+
+ bufferlist data;
+ rgw_obj obj;
+ off_t ofs;
+
+protected:
+ int prepare(RGWRados *store, void *obj_ctx);
+ int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+ int throttle_data(void *handle) { return 0; }
+ int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
+
+public:
+ RGWPutObjProcessor_Plain(rgw_bucket& b, const string& o) : bucket(b), obj_str(o), ofs(0) {}
+};
+
+struct put_obj_aio_info {
+ void *handle;
+};
+
+class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
+{
+ list<struct put_obj_aio_info> pending;
+ size_t max_chunks;
+
+ struct put_obj_aio_info pop_pending();
+ int wait_pending_front();
+ bool pending_has_completed();
+ int drain_pending();
+
+protected:
+ uint64_t obj_len;
+
+ int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle);
+ int throttle_data(void *handle);
+
+ RGWPutObjProcessor_Aio() : max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
+ virtual ~RGWPutObjProcessor_Aio() {
+ drain_pending();
+ }
+};
+
+class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
+{
+ bufferlist first_chunk;
+ uint64_t part_size;
+ off_t cur_part_ofs;
+ off_t next_part_ofs;
+ int cur_part_id;
+protected:
+ rgw_bucket bucket;
+ string obj_str;
+
+ string unique_tag;
+
+ string oid_prefix;
+ rgw_obj head_obj;
+ rgw_obj cur_obj;
+ RGWObjManifest manifest;
+
+ virtual bool immutable_head() { return false; }
+
+ int prepare(RGWRados *store, void *obj_ctx);
+ virtual int do_complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs);
+
+ void prepare_next_part(off_t ofs);
+ void complete_parts();
+
+public:
+ ~RGWPutObjProcessor_Atomic() {}
+ RGWPutObjProcessor_Atomic(rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t) : part_size(_p),
+ cur_part_ofs(0),
+ next_part_ofs(_p),
+ cur_part_id(0),
+ bucket(_b),
+ obj_str(_o),
+ unique_tag(_t) {}
+ int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+};
+
+
struct RGWObjState {
bool is_atomic;
bool has_attrs;