int RGWWriteRequest::exec_start() {
struct req_state* s = get_state();
+ need_calc_md5 = (obj_manifest == NULL);
+
perfcounter->inc(l_rgw_put);
op_ret = -EINVAL;
return op_ret;
} /* exec_start */
+int RGWWriteRequest::exec_continue()
+{
+ struct req_state* s = get_state();
+ op_ret = 0;
+
+#if 0 // TODO: check offsets
+ if (next_off != last_off)
+ return -EIO;
+#endif
+ size_t len = data.length();
+ if (! len)
+ return 0;
+
+ /* XXX won't see multipart */
+ bool need_to_wait = (ofs == 0) && multipart;
+ bufferlist orig_data;
+
+ if (need_to_wait) {
+ orig_data = data;
+ }
+
+ op_ret = put_data_and_throttle(processor, data, ofs,
+ (need_calc_md5 ? &hash : NULL), need_to_wait);
+ if (op_ret < 0) {
+ if (!need_to_wait || op_ret != -EEXIST) {
+ ldout(s->cct, 20) << "processor->thottle_data() returned ret="
+ << op_ret << dendl;
+ goto done;
+ }
+
+ ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
+
+ /* restore original data */
+ data.swap(orig_data);
+
+ /* restart processing with different oid suffix */
+ dispose_processor(processor);
+ processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
+ &multipart);
+
+ string oid_rand;
+ char buf[33];
+ gen_rand_alphanumeric(get_store()->ctx(), buf, sizeof(buf) - 1);
+ oid_rand.append(buf);
+
+ op_ret = processor->prepare(get_store(), &oid_rand);
+ if (op_ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->prepare() returned "
+ << op_ret << dendl;
+ goto done;
+ }
+
+ op_ret = put_data_and_throttle(processor, data, ofs, NULL, false);
+ if (op_ret < 0) {
+ goto done;
+ }
+ }
+ bytes_written += len;
+
+done:
+ return op_ret;
+} /* exec_continue */
+
+int RGWWriteRequest::exec_finish()
+{
+ return 0;
+} /* exec_finish */
/* librgw */
extern "C" {
const std::string& bucket_name;
const std::string& obj_name;
RGWPutObjProcessor *processor;
- buffer::list bl;
+ buffer::list data;
+ MD5 hash;
off_t last_off;
off_t next_off;
size_t bytes_written;
bool multipart;
+ bool need_calc_md5;
RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user,
const std::string& _bname, const std::string& _oname)
s->info.request_params = "";
s->info.domain = ""; /* XXX ? */
- /* XXX required in RGWOp::execute() */
- s->content_length = bl.length();
-
// woo
s->user = user;
virtual int get_data(buffer::list& _bl) {
/* XXX for now, use sharing semantics */
- _bl.claim(bl);
+ _bl.claim(data);
uint32_t len = _bl.length();
bytes_written += len;
return len;
void put_data(off_t off, buffer::list& _bl) {
next_off = off;
- bl.claim(_bl);
+ data.claim(_bl);
}
virtual int exec_start();
-
- virtual int exec_continue() {
- if (next_off != last_off)
- return -EIO;
- /* XXX consume bl */
- return 0;
- }
-
- virtual int exec_finish() {
- return 0;
- }
+ virtual int exec_continue();
+ virtual int exec_finish();
virtual void send_response() {}
virtual int verify_params() {
- if (bl.length() > cct->_conf->rgw_max_put_size)
- return -ERR_TOO_LARGE;
return 0;
}
}; /* RGWWriteRequest */