m_lock(lock) { }
virtual ~ObjecterWriteback() {}
- virtual void read(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len, snapid_t snapid,
- bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq,
- Context *onfinish) {
+ virtual void read(const object_t& oid, uint64_t object_no,
+ const object_locator_t& oloc, uint64_t off, uint64_t len,
+ snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, Context *onfinish) {
m_objecter->read_trunc(oid, oloc, off, len, snapid, pbl, 0,
trunc_size, trunc_seq,
new C_OnFinisher(new C_Lock(m_lock, onfinish),
int AioRead::send() {
ldout(m_ictx->cct, 20) << "send " << this << " " << m_oid << " " << m_object_off << "~" << m_object_len << dendl;
+ // send read request to parent if the object doesn't exist locally
+ if (!m_ictx->object_may_exist(m_object_no)) {
+ complete(-ENOENT);
+ return 0;
+ }
+
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, rados_req_cb, NULL);
int r;
return -ENOENT;
}
- void ImageCtx::aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
+ void ImageCtx::aio_read_from_cache(object_t o, uint64_t object_no,
+ bufferlist *bl, size_t len,
uint64_t off, Context *onfinish) {
snap_lock.get_read();
ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, 0);
snap_lock.put_read();
- ObjectExtent extent(o, 0 /* a lie */, off, len, 0);
+ ObjectExtent extent(o, object_no, off, len, 0);
extent.oloc.pool = data_ctx.get_id();
extent.buffer_extents.push_back(make_pair(0, len));
rd->extents.push_back(extent);
}
}
- int ImageCtx::read_from_cache(object_t o, bufferlist *bl, size_t len,
- uint64_t off) {
+ int ImageCtx::read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
+ size_t len, uint64_t off) {
int r;
Mutex mylock("librbd::ImageCtx::read_from_cache");
Cond cond;
bool done;
Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
- aio_read_from_cache(o, bl, len, off, onfinish);
+ aio_read_from_cache(o, object_no, bl, len, off, onfinish);
mylock.Lock();
while (!done)
cond.Wait(mylock);
}
}
+ bool ImageCtx::object_may_exist(uint64_t object_no) const // returns false only when the object map is enabled and marks object_no as neither OBJECT_EXISTS nor OBJECT_PENDING
+ {
+ // Fall back to default logic (report that the object may exist) if the
+ // object map feature bit is not set on this image
+ if ((features & RBD_FEATURE_OBJECT_MAP) == 0 /* || invalid map */) {
+ return true;
+ }
+
+ RWLock::RLocker l(object_map_lock); // hold object_map_lock via RLocker while object_map is inspected
+ assert(object_no < object_map.size()); // object_no must index into the currently loaded map
+ return (object_map[object_no] == OBJECT_EXISTS ||
+ object_map[object_no] == OBJECT_PENDING); // any other recorded state is reported as "does not exist"
+ }
+
int ImageCtx::refresh_object_map()
{
if ((features & RBD_FEATURE_OBJECT_MAP) == 0) {
return 0;
}
+ RWLock::WLocker l(object_map_lock);
int r = cls_client::object_map_load(&data_ctx, object_map_name(id),
&object_map);
if (r < 0) {
return 0;
}
+ RWLock::WLocker l(object_map_lock);
uint64_t num_objs = Striper::get_num_objects(layout, get_current_size());
ldout(cct, 20) << "resizing object map: " << num_objs << dendl;
librados::ObjectWriteOperation op;
return 0;
}
+ RWLock::WLocker l(object_map_lock);
assert(start_object_no <= end_object_no);
assert(/* flagged as invalid || */ end_object_no <= object_map.size());
if (end_object_no > object_map.size()) {
int r = data_ctx.operate(object_map_name(id), &op);
if (r < 0) {
lderr(cct) << "object map update failed: " << cpp_strerror(r) << dendl;
- // TODO: disable object map
+ // TODO: remove RBD_FEATURE_EXCLUSIVE_LOCK feature on image
} else {
for (uint64_t object_no = start_object_no; object_no < end_object_no;
++object_no) {
uint64_t get_parent_snap_id(librados::snap_t in_snap_id) const;
int get_parent_overlap(librados::snap_t in_snap_id,
uint64_t *overlap) const;
- void aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
- uint64_t off, Context *onfinish);
+ void aio_read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
+ size_t len, uint64_t off, Context *onfinish);
void write_to_cache(object_t o, bufferlist& bl, size_t len, uint64_t off,
Context *onfinish);
- int read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off);
+ int read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
+ size_t len, uint64_t off);
void user_flushed();
void flush_cache_aio(Context *onfinish);
int flush_cache();
void wait_for_pending_aio();
void wait_for_pending_copyup();
+ /* object map */
+ bool object_may_exist(uint64_t object_no) const;
int refresh_object_map();
int resize_object_map(uint8_t default_object_state);
int update_object_map(uint64_t object_no, uint8_t object_state);
ldout(m_image_ctx.cct, 20) << "acquired exclusive lock" << dendl;
m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
+ r = m_image_ctx.refresh_object_map();
+ if (r < 0) {
+ unlock();
+ return r;
+ }
+
bufferlist bl;
ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
::encode(NOTIFY_OP_ACQUIRED_LOCK, bl);
#include "common/ceph_context.h"
#include "common/dout.h"
+#include "common/Finisher.h"
#include "common/Mutex.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
};
LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock) // keeps references to ictx and lock; the added lines also create and start a Finisher
- : m_tid(0), m_lock(lock), m_ictx(ictx)
+ : m_finisher(new Finisher(ictx->cct)), m_tid(0), m_lock(lock), m_ictx(ictx) // Finisher is heap-allocated here and deleted in the destructor
{
+ m_finisher->start(); // started at construction; stopped again in ~LibrbdWriteback()
}
- void LibrbdWriteback::read(const object_t& oid,
+ LibrbdWriteback::~LibrbdWriteback() { // tears down the Finisher created by the constructor
+ m_finisher->stop(); // stop before deleting; NOTE(review): presumably waits for queued contexts — confirm against Finisher
+ delete m_finisher;
+ }
+
+ void LibrbdWriteback::read(const object_t& oid, uint64_t object_no,
const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snapid,
bufferlist *pbl, uint64_t trunc_size,
{
// on completion, take the mutex and then call onfinish.
Context *req = new C_Request(m_ictx->cct, onfinish, &m_lock);
+ if (!m_ictx->object_may_exist(object_no)) {
+ m_finisher->queue(req, -ENOENT);
+ return;
+ }
+
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(req, context_cb, NULL);
librados::ObjectReadOperation op;
#include "osd/osd_types.h"
#include "osdc/WritebackHandler.h"
+class Finisher;
class Mutex;
namespace librbd {
class LibrbdWriteback : public WritebackHandler {
public:
LibrbdWriteback(ImageCtx *ictx, Mutex& lock);
- virtual ~LibrbdWriteback() {}
+ virtual ~LibrbdWriteback();
// Note that oloc, trunc_size, and trunc_seq are ignored
- virtual void read(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len, snapid_t snapid,
- bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq,
- Context *onfinish);
+ virtual void read(const object_t& oid, uint64_t object_no,
+ const object_locator_t& oloc, uint64_t off, uint64_t len,
+ snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, Context *onfinish);
// Determine whether a read to this extent could be affected by a write-triggered copy-on-write
virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid);
private:
void complete_writes(const std::string& oid);
+ Finisher *m_finisher;
ceph_tid_t m_tid;
Mutex& m_lock;
librbd::ImageCtx *m_ictx;
return io_ctx.tmap_update(RBD_DIRECTORY, cmdbl);
}
+ void rollback_object(ImageCtx *ictx, uint64_t snap_id, const string& oid, // submits an asynchronous selfmanaged_snap_rollback of object oid to snap_id on ictx->data_ctx
+ SimpleThrottle& throttle)
+ {
+ Context *req_comp = new C_SimpleThrottle(&throttle); // completion context tied to the caller-supplied throttle
+ librados::AioCompletion *rados_completion =
+ librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
+ librados::ObjectWriteOperation op;
+ op.selfmanaged_snap_rollback(snap_id);
+ ictx->data_ctx.aio_operate(oid, rados_completion, &op); // NOTE(review): the return value of aio_operate is not checked here
+ ldout(ictx->cct, 10) << "scheduling selfmanaged_snap_rollback on "
+ << oid << " to " << snap_id << dendl;
+ rados_completion->release(); // NOTE(review): presumably drops only this function's reference to the completion — confirm
+ }
+
int rollback_image(ImageCtx *ictx, uint64_t snap_id,
ProgressContext& prog_ctx)
{
for (uint64_t i = 0; i < numseg; i++) {
string oid = ictx->get_object_name(i);
- Context *req_comp = new C_SimpleThrottle(&throttle);
- librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
- librados::ObjectWriteOperation op;
- op.selfmanaged_snap_rollback(snap_id);
- ictx->data_ctx.aio_operate(oid, rados_completion, &op);
- ldout(cct, 10) << "scheduling selfmanaged_snap_rollback on "
- << oid << " to " << snap_id << dendl;
- rados_completion->release();
+ rollback_object(ictx, snap_id, ictx->get_object_name(i), throttle);
prog_ctx.update_progress(i * bsize, numseg * bsize);
}
+ rollback_object(ictx, snap_id, object_map_name(ictx->id), throttle);
r = throttle.wait_for_ret();
if (r < 0) {
ostringstream oss;
CephContext *cct = (CephContext *)io_ctx.cct();
+ ceph_file_layout layout;
+
id_obj = id_obj_name(imgname);
int r = io_ctx.create(id_obj, true);
}
}
+ if ((features & RBD_FEATURE_OBJECT_MAP) != 0) {
+ if ((features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
+ lderr(cct) << "cannot use object map without exclusive lock" << dendl;
+ goto err_remove_header;
+ }
+
+ memset(&layout, 0, sizeof(layout));
+ layout.fl_object_size = 1ull << order;
+ if (stripe_unit == 0 || stripe_count == 0) {
+ layout.fl_stripe_unit = layout.fl_object_size;
+ layout.fl_stripe_count = 1;
+ } else {
+ layout.fl_stripe_unit = stripe_unit;
+ layout.fl_stripe_count = stripe_count;
+ }
+
+ librados::ObjectWriteOperation op;
+ cls_client::object_map_resize(&op, Striper::get_num_objects(layout, size),
+ OBJECT_NONEXISTENT);
+ r = io_ctx.operate(object_map_name(id), &op);
+ if (r < 0) {
+ goto err_remove_header;
+ }
+ }
+
ldout(cct, 2) << "done." << dendl;
return 0;
}
}
if (!old_format) {
+ r = io_ctx.remove(object_map_name(id));
+ if (r < 0 && r != -ENOENT) {
+ lderr(cct) << "error removing image object map" << dendl;
+ }
+
ldout(cct, 2) << "removing id object..." << dendl;
r = io_ctx.remove(id_obj_name(imgname));
if (r < 0 && r != -ENOENT) {
<< dendl;
return;
}
+
+ m_ictx->size = m_new_size;
+ m_ictx->resize_object_map(OBJECT_NONEXISTENT);
}
private:
return -ERESTART;
}
+ if (!m_ictx->object_may_exist(m_object_no)) {
+ return 1;
+ }
+
string oid = m_ictx->get_object_name(m_object_no);
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, NULL, rados_ctx_cb);
}
virtual void finish(int r) {
- if (r < 0 || m_delete_offset <= m_new_size) {
+ if (r < 0) {
m_ctx->complete(r);
return;
}
return;
}
+ m_ictx->update_object_map(m_delete_start, m_num_objects,
+ OBJECT_NONEXISTENT, OBJECT_PENDING);
+ if (m_delete_offset <= m_new_size) {
+ m_ctx->complete(r);
+ return;
+ }
+
// discard the weird boundary, if any
vector<ObjectExtent> extents;
Striper::file_to_extents(m_ictx->cct, m_ictx->format_string,
Context *req_comp = new C_ContextCompletion(*completion);
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
+
+ bool flag_nonexistent = false;
if (p->offset == 0) {
+ flag_nonexistent = true;
+ m_ictx->update_object_map(p->objectno, p->objectno + 1,
+ OBJECT_PENDING, OBJECT_EXISTS);
m_ictx->data_ctx.aio_remove(p->oid.name, rados_completion);
} else {
+ m_ictx->update_object_map(p->objectno, OBJECT_EXISTS);
librados::ObjectWriteOperation op;
op.truncate(p->offset);
m_ictx->data_ctx.aio_operate(p->oid.name, rados_completion, &op);
}
rados_completion->release();
+
+ if (flag_nonexistent) {
+ m_ictx->update_object_map(p->objectno, p->objectno + 1,
+ OBJECT_NONEXISTENT, OBJECT_PENDING);
+ }
}
completion->finish_adding_requests();
}
ldout(cct, 2) << "trim_image objects " << delete_start << " to "
<< (num_objects - 1) << dendl;
+ ictx->update_object_map(delete_start, num_objects, OBJECT_PENDING,
+ OBJECT_EXISTS);
+
AsyncObjectThrottle::ContextFactory context_factory(
boost::lambda::bind(boost::lambda::new_ptr<AsyncTrimObjectContext>(),
boost::lambda::_1, ictx, boost::lambda::_2));
vector<parent_info> snap_parents;
vector<uint8_t> snap_protection;
{
+ int r;
RWLock::WLocker l(ictx->snap_lock);
{
- int r;
RWLock::WLocker l2(ictx->parent_lock);
ictx->lockers.clear();
if (ictx->old_format) {
ictx->snap_exists = false;
}
+ if (ictx->snap_exists) {
+ r = ictx->refresh_object_map();
+ if (r < 0) {
+ return r;
+ }
+ }
+
ictx->data_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps);
} // release snap_lock
return r;
}
+ ictx->refresh_object_map();
refresh_parent(ictx);
return 0;
}
bl.append(buf + q->first, q->second);
}
+ r = ictx->update_object_map(p->objectno, OBJECT_EXISTS);
+ if (r < 0) {
+ goto done;
+ }
+
C_AioWrite *req_comp = new C_AioWrite(cct, c);
if (ictx->object_cacher) {
c->add_request();
object_overlap = ictx->prune_parent_extents(objectx, overlap);
}
+ bool flag_nonexistent = false;
if (p->offset == 0 && p->length == ictx->layout.fl_object_size) {
req = new AioRemove(ictx, p->oid.name, p->objectno, objectx, object_overlap,
snapc, snap_id, req_comp);
+ if (!req->has_parent()) {
+ ictx->update_object_map(p->objectno, p->objectno + 1, OBJECT_PENDING,
+ OBJECT_EXISTS);
+ flag_nonexistent = true;
+ }
} else if (p->offset + p->length == ictx->layout.fl_object_size) {
req = new AioTruncate(ictx, p->oid.name, p->objectno, p->offset, objectx, object_overlap,
snapc, snap_id, req_comp);
snapc, snap_id, req_comp);
}
+ if (!flag_nonexistent) {
+ ictx->update_object_map(p->objectno, OBJECT_EXISTS);
+ }
+
r = req->send();
if (r < 0)
goto done;
+
+ if (flag_nonexistent) {
+ ictx->update_object_map(p->objectno, p->objectno + 1, OBJECT_NONEXISTENT,
+ OBJECT_PENDING);
+ }
}
r = 0;
done:
Context *req_comp = new C_RBD_Readahead(ictx, q->oid, q->offset, q->length);
ictx->readahead.inc_pending();
- ictx->aio_read_from_cache(q->oid, NULL,
+ ictx->aio_read_from_cache(q->oid, q->objectno, NULL,
q->length, q->offset,
req_comp);
}
if (ictx->object_cacher) {
C_CacheRead *cache_comp = new C_CacheRead(req);
- ictx->aio_read_from_cache(q->oid, &req->data(),
+ ictx->aio_read_from_cache(q->oid, q->objectno, &req->data(),
q->length, q->offset,
cache_comp);
} else {
};
class Context;
+class SimpleThrottle;
namespace librbd {
ceph::bufferlist& header);
int tmap_set(librados::IoCtx& io_ctx, const std::string& imgname);
int tmap_rm(librados::IoCtx& io_ctx, const std::string& imgname);
+ void rollback_object(ImageCtx *ictx, uint64_t snap_id, const string& oid,
+ SimpleThrottle& throttle);
int rollback_image(ImageCtx *ictx, uint64_t snap_id,
ProgressContext& prog_ctx);
void image_info(const ImageCtx *ictx, image_info_t& info, size_t info_size);
}
/* private */
-ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid, ObjectSet *oset,
+ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
+ uint64_t object_no,
+ ObjectSet *oset,
object_locator_t &l,
uint64_t truncate_size,
uint64_t truncate_seq)
if ((uint32_t)l.pool < objects.size()) {
if (objects[l.pool].count(oid)) {
Object *o = objects[l.pool][oid];
+ o->object_no = object_no;
o->truncate_size = truncate_size;
o->truncate_seq = truncate_seq;
return o;
}
// create it.
- Object *o = new Object(this, oid, oset, l, truncate_size, truncate_seq);
+ Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
+ truncate_seq);
objects[l.pool][oid] = o;
ob_lru.lru_insert_top(o);
return o;
C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
bh->start(), bh->length());
// go
- writeback_handler.read(bh->ob->get_oid(), bh->ob->get_oloc(),
- bh->start(), bh->length(), bh->ob->get_snap(),
- &onfinish->bl, bh->ob->truncate_size, bh->ob->truncate_seq,
- onfinish);
+ writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
+ bh->ob->get_oloc(), bh->start(), bh->length(),
+ bh->ob->get_snap(), &onfinish->bl,
+ bh->ob->truncate_size, bh->ob->truncate_seq, onfinish);
++reads_outstanding;
}
// get Object cache
sobject_t soid(ex_it->oid, rd->snap);
- Object *o = get_object(soid, oset, ex_it->oloc, ex_it->truncate_size, oset->truncate_seq);
+ Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc, ex_it->truncate_size, oset->truncate_seq);
touch_ob(o);
// does not exist and no hits?
++ex_it) {
// get object cache
sobject_t soid(ex_it->oid, CEPH_NOSNAP);
- Object *o = get_object(soid, oset, ex_it->oloc, ex_it->truncate_size, oset->truncate_seq);
+ Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
+ ex_it->truncate_size, oset->truncate_seq);
// map it all into a single bufferhead.
BufferHead *bh = o->map_write(wr);
friend struct ObjectSet;
public:
+ uint64_t object_no;
ObjectSet *oset;
xlist<Object*>::item set_item;
object_locator_t oloc;
Object(const Object& other);
const Object& operator=(const Object& other);
- Object(ObjectCacher *_oc, sobject_t o, ObjectSet *os, object_locator_t& l,
- uint64_t ts, uint64_t tq) :
+ Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os,
+ object_locator_t& l, uint64_t ts, uint64_t tq) :
ref(0),
oc(_oc),
- oid(o), oset(os), set_item(this), oloc(l),
+ oid(o), object_no(ono), oset(os), set_item(this), oloc(l),
truncate_size(ts), truncate_seq(tq),
complete(false), exists(true),
last_write_tid(0), last_commit_tid(0),
snapid_t get_snap() { return oid.snap; }
ObjectSet *get_object_set() { return oset; }
string get_namespace() { return oloc.nspace; }
+ uint64_t get_object_number() const { return object_no; } // returns the object_no member stored on this Object
object_locator_t& get_oloc() { return oloc; }
void set_object_locator(object_locator_t& l) { oloc = l; }
return NULL;
}
- Object *get_object(sobject_t oid, ObjectSet *oset, object_locator_t &l,
- uint64_t truncate_size, uint64_t truncate_seq);
+ Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset,
+ object_locator_t &l, uint64_t truncate_size,
+ uint64_t truncate_seq);
void close_object(Object *ob);
// bh stats
WritebackHandler() {}
virtual ~WritebackHandler() {}
- virtual void read(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len, snapid_t snapid,
- bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq,
- Context *onfinish) = 0;
+ virtual void read(const object_t& oid, uint64_t object_no,
+ const object_locator_t& oloc, uint64_t off, uint64_t len,
+ snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, Context *onfinish) = 0;
/**
* check if a given extent read result may change due to a write
*
delete m_finisher;
}
-void FakeWriteback::read(const object_t& oid,
+void FakeWriteback::read(const object_t& oid, uint64_t object_no,
const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snapid,
bufferlist *pbl, uint64_t trunc_size,
FakeWriteback(CephContext *cct, Mutex *lock, uint64_t delay_ns);
virtual ~FakeWriteback();
- virtual void read(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len, snapid_t snapid,
- bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq,
- Context *onfinish);
+ virtual void read(const object_t& oid, uint64_t object_no,
+ const object_locator_t& oloc, uint64_t off, uint64_t len,
+ snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, Context *onfinish);
virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len,
eq(image1.is_exclusive_lock_owner(), False)
eq(image2.is_exclusive_lock_owner(), True)
for offset in [0, IMG_SIZE / 2]:
- read = image2.read(0, 256)
+ read = image2.read(offset, 256)
eq(data, read)
export RBD_CREATE_ARGS="--format 2"
run_cli_tests
-for i in 0 1 5
+for i in 0 1 5 13
do
export RBD_FEATURES=$i
run_api_tests