map<string, bufferlist> attrs;
int len;
map<string, string>::iterator iter;
+ bool multipart;
+ bool need_calc_md5 = (obj_manifest == NULL);
+
perfcounter->inc(l_rgw_put);
ret = -EINVAL;
if (ret < 0)
goto done;
- hash.Update(data_ptr, len);
+ if (need_calc_md5) {
+ hash.Update(data_ptr, len);
+ }
- ret = processor->throttle_data(handle);
- if (ret < 0)
- goto done;
+ /* do we need this operation to be synchronous? if we're dealing with an object with immutable
+ * head, e.g., multipart object we need to make sure we're the first one writing to this object
+ */
+ bool need_to_wait = (ofs == 0) && multipart;
+
+ ret = processor->throttle_data(handle, need_to_wait);
+ if (ret < 0) {
+ if (!need_to_wait || ret != -EEXIST) {
+ ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
+ goto done;
+ }
+
+ ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
+
+ /* restart processing with different oid suffix */
+
+ dispose_processor(processor);
+ processor = select_processor(&multipart);
+
+ string oid_rand;
+ char buf[33];
+ gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+ oid_rand.append(buf);
+
+ ret = processor->prepare(store, s->obj_ctx, &oid_rand);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->prepare() returned " << ret << dendl;
+ goto done;
+ }
+
+ ret = processor->handle_data(data, ofs, &handle);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
+ goto done;
+ }
+
+ ret = processor->throttle_data(handle, false);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
+ goto done;
+ }
+ }
ofs += len;
} while (len > 0);
policy.set_ctx(s->cct);
}
- RGWPutObjProcessor *select_processor();
+ RGWPutObjProcessor *select_processor(bool *is_multipart);
void dispose_processor(RGWPutObjProcessor *processor);
+ int user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy, off_t start_ofs, off_t end_ofs);
+
int verify_permission();
void pre_exec();
void execute();
stripe_size = rule.stripe_max_size;
stripe_size = MIN(manifest->get_obj_size() - stripe_ofs, stripe_size);
} else {
- stripe_size = rule.part_size - (ofs - stripe_ofs);
- stripe_size = MIN(stripe_size, rule.stripe_max_size);
+ uint64_t next = MIN(stripe_ofs + rule.stripe_max_size, part_ofs + rule.part_size);
+ stripe_size = next - stripe_ofs;
}
+ cur_override_prefix = rule.override_prefix;
+
update_location();
}
store = _store;
obj_ctx = _o;
return 0;
- };
+ }
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
- virtual int throttle_data(void *handle) = 0;
+ virtual int throttle_data(void *handle, bool need_to_wait) = 0;
virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
};