while (first_block <= last_block) {
bufferlist tmp, tmp_out;
int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs;
- if (ofs_in_bl + cs_info.blocks[first_block].len > bl.length()) {
+ if (ofs_in_bl + cs_info.blocks[first_block].len > bl_len) {
- // not complete block, put it to waiting
+ // incomplete block: stash the tail in 'waiting' until the next chunk arrives
- int tail = bl.length() - ofs_in_bl;
- bl.copy(ofs_in_bl, tail, waiting);
+ int tail = bl_len - ofs_in_bl;
+ in_bl.copy(ofs_in_bl, tail, waiting);
cur_ofs -= tail;
break;
}
- bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp);
+ in_bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp);
int cr = compressor->decompress(tmp, tmp_out);
if (cr < 0) {
lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
first_data = false;
cur_ofs += bl_len;
- return send_response_data(out_bl, bl_ofs, out_bl.length());
+ return send_response_data(out_bl, bl_ofs, out_bl.length() - bl_ofs);
}
}
// end of compression stuff
perfcounter->inc(l_rgw_get);
int64_t new_ofs, new_end;
+ int cret;
RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
RGWRados::Object::Read read_op(&op_target);
read_op.params.perr = &s->err;
op_ret = read_op.prepare(&new_ofs, &new_end);
- if (op_ret < 0)
+ if (op_ret < 0 && op_ret != -ERANGE) // ERANGE is re-evaluated below, once compression info is known
+ goto done_err;
+
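+ // decode compression info first: read_op validated the range against the stored (compressed) size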
+ cret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
+ if (cret < 0) {
+ lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
+ if (op_ret == 0)
+ op_ret = cret;
goto done_err;
+ }
+
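+ // object is not compressed, so the ERANGE from read_op stands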
+ if (op_ret == -ERANGE && !need_decompress)
+ goto done_err;
+
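+ // compressed object: from here on, offsets and sizes refer to the uncompressed data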
+ if (need_decompress) {
+ op_ret = 0;
+ s->obj_size = cs_info.orig_size;
+
+ if (partial_content) {
+ // recheck user range for correctness
+ if (ofs < 0) {
+ ofs += cs_info.orig_size;
+ if (ofs < 0)
+ ofs = 0;
+ end = cs_info.orig_size - 1;
+ } else if (end < 0) {
+ end = cs_info.orig_size - 1;
+ }
+
+ if (ofs >= cs_info.orig_size) {
+ lderr(s->cct) << "ERROR: begin of the bytes range more than object size (" << cs_info.orig_size
+ << ")" << dendl;
+ op_ret = -ERANGE;
+ goto done_err;
+ } else
+ new_ofs = ofs;
+
+ if (end >= cs_info.orig_size) {
+ ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size
+ << ")" << dendl;
+ new_end = cs_info.orig_size - 1;
+ } else
+ new_end = end;
+
+ total_len = new_end - new_ofs + 1;
+
+ } else
+ total_len = cs_info.orig_size;
+ }
// for range requests with obj size 0
if (range_str && !(s->obj_size)) {
return;
}
- op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
- if (op_ret < 0) {
- lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
- goto done_err;
- }
- if (need_decompress && !partial_content) {
- total_len = cs_info.orig_size;
- }
-
-
/* Check whether the object has expired. Swift API documentation
* stands that we should return 404 Not Found in such case. */
if (need_object_expiration() && object_is_expired(attrs)) {
start = ofs;
if (need_decompress) {
+
+ if (cs_info.blocks.size() == 0) {
+ lderr(s->cct) << "ERROR: no info about compression blocks, cannot decompress" << dendl;
+ op_ret = -EIO;
+ goto done_err;
+ }
+
if (partial_content) {
+
// if user set range, we need to calculate it in decompressed data
first_block = 0; last_block = 0;
if (cs_info.blocks.size() > 1) {
first_block = 0; last_block = cs_info.blocks.size() - 1;
}
- ofs = cs_info.blocks[first_block].new_ofs;
- end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len;
-
- // check user range for correctness
- if (new_ofs < 0) {
- ldout(s->cct, 5) << "WARNING: uncorrect begin of the bytes range, get 0 instead" << dendl;
- new_ofs = 0;
- }
- if (new_end >= cs_info.orig_size) {
- ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size
- << ")" << dendl;
- new_end = cs_info.orig_size - 1;
- }
-
q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs;
q_len = new_end - cs_info.blocks[last_block].old_ofs + 1;
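+ // switch the read range to compressed offsets; q_ofs/q_len keep the trim points within the decompressed blocks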
+ new_ofs = cs_info.blocks[first_block].new_ofs;
+ new_end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len;
+
first_data = true;
- cur_ofs = ofs;
+ cur_ofs = new_ofs;
waiting.clear();
-
}
/* STAT ops don't need data, and do no i/o */
return;
}
- if (!get_data || ofs > end) {
+ if (!get_data || new_ofs > new_end) {
send_response_data(bl, 0, 0);
return;
}
- perfcounter->inc(l_rgw_get_b, end - ofs);
+ perfcounter->inc(l_rgw_get_b, new_end - new_ofs);
- op_ret = read_op.iterate(ofs, end, &cb);
+ op_ret = read_op.iterate(new_ofs, new_end, &cb);
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
info.size = s->obj_size;
info.modified = real_clock::now();
info.manifest = manifest;
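+ // keep per-part compression info in the part meta so CompleteMultipart can merge the block maps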
+ if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) {
+ bool compressed;
+ RGWCompressionInfo cs_info;
+ if (rgw_compression_info_from_attrset(attrs, compressed, cs_info) == 0)
+ info.cs_info = cs_info;
+ }
::encode(info, bl);
string multipart_meta_obj = mp.get_meta();
fst = copy_source_range_fst;
lst = copy_source_range_lst;
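+ // enable inline compression only when a compressor type is configured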
+ processor->compression_enabled = s->cct->_conf->rgw_compression_type != "none";
do {
bufferlist data_in;
dout(10) << "v4 auth ok" << dendl;
}
-
op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
user_quota, bucket_quota, s->obj_size);
if (op_ret < 0) {
return;
}
+ processor.compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+
while (data_pending) {
bufferlist data;
len = get_data(data);
if (!len)
break;
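+ // hash the bytes as received: the ETag must be the MD5 of the uploaded data, not of the compressed stream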
+ hash.Update((const byte *)data.c_str(), data.length());
op_ret = put_data_and_throttle(&processor, data, ofs, &hash, false);
ofs += len;
return;
}
- processor.complete_hash(&hash);
hash.Final(m);
buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
}
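+ // attach compression metadata (type, original size, block map) to the object's attrs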
+ if (processor.is_compressed()) {
+ bufferlist tmp;
+ RGWCompressionInfo cs_info;
+ cs_info.compression_type = s->cct->_conf->rgw_compression_type;
+ cs_info.orig_size = s->obj_size;
+ cs_info.blocks = processor.get_compression_blocks();
+ ::encode(cs_info, tmp);
+ emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
+ }
+
op_ret = processor.complete(etag, NULL, real_time(), attrs, delete_at);
}
int max_parts = 1000;
int marker = 0;
bool truncated;
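+ // compression state merged across all completed parts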
+ RGWCompressionInfo cs_info;
+ bool compressed = false;
uint64_t min_part_size = s->cct->_conf->rgw_multipart_min_part_size;
manifest.append(obj_part.manifest);
}
+ if (obj_part.cs_info.compression_type != "none") {
+ if (compressed && cs_info.compression_type != obj_part.cs_info.compression_type) {
+ ldout(s->cct, 0) << "ERROR: compression type was changed during multipart upload ("
+ << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl;
+ op_ret = -ERR_INVALID_PART;
+ return;
+ }
+ uint64_t new_ofs; // offset in the compressed data where this part's blocks begin
+ if (!cs_info.blocks.empty())
+ new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
+ else
+ new_ofs = 0;
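+ // rebase this part's blocks onto object-wide offsets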
+ for (size_t i = 0; i < obj_part.cs_info.blocks.size(); ++i) {
+ compression_block cb;
+ cb.old_ofs = obj_part.cs_info.blocks[i].old_ofs + cs_info.orig_size;
+ cb.new_ofs = new_ofs;
+ cb.len = obj_part.cs_info.blocks[i].len;
+ cs_info.blocks.push_back(cb);
+ new_ofs = cb.new_ofs + cb.len;
+ }
+ if (!compressed)
+ cs_info.compression_type = obj_part.cs_info.compression_type;
+ cs_info.orig_size += obj_part.cs_info.orig_size;
+ compressed = true;
+ }
+
rgw_obj_key remove_key;
src_obj.get_index_key(&remove_key);
attrs[RGW_ATTR_ETAG] = etag_bl;
+ if (compressed) {
+ // write compression attribute to full object
+ bufferlist tmp;
+ ::encode(cs_info, tmp);
+ attrs[RGW_ATTR_COMPRESSION] = tmp;
+ }
+
target_obj.init(s->bucket, s->object.name);
if (versioned_object) {
store->gen_rand_obj_instance_name(&target_obj);
string etag;
ceph::real_time modified;
RGWObjManifest manifest;
+ RGWCompressionInfo cs_info;
RGWUploadPartInfo() : num(0), size(0) {}
void encode(bufferlist& bl) const {
- ENCODE_START(3, 2, bl);
+ ENCODE_START(4, 2, bl);
::encode(num, bl);
::encode(size, bl);
::encode(etag, bl);
::encode(modified, bl);
::encode(manifest, bl);
+ ::encode(cs_info, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
::decode(num, bl);
::decode(size, bl);
::decode(etag, bl);
::decode(modified, bl);
if (struct_v >= 3)
::decode(manifest, bl);
+ if (struct_v >= 4)
+ ::decode(cs_info, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
public:
- RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), compressed(false), bucket_info(_bi), canceled(false) {}
+ RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL),
+ obj_ctx(_obj_ctx),
+ is_complete(false),
+ compressed(false),
+ bucket_info(_bi),
+ canceled(false),
+ compression_enabled(false) {}
virtual ~RGWPutObjProcessor() {}
virtual int prepare(RGWRados *_store, string *oid_rand) {
store = _store;
map<string, bufferlist>& attrs, ceph::real_time delete_at,
const char *if_match = NULL, const char *if_nomatch = NULL);
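+ // set by the request handlers when rgw_compression_type is configured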
+ bool compression_enabled;
CephContext *ctx();
bool is_canceled() { return canceled; }