}
// compression stuff
if (need_decompress) {
- ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part" << dendl;
+ ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl;
+
CompressorRef compressor = Compressor::create(s->cct, cs_info.compression_type);
if (!compressor.get()) {
// if compressor isn't available - error, because cannot return decompressed data?
<< "for rgw, check rgw_compression_type config option" << dendl;
return -EIO;
} else {
- bufferlist out_bl;
- int cr = compressor->decompress(bl, out_bl);
- if (cr < 0) {
- lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
- return cr;
+ bufferlist out_bl, in_bl;
+ bl_ofs = 0;
+ if (waiting.length() != 0) {
+ in_bl.append(waiting);
+ in_bl.append(bl);
+ waiting.clear();
+ } else {
+ in_bl.claim(bl);
+ }
+ bl_len = in_bl.length();
+
+ while (first_block <= last_block) {
+ bufferlist tmp, tmp_out;
+ int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs;
+ if (ofs_in_bl + cs_info.blocks[first_block].len > bl.length()) {
+ // not complete block, put it to waiting
+ int tail = bl.length() - ofs_in_bl;
+ bl.copy(ofs_in_bl, tail, waiting);
+ cur_ofs -= tail;
+ break;
+ }
+ bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp);
+ int cr = compressor->decompress(tmp, tmp_out);
+ if (cr < 0) {
+ lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
+ return cr;
+ }
+ if (first_block == last_block && partial_content)
+ out_bl.append(tmp_out.c_str(), q_len);
+ else
+ out_bl.append(tmp_out);
+ first_block++;
}
+
+ if (first_data && partial_content && out_bl.length() != 0)
+ bl_ofs = q_ofs;
+
+ if (first_data && out_bl.length() != 0)
+ first_data = false;
+
+ cur_ofs += bl_len;
return send_response_data(out_bl, bl_ofs, out_bl.length());
}
}
lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
goto done_err;
}
- if (need_decompress) {
+ if (need_decompress && !partial_content) {
total_len = cs_info.orig_size;
}
start = ofs;
+ if (need_decompress) {
+ if (partial_content) {
+ // if user set range, we need to calculate it in decompressed data
+ first_block = 0; last_block = 0;
+ if (cs_info.blocks.size() > 1) {
+ off_t i = 1;
+ while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs <= new_ofs) i++;
+ first_block = i - 1;
+ while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs < new_end) i++;
+ last_block = i - 1;
+ }
+ } else {
+ first_block = 0; last_block = cs_info.blocks.size() - 1;
+ }
+
+ ofs = cs_info.blocks[first_block].new_ofs;
+ end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len;
+
+ // check user range for correctness
+ if (new_ofs < 0) {
+ ldout(s->cct, 5) << "WARNING: uncorrect begin of the bytes range, get 0 instead" << dendl;
+ new_ofs = 0;
+ }
+ if (new_end >= cs_info.orig_size) {
+ ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size
+ << ")" << dendl;
+ new_end = cs_info.orig_size - 1;
+ }
+
+ q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs;
+ q_len = new_end - cs_info.blocks[last_block].old_ofs + 1;
+
+ first_data = true;
+ cur_ofs = ofs;
+ waiting.clear();
+
+ }
+
/* STAT ops don't need data, and do no i/o */
if (get_type() == RGW_OP_STAT_OBJ) {
return;
RGWCompressionInfo cs_info;
cs_info.compression_type = s->cct->_conf->rgw_compression_type;
cs_info.orig_size = s->obj_size;
+ cs_info.blocks = processor->get_compression_blocks();
::encode(cs_info, tmp);
attrs[RGW_ATTR_COMPRESSION] = tmp;
}
// compression attrs
RGWCompressionInfo cs_info;
bool need_decompress;
+ off_t first_block, last_block;
+ off_t q_ofs, q_len;
+ bool first_data;
+ uint64_t cur_ofs;
+ bufferlist waiting;
int init_common();
public:
skip_manifest = false;
is_slo = false;
rgwx_stat = false;
+ need_decompress = false;
+ first_block = 0;
+ last_block = 0;
+ q_ofs = 0;
+ q_len = 0;
+ first_data = true;
+ cur_ofs = 0;
}
bool prefetch_data();
bool compression_enabled = store->ctx()->_conf->rgw_compression_type != "none";
if ((ofs > 0 && compressed) || // if previous part was compressed
(ofs == 0 && compression_enabled)) { // or it's the first part and flag is set
- ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part" << dendl;
+ ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type);
if (!compressor.get()) {
+ if (ofs > 0 && compressed) {
+ lderr(store->ctx()) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type
+ << " for next part, compression process failed" << dendl;
+ return -EIO;
+ }
// if compressor isn't available - just do not use it with log warning?
ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type
<< "for rgw, check rgw_compression_type config option" << dendl;
in_bl.claim(bl);
} else {
int cr = compressor->compress(bl, in_bl);
- if (cr != 0) {
+ if (cr < 0) {
+ if (ofs > 0 && compressed) {
+ lderr(store->ctx()) << "Compression failed with exit code " << cr
+ << " for next part, compression process failed" << dendl;
+ return -EIO;
+ }
ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl;
compressed = false;
in_bl.claim(bl);
} else {
compressed = true;
+
+ compression_block newbl;
+ int bs = blocks.size();
+ newbl.old_ofs = ofs;
+ newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
+ newbl.len = in_bl.length();
+ blocks.push_back(newbl);
}
}
} else {
} catch (buffer::error& err) {
return -EIO;
}
- need_decompress = true;
+ if (cs_info.compression_type != "none")
+ need_decompress = true;
+ else
+ need_decompress = false;
return 0;
} else {
need_decompress = false;
int rgw_policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset, RGWAccessControlPolicy *policy);
+struct compression_block {
+ uint64_t old_ofs;
+ uint64_t new_ofs;
+ uint64_t len;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(old_ofs, bl);
+ ::encode(new_ofs, bl);
+ ::encode(len, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(old_ofs, bl);
+ ::decode(new_ofs, bl);
+ ::decode(len, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(compression_block)
+
struct RGWCompressionInfo {
string compression_type;
uint64_t orig_size;
+ vector<compression_block> blocks;
RGWCompressionInfo() : compression_type("none"), orig_size(0) {}
ENCODE_START(1, 1, bl);
::encode(compression_type, bl);
::encode(orig_size, bl);
+ ::encode(blocks, bl);
ENCODE_FINISH(bl);
}
DECODE_START(1, bl);
::decode(compression_type, bl);
::decode(orig_size, bl);
+ ::decode(blocks, bl);
DECODE_FINISH(bl);
}
};
bool compressed;
RGWBucketInfo bucket_info;
bool canceled;
+ vector<compression_block> blocks;
virtual int do_complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime,
map<string, bufferlist>& attrs, ceph::real_time delete_at,
bool is_canceled() { return canceled; }
bool is_compressed() { return compressed; }
+ const vector<compression_block>& get_compression_blocks() { return blocks; }
}; /* RGWPutObjProcessor */
struct put_obj_aio_info {
CEPH_OUT_DIR=$VSTART_DEST/out
fi
+mkdir -p .libs/compressor
+for f in `ls -d compressor/*/`;
+do
+ cp .libs/libceph_`basename $f`.so* .libs/compressor/;
+done
+
# for running out of the CMake build directory
if [ -e CMakeCache.txt ]; then
# Out of tree build, learn source location from CMakeCache.txt