int FileStore::get_index(coll_t cid, Index *index)
{
- char path[PATH_MAX];
- get_cdir(cid, path, sizeof(path));
- int r = index_manager.get_index(cid, path, index);
+ int r = index_manager.get_index(cid, basedir, index);
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
return r;
}
-int FileStore::lfn_find(coll_t cid, const ghobject_t& oid, IndexedPath *path)
+int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
{
- Index index;
+ IndexedPath path2;
+ if (!path)
+ path = &path2;
int r, exist;
- r = get_index(cid, &index);
- if (r < 0)
- return r;
-
- r = index->lookup(oid, path, &exist);
+ assert(NULL != index.index);
+ r = (index.index)->lookup(oid, path, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
int FileStore::lfn_truncate(coll_t cid, const ghobject_t& oid, off_t length)
{
- IndexedPath path;
FDRef fd;
- int r = lfn_open(cid, oid, false, &fd, &path);
+ int r = lfn_open(cid, oid, false, &fd);
if (r < 0)
return r;
r = ::ftruncate(**fd, length);
int FileStore::lfn_stat(coll_t cid, const ghobject_t& oid, struct stat *buf)
{
IndexedPath path;
- int r = lfn_find(cid, oid, &path);
+ Index index;
+ int r = get_index(cid, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
+ r = lfn_find(oid, index, &path);
if (r < 0)
return r;
r = ::stat(path->path(), buf);
const ghobject_t& oid,
bool create,
FDRef *outfd,
- IndexedPath *path,
- Index *index)
+ Index *index)
{
assert(get_allow_sharded_objects() ||
( oid.shard_id == shard_id_t::NO_SHARD &&
oid.generation == ghobject_t::NO_GEN ));
assert(outfd);
+
+ bool need_lock = true;
int flags = O_RDWR;
if (create)
flags |= O_CREAT;
+
Index index2;
if (!index) {
index = &index2;
}
int r = 0;
- if (!(*index)) {
+ if (!((*index).index)) {
r = get_index(cid, index);
+ } else {
+ need_lock = false;
}
int fd, exist;
return 0;
}
- {
- IndexedPath path2;
- if (!path)
- path = &path2;
- if (r < 0) {
- derr << "error getting collection index for " << cid
- << ": " << cpp_strerror(-r) << dendl;
- goto fail;
- }
- r = (*index)->lookup(oid, path, &exist);
+
+ assert(NULL != (*index).index);
+ if (need_lock) {
+ ((*index).index)->access_lock.get_write();
+ }
+
+ IndexedPath path2;
+ IndexedPath *path = &path2;
+ if (r < 0) {
+ derr << "error getting collection index for " << cid
+ << ": " << cpp_strerror(-r) << dendl;
+ goto fail;
+ }
+ r = (*index)->lookup(oid, path, &exist);
+ if (r < 0) {
+ derr << "could not find " << oid << " in index: "
+ << cpp_strerror(-r) << dendl;
+ goto fail;
+ }
+
+ r = ::open((*path)->path(), flags, 0644);
+ if (r < 0) {
+ r = -errno;
+ dout(10) << "error opening file " << (*path)->path() << " with flags="
+ << flags << ": " << cpp_strerror(-r) << dendl;
+ goto fail;
+ }
+ fd = r;
+ if (create && (!exist)) {
+ r = (*index)->created(oid, (*path)->path());
if (r < 0) {
- derr << "could not find " << oid << " in index: "
- << cpp_strerror(-r) << dendl;
+ VOID_TEMP_FAILURE_RETRY(::close(fd));
+ derr << "error creating " << oid << " (" << (*path)->path()
+ << ") in index: " << cpp_strerror(-r) << dendl;
goto fail;
}
-
- r = ::open((*path)->path(), flags, 0644);
+ r = chain_fsetxattr(fd, XATTR_SPILL_OUT_NAME,
+ XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
if (r < 0) {
- r = -errno;
- dout(10) << "error opening file " << (*path)->path() << " with flags="
- << flags << ": " << cpp_strerror(-r) << dendl;
+ VOID_TEMP_FAILURE_RETRY(::close(fd));
+ derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
+ << "):" << cpp_strerror(-r) << dendl;
goto fail;
}
- fd = r;
-
- if (create && (!exist)) {
- r = (*index)->created(oid, (*path)->path());
- if (r < 0) {
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- derr << "error creating " << oid << " (" << (*path)->path()
- << ") in index: " << cpp_strerror(-r) << dendl;
- goto fail;
- }
- r = chain_fsetxattr(fd, XATTR_SPILL_OUT_NAME,
- XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
- if (r < 0) {
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
- << "):" << cpp_strerror(-r) << dendl;
- goto fail;
- }
- }
}
if (!replaying) {
*outfd = fdcache.add(oid, fd, &existed);
if (existed) {
TEMP_FAILURE_RETRY(::close(fd));
- return 0;
}
} else {
*outfd = FDRef(new FDCache::FD(fd));
}
+
+ if (need_lock) {
+ ((*index).index)->access_lock.put_write();
+ }
+
return 0;
fail:
+
+ if (need_lock) {
+ ((*index).index)->access_lock.put_write();
+ }
+
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
IndexedPath path_new, path_old;
int exist;
int r;
+ bool index_same = false;
if (c < newcid) {
r = get_index(newcid, &index_new);
if (r < 0)
if (r < 0)
return r;
index_new = index_old;
+ index_same = true;
} else {
r = get_index(c, &index_old);
if (r < 0)
return r;
}
- r = index_old->lookup(o, &path_old, &exist);
- if (r < 0) {
- assert(!m_filestore_fail_eio || r != -EIO);
- return r;
- }
- if (!exist)
- return -ENOENT;
+ assert(NULL != index_old.index);
+ assert(NULL != index_new.index);
- r = index_new->lookup(newoid, &path_new, &exist);
- if (r < 0) {
- assert(!m_filestore_fail_eio || r != -EIO);
- return r;
- }
- if (exist)
- return -EEXIST;
+ if (!index_same) {
- dout(25) << "lfn_link path_old: " << path_old << dendl;
- dout(25) << "lfn_link path_new: " << path_new << dendl;
- r = ::link(path_old->path(), path_new->path());
- if (r < 0)
- return -errno;
+ RWLock::RLocker l1((index_old.index)->access_lock);
- r = index_new->created(newoid, path_new->path());
- if (r < 0) {
- assert(!m_filestore_fail_eio || r != -EIO);
- return r;
- }
+ r = index_old->lookup(o, &path_old, &exist);
+ if (r < 0) {
+ assert(!m_filestore_fail_eio || r != -EIO);
+ return r;
+ }
+ if (!exist)
+ return -ENOENT;
+
+ RWLock::WLocker l2((index_new.index)->access_lock);
+
+ r = index_new->lookup(newoid, &path_new, &exist);
+ if (r < 0) {
+ assert(!m_filestore_fail_eio || r != -EIO);
+ return r;
+ }
+ if (exist)
+ return -EEXIST;
+
+ dout(25) << "lfn_link path_old: " << path_old << dendl;
+ dout(25) << "lfn_link path_new: " << path_new << dendl;
+ r = ::link(path_old->path(), path_new->path());
+ if (r < 0)
+ return -errno;
+
+ r = index_new->created(newoid, path_new->path());
+ if (r < 0) {
+ assert(!m_filestore_fail_eio || r != -EIO);
+ return r;
+ }
+ } else {
+ RWLock::WLocker l1((index_old.index)->access_lock);
+
+ r = index_old->lookup(o, &path_old, &exist);
+ if (r < 0) {
+ assert(!m_filestore_fail_eio || r != -EIO);
+ return r;
+ }
+ if (!exist)
+ return -ENOENT;
+
+ r = index_new->lookup(newoid, &path_new, &exist);
+ if (r < 0) {
+ assert(!m_filestore_fail_eio || r != -EIO);
+ return r;
+ }
+ if (exist)
+ return -EEXIST;
+
+ dout(25) << "lfn_link path_old: " << path_old << dendl;
+ dout(25) << "lfn_link path_new: " << path_new << dendl;
+ r = ::link(path_old->path(), path_new->path());
+ if (r < 0)
+ return -errno;
+
+ r = index_new->created(newoid, path_new->path());
+ if (r < 0) {
+ assert(!m_filestore_fail_eio || r != -EIO);
+ return r;
+ }
+ }
return 0;
}
int r = get_index(cid, &index);
if (r < 0)
return r;
+
+ assert(NULL != index.index);
+ RWLock::WLocker l((index.index)->access_lock);
+
{
IndexedPath path;
int exist;
<< " with error: " << ret << dendl;
goto close_current_fd;
}
+ assert(NULL != index.index);
+ RWLock::WLocker l((index.index)->access_lock);
+
index->cleanup();
}
}
FDRef o, n;
{
Index index;
- IndexedPath from, to;
- r = lfn_open(cid, oldoid, false, &o, &from, &index);
+ r = lfn_open(cid, oldoid, false, &o, &index);
if (r < 0) {
goto out2;
}
- r = lfn_open(cid, newoid, true, &n, &to, &index);
+ assert(NULL != (index.index));
+ RWLock::WLocker l((index.index)->access_lock);
+
+ r = lfn_open(cid, newoid, true, &n, &index);
if (r < 0) {
goto out;
}
dout(10) << __func__ << " could not get index r = " << r << dendl;
goto out;
}
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
r = object_map->get_xattrs(oid, to_get, &got);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " get_xattrs err r =" << r << dendl;
dout(10) << __func__ << " could not get index r = " << r << dendl;
goto out;
}
- r = object_map->get_all_xattrs(oid, &omap_attrs);
- if (r < 0 && r != -ENOENT) {
- dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
- goto out;
- }
+ {
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
- r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
- if (r < 0 && r != -ENOENT) {
- dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
- goto out;
+ r = object_map->get_all_xattrs(oid, &omap_attrs);
+ if (r < 0 && r != -ENOENT) {
+ dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
+ goto out;
+ }
+
+ r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
+ if (r < 0 && r != -ENOENT) {
+ dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
+ goto out;
+ }
+ if (r == -ENOENT)
+ r = 0;
}
- if (r == -ENOENT)
- r = 0;
assert(omap_attrs.size() == omap_aset.size());
for (map<string, bufferlist>::iterator i = omap_aset.begin();
i != omap_aset.end();
dout(10) << __func__ << " could not get index r = " << r << dendl;
goto out_close;
}
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
set<string> to_remove;
to_remove.insert(string(name));
r = object_map->remove_xattrs(oid, to_remove, &spos);
dout(10) << __func__ << " could not get index r = " << r << dendl;
goto out_close;
}
- r = object_map->get_all_xattrs(oid, &omap_attrs);
- if (r < 0 && r != -ENOENT) {
- dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
- assert(!m_filestore_fail_eio || r != -EIO);
- goto out_close;
- }
- r = object_map->remove_xattrs(oid, omap_attrs, &spos);
- if (r < 0 && r != -ENOENT) {
- dout(10) << __func__ << " could not remove omap_attrs r = " << r << dendl;
- goto out_close;
- }
- if (r == -ENOENT)
- r = 0;
+ {
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
- chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
+ r = object_map->get_all_xattrs(oid, &omap_attrs);
+ if (r < 0 && r != -ENOENT) {
+ dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
+ assert(!m_filestore_fail_eio || r != -EIO);
+ goto out_close;
+ }
+ r = object_map->remove_xattrs(oid, omap_attrs, &spos);
+ if (r < 0 && r != -ENOENT) {
+ dout(10) << __func__ << " could not remove omap_attrs r = " << r << dendl;
+ goto out_close;
+ }
+ if (r == -ENOENT)
+ r = 0;
+ chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
sizeof(XATTR_NO_SPILL_OUT));
+ }
out_close:
lfn_close(fd);
int r = get_index(c, &index);
if (r < 0)
return r;
+
+ assert(NULL != index.index);
+ RWLock::WLocker l((index.index)->access_lock);
+
*version = index->collection_version();
if (*version == target_version)
return 1;
int r = get_index(c, &index);
if (r < 0)
return false;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
vector<ghobject_t> ls;
collection_list_handle_t handle;
r = index->collection_list_partial(ghobject_t(), 1, 1, 0, &ls, NULL);
int r = get_index(c, &index);
if (r < 0)
return r;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
r = index->collection_list_partial(start,
min, max, seq,
ls, next);
int r = get_index(c, &index);
if (r < 0)
return r;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
r = index->collection_list(&ls);
assert(!m_filestore_fail_eio || r != -EIO);
return r;
map<string, bufferlist> *out)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(c, hoid, &path);
+ Index index;
+ int r = get_index(c, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
if (r < 0)
return r;
r = object_map->get(hoid, header, out);
bool allow_eio)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(c, hoid, &path);
+ Index index;
+ int r = get_index(c, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return r;
r = object_map->get_header(hoid, bl);
int FileStore::omap_get_keys(coll_t c, const ghobject_t &hoid, set<string> *keys)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(c, hoid, &path);
+ Index index;
+ int r = get_index(c, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return r;
r = object_map->get_keys(hoid, keys);
map<string, bufferlist> *out)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(c, hoid, &path);
+ Index index;
+ int r = get_index(c, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return r;
r = object_map->get_values(hoid, keys, out);
set<string> *out)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(c, hoid, &path);
+
+ Index index;
+ int r = get_index(c, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return r;
r = object_map->check_keys(hoid, keys, out);
const ghobject_t &hoid)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(c, hoid, &path);
+ Index index;
+ int r = get_index(c, &index);
+ if (r < 0)
+ return ObjectMap::ObjectMapIterator();
+
+ assert(NULL != index.index);
+ RWLock::RLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return ObjectMap::ObjectMapIterator();
return object_map->get_iterator(hoid);
int r = get_index(c, &from);
if (r < 0)
return r;
+ assert(NULL != from.index);
+ RWLock::WLocker l((from.index)->access_lock);
+
r = from->prep_delete();
if (r < 0)
return r;
int FileStore::_omap_clear(coll_t cid, const ghobject_t &hoid,
const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(cid, hoid, &path);
+ Index index;
+ int r = get_index(cid, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::WLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return r;
r = object_map->clear_keys_header(hoid, &spos);
const map<string, bufferlist> &aset,
const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(cid, hoid, &path);
+ Index index;
+ int r = get_index(cid, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::WLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return r;
return object_map->set_keys(hoid, aset, &spos);
const set<string> &keys,
const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(cid, hoid, &path);
+ Index index;
+ int r = get_index(cid, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::WLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return r;
r = object_map->rm_keys(hoid, keys, &spos);
const SequencerPosition &spos)
{
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
- IndexedPath path;
- int r = lfn_find(cid, hoid, &path);
+ Index index;
+ int r = get_index(cid, &index);
+ if (r < 0)
+ return r;
+
+ assert(NULL != index.index);
+ RWLock::WLocker l((index.index)->access_lock);
+
+ r = lfn_find(hoid, index);
+
if (r < 0)
return r;
return object_map->set_header(hoid, bl, &spos);
if (!r)
r = get_index(dest, &to);
- if (!r)
- r = from->split(rem, bits, to);
+ if (!r) {
+ assert(NULL != from.index);
+ RWLock::WLocker l1((from.index)->access_lock);
+
+ assert(NULL != to.index);
+ RWLock::WLocker l2((to.index)->access_lock);
+
+ r = from->split(rem, bits, to.index);
+ }
_close_replay_guard(cid, spos);
_close_replay_guard(dest, spos);
if (!r)
r = get_index(dest, &to);
- if (!r)
- r = from->split(rem, bits, to);
+ if (!r) {
+ assert(NULL != from.index);
+ RWLock::WLocker l1((from.index)->access_lock);
+
+ assert(NULL != to.index);
+ RWLock::WLocker l2((to.index)->access_lock);
+
+ r = from->split(rem, bits, to.index);
+ }
_close_replay_guard(cid, spos);
_close_replay_guard(dest, spos);