bufferlist olh_tag;
uint64_t pg_ver{false};
uint32_t zone_short_id{0};
+ bool compressed{false};
/* important! don't forget to update copy constructor */
} else {
compressor.emplace(s->cct, plugin, filter);
filter = &*compressor;
+ // always send incompressible hint when rgw is itself doing compression
+ s->object->set_compressed(s->obj_ctx);
}
}
}
return error.value_or(0);
}
+void RadosWriter::add_write_hint(librados::ObjectWriteOperation& op) {
+ const rgw_obj obj = head_obj->get_obj();
+ const RGWObjState *obj_state = obj_ctx.get_state(obj);
+ const bool compressed = obj_state->compressed;
+ uint32_t alloc_hint_flags = 0;
+ if (compressed) {
+ alloc_hint_flags |= librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+ }
+
+ op.set_alloc_hint2(0, 0, alloc_hint_flags);
+}
+
int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
{
stripe_obj = store->svc()->rados->obj(raw_obj);
return 0;
}
librados::ObjectWriteOperation op;
+ add_write_hint(op);
if (offset == 0) {
op.write_full(data);
} else {
librados::ObjectWriteOperation op;
op.create(true); // exclusive create
+ add_write_hint(op);
op.write_full(data);
constexpr uint64_t id = 0; // unused
~RadosWriter();
+ // add alloc hint to osd
+ void add_write_hint(librados::ObjectWriteOperation& op);
+
// change the current stripe object
int set_stripe_obj(const rgw_raw_obj& obj);
is_olh = rhs.is_olh;
objv_tracker = rhs.objv_tracker;
pg_ver = rhs.pg_ver;
+ compressed = rhs.compressed;
}
RGWObjState *RGWObjectCtx::get_state(const rgw_obj& obj) {
return result;
}
+void RGWObjectCtx::set_compressed(const rgw_obj& obj) {
+ std::unique_lock wl{lock};
+ assert (!obj.empty());
+ objs_state[obj].compressed = true;
+}
+
void RGWObjectCtx::set_atomic(rgw_obj& obj) {
std::unique_lock wl{lock};
assert (!obj.empty());
}
bool is_atomic = iter->second.is_atomic;
bool prefetch_data = iter->second.prefetch_data;
+ bool compressed = iter->second.compressed;
objs_state.erase(iter);
- if (is_atomic || prefetch_data) {
+ if (is_atomic || prefetch_data || compressed) {
auto& state = objs_state[obj];
state.is_atomic = is_atomic;
state.prefetch_data = prefetch_data;
+ state.compressed = compressed;
}
}
/* if we want to overwrite the data, we also want to overwrite the
xattrs, so just remove the object */
op.write_full(*meta.data);
+ if (state->compressed) {
+ uint32_t alloc_hint_flags = librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+ op.set_alloc_hint2(0, 0, alloc_hint_flags);
+ }
}
string etag;
RGWObjState *get_state(const rgw_obj& obj);
+ void set_compressed(const rgw_obj& obj);
void set_atomic(rgw_obj& obj);
void set_prefetch_data(const rgw_obj& obj);
void invalidate(const rgw_obj& obj);
RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
rctx->set_prefetch_data(obj);
}
+ void set_compressed(void *ctx, const rgw_obj& obj) {
+ RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
+ rctx->set_compressed(obj);
+ }
int decode_policy(const DoutPrefixProvider *dpp, bufferlist& bl, ACLOwner *owner);
int get_bucket_stats(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, int shard_id, std::string *bucket_ver, std::string *master_ver,
std::map<RGWObjCategory, RGWStorageStats>& stats, std::string *max_marker, bool* syncstopped = NULL);
virtual int set_acl(const RGWAccessControlPolicy& acl) = 0;
virtual void set_atomic(RGWObjectCtx* rctx) const = 0;
virtual void set_prefetch_data(RGWObjectCtx* rctx) = 0;
+ virtual void set_compressed(RGWObjectCtx* rctx) = 0;
bool empty() const { return key.empty(); }
const std::string &get_name() const { return key.name; }
return;
}
+ /* RGWObjectCtx will be moved out of sal */
+ /* XXX: Placeholder. Should not be needed later after Dan's patch */
+ void DBObject::set_compressed(RGWObjectCtx* rctx)
+ {
+ return;
+ }
+
bool DBObject::is_expired() {
return false;
}
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
virtual void set_atomic(RGWObjectCtx* rctx) const override;
virtual void set_prefetch_data(RGWObjectCtx* rctx) override;
+ virtual void set_compressed(RGWObjectCtx* rctx) override;
virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, Attrs* setattrs, Attrs* delattrs, optional_yield y, rgw_obj* target_obj = NULL) override;
real_time(), NULL, dpp, y);
}
+void RadosObject::set_compressed(RGWObjectCtx* rctx) {
+ rgw_obj obj = get_obj();
+ store->getRados()->set_compressed(rctx, obj);
+}
+
void RadosObject::set_atomic(RGWObjectCtx* rctx) const
{
rgw_obj obj = get_obj();
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
virtual void set_atomic(RGWObjectCtx* rctx) const override;
virtual void set_prefetch_data(RGWObjectCtx* rctx) override;
+ virtual void set_compressed(RGWObjectCtx* rctx) override;
virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, Attrs* setattrs, Attrs* delattrs, optional_yield y, rgw_obj* target_obj = NULL) override;