#include <unistd.h>
#include "NewStore.h"
+#include "kv.h"
#include "include/compat.h"
#include "include/stringify.h"
#include "common/errno.h"
#include "common/safe_io.h"
+#include "Allocator.h"
+#include "FreelistManager.h"
#define dout_subsys ceph_subsys_newstore
TODO:
- * alloc hint to set frag size
- * collection_list must flush pending db work
- * rocksdb: use db_paths (db/ and db.bulk/ ?)
- * rocksdb: auto-detect use_fsync option when not xfs or btrfs
- * avoid mtime updates when doing open-by-handle
- * fid xattr backpointer
- * inline first fsync_item in TransContext to void allocation?
- * refcounted fragments (for efficient clone)
+ * superblock, features
+ * statfs reports on block device only
+ * bdev: smarter zeroing
+ * zero overlay in onode?
+ * discard
+ * aio read?
+ * read uses local ioc
+ * refcounted extents (for efficient clone)
+ * overlay does inefficient zeroing on unwritten extent
*/
/*
* Some invariants:
*
- * - The fragment extent referenced by the fragment_t is always
- * defined. It may be zeros (a hole in the underlying fs).
- *
- * - The fragment file may be larger than the fragment_t indicates.
- * If so, the trailing cruft should be ignored and can be safely
- * discarded (see _clean_fid_tail).
- *
- * - The fragment file may be smaller than the fragment_t indicates.
- * The content is defined to be zeros in this case.
- *
- * - All fragments start on a frag_size boundary, and are
- * at most frag_size bytes.
- *
- * - A fragment *may* be shorter than frag_size, even if
- * it isn't at the end of a file.
- *
- * - The fragment map never extends beyond the size of the object.
- *
- * - The fragment_t::offset is (currently) always 0.
+ * - If the end of the object is a partial block, and is not an overlay,
+ * the remainder of that block will always be zeroed. (It has to be written
+ * anyway, so we may as well have written zeros.)
*
*/
const string PREFIX_OVERLAY = "V"; // u64 + offset -> value
const string PREFIX_OMAP = "M"; // u64 + keyname -> value
const string PREFIX_WAL = "L"; // write ahead log
+const string PREFIX_ALLOC = "B"; // block allocator
/*
* object name key structure
return key + 2;
}
-static void _key_encode_u32(uint32_t u, string *key)
-{
- uint32_t bu;
-#ifdef CEPH_BIG_ENDIAN
- bu = u;
-#elif defined(CEPH_LITTLE_ENDIAN)
- bu = swab32(u);
-#else
-# error wtf
-#endif
- key->append((char*)&bu, 4);
-}
-
-static const char *_key_decode_u32(const char *key, uint32_t *pu)
-{
- uint32_t bu;
- memcpy(&bu, key, 4);
-#ifdef CEPH_BIG_ENDIAN
- *pu = bu;
-#elif defined(CEPH_LITTLE_ENDIAN)
- *pu = swab32(bu);
-#else
-# error wtf
-#endif
- return key + 4;
-}
-
-static void _key_encode_u64(uint64_t u, string *key)
-{
- uint64_t bu;
-#ifdef CEPH_BIG_ENDIAN
- bu = u;
-#elif defined(CEPH_LITTLE_ENDIAN)
- bu = swab64(u);
-#else
-# error wtf
-#endif
- key->append((char*)&bu, 8);
-}
-
-static const char *_key_decode_u64(const char *key, uint64_t *pu)
-{
- uint64_t bu;
- memcpy(&bu, key, 8);
-#ifdef CEPH_BIG_ENDIAN
- *pu = bu;
-#elif defined(CEPH_LITTLE_ENDIAN)
- *pu = swab64(bu);
-#else
-# error wtf
-#endif
- return key + 8;
-}
-
static void get_coll_key_range(const coll_t& cid, int bits,
string *temp_start, string *temp_end,
string *start, string *end)
// Onode
+#undef dout_prefix
+#define dout_prefix *_dout << "newstore.onode(" << this << ") "
+
NewStore::Onode::Onode(const ghobject_t& o, const string& k)
: nref(0),
oid(o),
flush_lock("NewStore::Onode::flush_lock") {
}
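+// wait for any in-flight transactions still referencing this onode to
+// complete before returning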
+void NewStore::Onode::flush()
+{
+ Mutex::Locker l(flush_lock);
+ dout(20) << __func__ << " " << flush_txns << dendl;
+ while (!flush_txns.empty())
+ flush_cond.Wait(flush_lock);
+ dout(20) << __func__ << " done" << dendl;
+}
+
// OnodeHashLRU
#undef dout_prefix
#define dout_prefix *_dout << "newstore(" << path << ") "
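+// aio completion callback registered with the BlockDevice: priv is the
+// NewStore instance, priv2 is the per-io private handle passed through
+// to _txc_aio_finish.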
+void aio_cb(void *priv, void *priv2)
+{
+ NewStore *store = static_cast<NewStore*>(priv);
+ store->_txc_aio_finish(priv2);
+}
+
NewStore::NewStore(CephContext *cct, const string& path)
: ObjectStore(path),
cct(cct),
db(NULL),
fs(NULL),
+ bdev(NULL),
+ fm(NULL),
+ alloc(NULL),
path_fd(-1),
fsid_fd(-1),
- frag_fd(-1),
- fset_fd(-1),
mounted(false),
coll_lock("NewStore::coll_lock"),
- fid_lock("NewStore::fid_lock"),
nid_lock("NewStore::nid_lock"),
nid_max(0),
throttle_ops(cct, "newstore_max_ops", cct->_conf->newstore_max_ops),
cct->_conf->newstore_wal_thread_suicide_timeout,
&wal_tp),
finisher(cct),
- fsync_tp(cct,
- "NewStore::fsync_tp",
- cct->_conf->newstore_fsync_threads,
- "newstore_fsync_threads"),
- fsync_wq(this,
- cct->_conf->newstore_fsync_thread_timeout,
- cct->_conf->newstore_fsync_thread_suicide_timeout,
- &fsync_tp),
- aio_thread(this),
- aio_stop(false),
- aio_queue(cct->_conf->newstore_aio_max_queue_depth),
kv_sync_thread(this),
kv_lock("NewStore::kv_lock"),
kv_stop(false),
assert(!mounted);
assert(db == NULL);
assert(fsid_fd < 0);
- assert(frag_fd < 0);
}
void NewStore::_init_logger()
fs = NULL;
}
-int NewStore::_open_frag()
+int NewStore::_open_bdev()
{
- assert(frag_fd < 0);
- frag_fd = ::openat(path_fd, "fragments", O_DIRECTORY);
- if (frag_fd < 0) {
- int r = -errno;
- derr << __func__ << " cannot open " << path << "/fragments: "
- << cpp_strerror(r) << dendl;
- return r;
+ assert(bdev == NULL);
+ bdev = new BlockDevice(aio_cb, static_cast<void*>(this));
+ string p = path + "/block";
+ int r = bdev->open(p);
+ if (r < 0) {
+ delete bdev;
+ bdev = NULL;
}
- return 0;
+ return r;
}
-int NewStore::_create_frag()
+void NewStore::_close_bdev()
{
- assert(frag_fd < 0);
- frag_fd = ::openat(path_fd, "fragments", O_DIRECTORY);
- if (frag_fd < 0 && errno == ENOENT) {
- int r = ::mkdirat(path_fd, "fragments", 0755);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " cannot create " << path << "/fragments: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- frag_fd = ::openat(path_fd, "fragments", O_DIRECTORY);
+ assert(bdev);
+ bdev->close();
+ delete bdev;
+ bdev = NULL;
+}
+
+int NewStore::_open_alloc()
+{
+ assert(fm == NULL);
+ assert(alloc == NULL);
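+ // the FreelistManager persists the free extent list under PREFIX_ALLOC
+ // in the kv store; the Allocator keeps an in-memory image of that free
+ // space and makes the actual allocation decisions.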
+ fm = new FreelistManager();
+ int r = fm->init(db, PREFIX_ALLOC);
+ if (r < 0) {
+ delete fm;
+ fm = NULL;
+ return r;
}
- if (frag_fd < 0) {
- int r = -errno;
- derr << __func__ << " cannot open created " << path << "/fragments: "
- << cpp_strerror(r) << dendl;
+ alloc = Allocator::create("stupid");
+ r = alloc->init(fm);
+ if (r < 0) {
+ delete alloc;
+ alloc = NULL;
+ fm->shutdown();
+ delete fm;
+ fm = NULL;
return r;
}
- return 0;
+ return r;
}
-void NewStore::_close_frag()
+void NewStore::_close_alloc()
{
- if (fset_fd >= 0) {
- VOID_TEMP_FAILURE_RETRY(::close(fset_fd));
- fset_fd = -1;
- }
- VOID_TEMP_FAILURE_RETRY(::close(frag_fd));
- frag_fd = -1;
+ assert(fm);
+ assert(alloc);
+ alloc->shutdown();
+ delete alloc;
+ alloc = NULL;
+ fm->shutdown();
+ delete fm;
+ fm = NULL;
}
int NewStore::_open_fsid(bool create)
return ret;
if (ret > 36)
fsid_str[36] = 0;
+ else
+ fsid_str[ret] = 0;
if (!uuid->parse(fsid_str))
return -EINVAL;
return 0;
}
dout(1) << __func__ << " opened " << g_conf->newstore_backend
<< " path " << path << " options " << options << dendl;
+
+ if (create) {
+ // blow it away
+ dout(1) << __func__ << " wiping by prefix" << dendl;
+ KeyValueDB::Transaction t = db->get_transaction();
+ t->rmkeys_by_prefix(PREFIX_SUPER);
+ t->rmkeys_by_prefix(PREFIX_COLL);
+ t->rmkeys_by_prefix(PREFIX_OBJ);
+ t->rmkeys_by_prefix(PREFIX_OVERLAY);
+ t->rmkeys_by_prefix(PREFIX_OMAP);
+ t->rmkeys_by_prefix(PREFIX_WAL);
+ t->rmkeys_by_prefix(PREFIX_ALLOC);
+ db->submit_transaction_sync(t);
+ }
return 0;
}
db = NULL;
}
-int NewStore::_aio_start()
-{
- if (g_conf->newstore_aio) {
- dout(10) << __func__ << dendl;
- int r = aio_queue.init();
- if (r < 0)
- return r;
- aio_thread.create();
- }
- return 0;
-}
-
-void NewStore::_aio_stop()
-{
- if (g_conf->newstore_aio) {
- dout(10) << __func__ << dendl;
- aio_stop = true;
- aio_thread.join();
- aio_stop = false;
- aio_queue.shutdown();
- }
-}
-
-int NewStore::_open_collections()
+int NewStore::_open_collections(int *errors)
{
KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
for (it->upper_bound(string());
dout(20) << __func__ << " opened " << cid << dendl;
coll_map[cid] = c;
} else {
- dout(20) << __func__ << " unrecognized collection " << it->key() << dendl;
+ derr << __func__ << " unrecognized collection " << it->key() << dendl;
+ if (errors)
+ (*errors)++;
}
}
return 0;
dout(1) << __func__ << " fsid is already set to " << fsid << dendl;
}
- r = _create_frag();
+ // block device
+ if (g_conf->newstore_block_path.length()) {
+ int r = ::symlinkat(g_conf->newstore_block_path.c_str(), path_fd, "block");
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " failed to create block symlink to "
+ << g_conf->newstore_block_path << ": " << cpp_strerror(r) << dendl;
+ goto out_close_fsid;
+ }
+ } else if (g_conf->newstore_block_size) {
+ struct stat st;
+ r = ::fstatat(path_fd, "block", &st, 0);
+ if (r < 0)
+ r = -errno;
+ if (r == -ENOENT) {
+ int fd = ::openat(path_fd, "block", O_CREAT|O_RDWR, 0644);
+ if (fd < 0) {
+ r = -errno;
+ derr << __func__ << " failed to create block file: " << cpp_strerror(r)
+ << dendl;
+ goto out_close_fsid;
+ }
+ r = ::ftruncate(fd, g_conf->newstore_block_size);
+ assert(r == 0);
+ VOID_TEMP_FAILURE_RETRY(::close(fd));
+ dout(1) << __func__ << " created block file with size "
+ << pretty_si_t(g_conf->newstore_block_size) << "B" << dendl;
+ }
+ }
+
+ r = _open_bdev();
if (r < 0)
goto out_close_fsid;
r = _open_db(true);
if (r < 0)
- goto out_close_frag;
+ goto out_close_bdev;
+
+ r = _open_alloc();
+ if (r < 0)
+ goto out_close_db;
+
+ // initialize freespace
+ {
+ dout(20) << __func__ << " initializing freespace" << dendl;
+ KeyValueDB::Transaction t = db->get_transaction();
+ fm->release(0, bdev->get_size(), t);
+ db->submit_transaction_sync(t);
+ }
// FIXME: superblock
dout(10) << __func__ << " success" << dendl;
r = 0;
- _close_db();
- out_close_frag:
- _close_frag();
+ _close_alloc();
+ out_close_db:
+ _close_db();
+ out_close_bdev:
+ _close_bdev();
out_close_fsid:
_close_fsid();
out_path_fd:
{
dout(1) << __func__ << " path " << path << dendl;
+ if (g_conf->newstore_fsck_on_mount) {
+ int rc = fsck();
+ if (rc < 0)
+ return rc;
+ }
+
int r = _open_path();
if (r < 0)
return r;
if (r < 0)
goto out_fsid;
- r = _open_frag();
+ r = _open_bdev();
if (r < 0)
goto out_fsid;
- // FIXME: superblock, features
-
r = _open_db(false);
if (r < 0)
- goto out_frag;
+ goto out_bdev;
- r = _recover_next_fid();
+ r = _open_alloc();
if (r < 0)
goto out_db;
r = _recover_next_nid();
if (r < 0)
- goto out_db;
+ goto out_alloc;
r = _open_collections();
if (r < 0)
- goto out_db;
-
- r = _aio_start();
- if (r < 0)
- goto out_db;
-
- r = _wal_replay();
- if (r < 0)
- goto out_aio;
+ goto out_alloc;
finisher.start();
- fsync_tp.start();
wal_tp.start();
kv_sync_thread.create();
+ r = _wal_replay();
+ if (r < 0)
+ goto out_stop;
+
mounted = true;
return 0;
- out_aio:
- _aio_stop();
+ out_stop:
+ _kv_stop();
+ wal_tp.stop();
+ finisher.wait_for_empty();
+ finisher.stop();
+ out_alloc:
+ _close_alloc();
out_db:
_close_db();
- out_frag:
- _close_frag();
+ out_bdev:
+ _close_bdev();
out_fsid:
_close_fsid();
out_path:
_sync();
_reap_collections();
- dout(20) << __func__ << " stopping fsync_wq" << dendl;
- fsync_tp.stop();
- dout(20) << __func__ << " stopping aio" << dendl;
- _aio_stop();
dout(20) << __func__ << " stopping kv thread" << dendl;
_kv_stop();
dout(20) << __func__ << " draining wal_wq" << dendl;
dout(20) << __func__ << " closing" << dendl;
mounted = false;
- if (fset_fd >= 0)
- VOID_TEMP_FAILURE_RETRY(::close(fset_fd));
+ _close_alloc();
_close_db();
- _close_frag();
+ _close_bdev();
_close_fsid();
_close_path();
return 0;
}
+int NewStore::fsck()
+{
+ dout(1) << __func__ << dendl;
+ int errors = 0;
+ set<uint64_t> used_nids;
+ set<uint64_t> used_omap_head;
+ interval_set<uint64_t> used_blocks;
+ KeyValueDB::Iterator it;
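+ // passes: walk each collection's objects checking nids, extents,
+ // overlays and omap; scan for stray object, overlay and omap keys;
+ // then cross-check the freelist against the allocated extents.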
+
+ int r = _open_path();
+ if (r < 0)
+ return r;
+ r = _open_fsid(false);
+ if (r < 0)
+ goto out_path;
+
+ r = _read_fsid(&fsid);
+ if (r < 0)
+ goto out_fsid;
+
+ r = _lock_fsid();
+ if (r < 0)
+ goto out_fsid;
+
+ r = _open_bdev();
+ if (r < 0)
+ goto out_fsid;
+
+ r = _open_db(false);
+ if (r < 0)
+ goto out_bdev;
+
+ r = _open_alloc();
+ if (r < 0)
+ goto out_db;
+
+ r = _open_collections(&errors);
+ if (r < 0)
+ goto out_alloc;
+
+ // walk collections, objects
+ for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
+ p != coll_map.end() && !errors;
+ ++p) {
+ dout(1) << __func__ << " collection " << p->first << dendl;
+ CollectionRef c = _get_collection(p->first);
+ RWLock::RLocker l(c->lock);
+ ghobject_t pos;
+ while (!errors) {
+ vector<ghobject_t> ols;
+ int r = collection_list(p->first, pos, ghobject_t::get_max(), true,
+ 100, &ols, &pos);
+ if (r < 0) {
+ ++errors;
+ break;
+ }
+ if (ols.empty()) {
+ break;
+ }
+ for (auto oid : ols) {
+ dout(10) << __func__ << " " << oid << dendl;
+ OnodeRef o = c->get_onode(oid, false);
+ if (!o || !o->exists) {
+ ++errors;
+ break;
+ }
+ if (o->onode.nid) {
+ if (used_nids.count(o->onode.nid)) {
+ derr << " " << oid << " nid " << o->onode.nid << " already in use"
+ << dendl;
+ ++errors;
+ break;
+ }
+ used_nids.insert(o->onode.nid);
+ }
+ // blocks
+ for (auto b : o->onode.block_map) {
+ if (used_blocks.contains(b.second.offset, b.second.length)) {
+ derr << " " << oid << " extent " << b.first << ": " << b.second
+ << " already allocated" << dendl;
+ ++errors;
+ continue;
+ }
+ used_blocks.insert(b.second.offset, b.second.length);
+ if (b.second.end() > bdev->get_size()) {
+ derr << " " << oid << " extent " << b.first << ": " << b.second
+ << " past end of block device" << dendl;
+ ++errors;
+ }
+ }
+ // overlays
+ set<string> overlay_keys;
+ map<uint64_t,int> refs;
+ for (auto v : o->onode.overlay_map) {
+ if (v.first + v.second.length > o->onode.size) {
+ derr << " " << oid << " overlay " << v.first << " " << v.second
+ << " extends past end of object" << dendl;
+ ++errors;
+ }
+ if (v.second.key > o->onode.last_overlay_key) {
+ derr << " " << oid << " overlay " << v.first << " " << v.second
+ << " is > last_overlay_key " << o->onode.last_overlay_key
+ << dendl;
+ ++errors;
+ }
+ ++refs[v.second.key];
+ string key;
+ bufferlist val;
+ get_overlay_key(o->onode.nid, v.second.key, &key);
+ overlay_keys.insert(key);
+ int r = db->get(PREFIX_OVERLAY, key, &val);
+ if (r < 0) {
+ derr << " " << oid << " overlay " << v.first << " " << v.second
+ << " failed to fetch: " << cpp_strerror(r) << dendl;
+ ++errors;
+ }
+ if (val.length() < v.second.value_offset + v.second.length) {
+ derr << " " << oid << " overlay " << v.first << " " << v.second
+ << " too short, " << val.length() << dendl;
+ ++errors;
+ }
+ }
+ for (auto vr : o->onode.overlay_refs) {
+ if (refs[vr.first] != vr.second) {
+ derr << " " << oid << " overlay key " << vr.first
+ << " says " << vr.second << " refs but we have "
+ << refs[vr.first] << dendl;
+ ++errors;
+ }
+ refs.erase(vr.first);
+ }
+ for (auto p : refs) {
+ if (p.second > 1) {
+ derr << " " << oid << " overlay key " << p.first
+ << " has " << p.second << " refs but they are not recorded"
+ << dendl;
+ ++errors;
+ }
+ }
+ do {
+ string start;
+ get_overlay_key(o->onode.nid, 0, &start);
+ KeyValueDB::Iterator it = db->get_iterator(PREFIX_OVERLAY);
+ if (!it)
+ break;
+ for (it->lower_bound(start); it->valid(); it->next()) {
+ string k = it->key();
+ const char *p = k.c_str();
+ uint64_t nid;
+ p = _key_decode_u64(p, &nid);
+ if (nid != o->onode.nid)
+ break;
+ if (!overlay_keys.count(k)) {
+ derr << " " << oid << " has stray overlay kv pair for "
+ << k << dendl;
+ ++errors;
+ }
+ }
+ } while (false);
+ // omap
+ while (o->onode.omap_head) {
+ if (used_omap_head.count(o->onode.omap_head)) {
+ derr << " " << oid << " omap_head " << o->onode.omap_head
+ << " already in use" << dendl;
+ ++errors;
+ break;
+ }
+ used_omap_head.insert(o->onode.omap_head);
+ // hrm, scan actual key/value pairs?
+ KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
+ if (!it)
+ break;
+ string head, tail;
+ get_omap_header(o->onode.omap_head, &head);
+ get_omap_tail(o->onode.omap_head, &tail);
+ it->lower_bound(head);
+ while (it->valid()) {
+ if (it->key() == head) {
+ dout(30) << __func__ << " got header" << dendl;
+ } else if (it->key() >= tail) {
+ dout(30) << __func__ << " reached tail" << dendl;
+ break;
+ } else {
+ string user_key;
+ decode_omap_key(it->key(), &user_key);
+ dout(30) << __func__
+ << " got " << pretty_binary_string(it->key())
+ << " -> " << user_key << dendl;
+ assert(it->key() < tail);
+ }
+ it->next();
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ dout(1) << __func__ << " checking for stray objects" << dendl;
+ it = db->get_iterator(PREFIX_OBJ);
+ if (it) {
+ CollectionRef c;
+ for (it->lower_bound(string()); it->valid(); it->next()) {
+ ghobject_t oid;
+ int r = get_key_object(it->key(), &oid);
+ if (r < 0) {
+ dout(30) << __func__ << " bad object key "
+ << pretty_binary_string(it->key()) << dendl;
+ ++errors;
+ continue;
+ }
+ if (!c || !c->contains(oid)) {
+ c = NULL;
+ for (ceph::unordered_map<coll_t, CollectionRef>::iterator p =
+ coll_map.begin();
+ p != coll_map.end();
+ ++p) {
+ if (p->second->contains(oid)) {
+ c = p->second;
+ break;
+ }
+ }
+ if (!c) {
+ dout(30) << __func__ << " stray object " << oid
+ << " not owned by any collection" << dendl;
+ ++errors;
+ continue;
+ }
+ }
+ }
+ }
+
+ dout(1) << __func__ << " checking for stray overlay data" << dendl;
+ it = db->get_iterator(PREFIX_OVERLAY);
+ if (it) {
+ for (it->lower_bound(string()); it->valid(); it->next()) {
+ string key = it->key();
+ const char *p = key.c_str();
+ uint64_t nid;
+ p = _key_decode_u64(p, &nid);
+ if (used_nids.count(nid) == 0) {
+ derr << __func__ << " found stray overlay data on nid " << nid << dendl;
+ ++errors;
+ }
+ }
+ }
+
+ dout(1) << __func__ << " checking for stray omap data" << dendl;
+ it = db->get_iterator(PREFIX_OMAP);
+ if (it) {
+ for (it->lower_bound(string()); it->valid(); it->next()) {
+ string key = it->key();
+ const char *p = key.c_str();
+ uint64_t omap_head;
+ p = _key_decode_u64(p, &omap_head);
+ if (used_omap_head.count(omap_head) == 0) {
+ derr << __func__ << " found stray omap data on omap_head " << omap_head
+ << dendl;
+ ++errors;
+ }
+ }
+ }
+
+ dout(1) << __func__ << " checking freelist vs allocated" << dendl;
+ {
+ const map<uint64_t,uint64_t>& free = fm->get_freelist();
+ for (map<uint64_t,uint64_t>::const_iterator p = free.begin();
+ p != free.end(); ++p) {
+ if (used_blocks.contains(p->first, p->second)) {
+ derr << __func__ << " free extent " << p->first << "~" << p->second
+ << " intersects allocated blocks" << dendl;
+ ++errors;
+ continue;
+ }
+ used_blocks.insert(p->first, p->second);
+ }
+ if (!used_blocks.contains(0, bdev->get_size())) {
+ derr << __func__ << " leaked some space; free+used = "
+ << used_blocks
+ << " != expected 0~" << bdev->get_size()
+ << dendl;
+ ++errors;
+ }
+ }
+
+ out_alloc:
+ _close_alloc();
+ out_db:
+ it.reset(); // before db is closed
+ _close_db();
+ out_bdev:
+ _close_bdev();
+ out_fsid:
+ _close_fsid();
+ out_path:
+ _close_path();
+
+ dout(1) << __func__ << " finish with " << errors << " errors" << dendl;
+ return errors;
+}
+
void NewStore::_sync()
{
dout(10) << __func__ << dendl;
- dout(20) << " flushing fsync wq" << dendl;
- fsync_wq.flush();
+ // flush aios in flight
+ bdev->flush();
kv_lock.Lock();
while (!kv_committing.empty() ||
int NewStore::statfs(struct statfs *buf)
{
- if (::statfs(path.c_str(), buf) < 0) {
+ memset(buf, 0, sizeof(*buf));
+ buf->f_blocks = bdev->get_size() / bdev->get_block_size();
+ buf->f_bsize = bdev->get_block_size();
+ buf->f_bfree = fm->get_total_free() / bdev->get_block_size();
+ buf->f_bavail = buf->f_bfree;
+
+ /*
+ struct statfs fs;
+ if (::statfs(path.c_str(), &fs) < 0) {
int r = -errno;
assert(!g_conf->newstore_fail_eio || r != -EIO);
return r;
}
+ */
+
return 0;
}
bufferlist& bl,
uint32_t op_flags)
{
- map<uint64_t,fragment_t>::iterator fp, fend;
+ map<uint64_t,extent_t>::iterator bp, bend;
map<uint64_t,overlay_t>::iterator op, oend;
+ uint64_t block_size = bdev->get_block_size();
int r;
- int fd = -1;
- fid_t cur_fid;
+ IOContext ioc(NULL); // FIXME?
dout(20) << __func__ << " " << offset << "~" << length << " size "
<< o->onode.size << dendl;
+ bl.clear();
if (offset > o->onode.size) {
r = 0;
r = 0;
// loop over overlays and data fragments. overlays take precedence.
- fend = o->onode.data_map.end();
- fp = o->onode.data_map.lower_bound(offset);
- if (fp != o->onode.data_map.begin()) {
- --fp;
+ bend = o->onode.block_map.end();
+ bp = o->onode.block_map.lower_bound(offset);
+ if (bp != o->onode.block_map.begin()) {
+ --bp;
}
oend = o->onode.overlay_map.end();
op = o->onode.overlay_map.lower_bound(offset);
++op;
continue;
}
- if (fp != fend && fp->first + fp->second.length <= offset) {
- dout(30) << __func__ << " skip frag " << fp->first << "~" << fp->second
+ if (bp != bend && bp->first + bp->second.length <= offset) {
+ dout(30) << __func__ << " skip frag " << bp->first << "~" << bp->second
<< dendl;
- ++fp;
+ ++bp;
continue;
}
x_len = op->first - offset;
}
- // frag?
- if (fp != fend && fp->first <= offset) {
- if (fp->second.fid != cur_fid) {
- cur_fid = fp->second.fid;
- if (fd >= 0) {
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- }
- fd = _open_fid(cur_fid, O_RDONLY);
- if (fd < 0) {
- r = fd;
+ // extent?
+ if (bp != bend && bp->first <= offset) {
+ uint64_t x_off = offset - bp->first;
+ x_len = MIN(x_len, bp->second.length);
+ if (!bp->second.has_flag(extent_t::FLAG_UNWRITTEN)) {
+ dout(30) << __func__ << " data " << bp->first << ": " << bp->second
+ << " use " << x_off << "~" << x_len
+ << " final offset " << x_off + bp->second.offset
+ << dendl;
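+ // the block device wants block-aligned io, so widen the read to block
+ // boundaries and trim the result back to the requested range.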
+ uint64_t front_extra = x_off % block_size;
+ uint64_t r_off = x_off - front_extra;
+ uint64_t r_len = ROUND_UP_TO(x_len + front_extra, block_size);
+ dout(30) << __func__ << " reading " << r_off << "~" << r_len << dendl;
+ bufferlist t;
+ r = bdev->read(r_off + bp->second.offset, r_len, &t, &ioc);
+ if (r < 0) {
goto out;
}
- }
- uint64_t x_off = offset - fp->first - fp->second.offset;
- x_len = MIN(x_len, fp->second.length - x_off);
- dout(30) << __func__ << " data " << fp->first << " " << fp->second
- << " use " << x_off << "~" << x_len
- << " fid " << cur_fid << " offset " << x_off + fp->second.offset
- << dendl;
- r = ::lseek64(fd, x_off, SEEK_SET);
- if (r < 0) {
- r = -errno;
- goto out;
- }
- bufferlist t;
- r = t.read_fd(fd, x_len);
- if (r < 0) {
- goto out;
- }
- bl.claim_append(t);
- if ((unsigned)r < x_len) {
- dout(10) << __func__ << " short read " << r << " < " << x_len
- << " from " << cur_fid << dendl;
- bufferptr z(x_len - r);
- z.zero();
- bl.append(z);
+ bufferlist u;
+ u.substr_of(t, front_extra, x_len);
+ bl.claim_append(u);
+ } else {
+ // unwritten (zero) extent
+ dout(30) << __func__ << " data " << bp->first << ": " << bp->second
+ << ", use " << x_len << " zeros" << dendl;
+ bufferptr zp(x_len);
+ zp.zero();
+ bl.push_back(zp);
}
offset += x_len;
length -= x_len;
- if (x_off + x_len == fp->second.length) {
- ++fp;
+ if (x_off + x_len == bp->second.length) {
+ ++bp;
}
continue;
}
r = bl.length();
out:
- if (fd >= 0) {
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- }
return r;
}
dout(20) << __func__ << " " << offset << "~" << len << " size "
<< o->onode.size << dendl;
- map<uint64_t,fragment_t>::iterator fp, fend;
+ map<uint64_t,extent_t>::iterator bp, bend;
map<uint64_t,overlay_t>::iterator op, oend;
// loop over overlays and data fragments. overlays take precedence.
- fend = o->onode.data_map.end();
- fp = o->onode.data_map.lower_bound(offset);
- if (fp != o->onode.data_map.begin()) {
- --fp;
+ bend = o->onode.block_map.end();
+ bp = o->onode.block_map.lower_bound(offset);
+ if (bp != o->onode.block_map.begin()) {
+ --bp;
}
oend = o->onode.overlay_map.end();
op = o->onode.overlay_map.lower_bound(offset);
++op;
continue;
}
- if (fp != fend && fp->first + fp->second.length <= offset) {
- ++fp;
+ if (bp != bend && bp->first + bp->second.length <= offset) {
+ ++bp;
continue;
}
if (op != oend && op->first <= offset) {
uint64_t x_len = MIN(op->first + op->second.length - offset, len);
//m[offset] = x_len;
- dout(30) << __func__ << " get overlay, off = " << offset << " len=" << x_len << dendl;
+ dout(30) << __func__ << " overlay " << offset << "~" << x_len << dendl;
len -= x_len;
offset += x_len;
++op;
x_len = op->first - offset;
}
- // frag?
- if (fp != fend && fp->first <= offset) {
- uint64_t x_off = offset - fp->first - fp->second.offset;
- x_len = MIN(x_len, fp->second.length - x_off);
- //m[offset] = x_len;
- dout(30) << __func__ << " get frag, off = " << offset << " len=" << x_len << dendl;
+ // extent?
+ if (bp != bend && bp->first <= offset) {
+ uint64_t x_off = offset - bp->first;
+ x_len = MIN(x_len, bp->second.length - x_off);
+ dout(30) << __func__ << " extent " << offset << "~" << x_len << dendl;
len -= x_len;
offset += x_len;
- if (x_off + x_len == fp->second.length)
- ++fp;
+ if (x_off + x_len == bp->second.length)
+ ++bp;
continue;
}
// we are seeing a hole, time to add an entry to fiemap.
m[start] = offset - start;
- dout(20) << __func__ << " get fiemap entry, off = " << start << " len=" << m[start] << dendl;
+ dout(20) << __func__ << " out " << start << "~" << m[start] << dendl;
offset += x_len;
start = offset;
len -= x_len;
continue;
}
- //add tailing
+ // add trailing entry
if (offset - start != 0) {
m[start] = offset - start;
- dout(20) << __func__ << " get fiemap entry, off = " << start << " len=" << m[start] << dendl;
+ dout(20) << __func__ << " out " << start << "~" << m[start] << dendl;
}
::encode(m, bl);
- dout(20) << __func__ << " " << offset << "~" << len << " size = 0 (" << m << ")" << dendl;
+ dout(20) << __func__ << " " << offset << "~" << len
+ << " size = 0 (" << m << ")" << dendl;
return 0;
}
o->onode.nid = ++nid_last;
dout(20) << __func__ << " " << o->onode.nid << dendl;
if (nid_last > nid_max) {
-#warning fixme this could race?
nid_max += g_conf->newstore_nid_prealloc;
bufferlist bl;
::encode(nid_max, bl);
}
}
-int NewStore::_recover_next_fid()
+NewStore::TransContext *NewStore::_txc_create(OpSequencer *osr)
{
- bufferlist bl;
- db->get(PREFIX_SUPER, "fid_max", &bl);
- try {
- ::decode(fid_max, bl);
- } catch (buffer::error& e) {
- }
- dout(1) << __func__ << " old fid_max " << fid_max << dendl;
- fid_last = fid_max;
-
- if (fid_last.fset > 0) {
- char s[32];
- snprintf(s, sizeof(s), "%u", fid_last.fset);
- assert(fset_fd < 0);
- fset_fd = ::openat(frag_fd, s, O_DIRECTORY, 0644);
- if (fset_fd < 0) {
- int r = -errno;
- derr << __func__ << " cannot open created " << path << "/fragments/"
- << s << ": " << cpp_strerror(r) << dendl;
- return r;
- }
- }
-
- return 0;
+ TransContext *txc = new TransContext(osr);
+ txc->t = db->get_transaction();
+ osr->queue_new(txc);
+ dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
+ return txc;
}
-int NewStore::_open_fid(fid_t fid, unsigned flags)
+void NewStore::_txc_release(TransContext *txc, uint64_t offset, uint64_t length)
{
- if (fid.handle.length() && g_conf->newstore_open_by_handle) {
- int fd = fs->open_handle(path_fd, fid.handle, flags);
- if (fd >= 0) {
- dout(30) << __func__ << " " << fid << " = " << fd
- << " (open by handle)" << dendl;
- return fd;
- }
- int err = -errno;
- dout(30) << __func__ << " " << fid << " = " << cpp_strerror(err)
- << " (with open by handle, falling back to file name)" << dendl;
- }
-
- char fn[32];
- snprintf(fn, sizeof(fn), "%u/%u", fid.fset, fid.fno);
- int fd = ::openat(frag_fd, fn, flags);
- if (fd < 0) {
- int r = -errno;
- derr << __func__ << " on " << fid << ": " << cpp_strerror(r) << dendl;
- return r;
- }
- dout(30) << __func__ << " " << fid << " = " << fd << dendl;
- return fd;
+ txc->released.insert(offset, length);
}
-int NewStore::_create_fid(TransContext *txc, OnodeRef o,
- fid_t *fid, unsigned flags)
+void NewStore::_txc_state_proc(TransContext *txc)
{
- {
- Mutex::Locker l(fid_lock);
- if (fid_last.fset > 0 &&
- fid_last.fno > 0 &&
- fid_last.fset == fid_max.fset &&
- fid_last.fno < g_conf->newstore_max_dir_size) {
- ++fid_last.fno;
- if (fid_last.fno >= fid_max.fno) {
- // raise fid_max, same fset, capping to max_dir_size
- fid_max.fno = min(fid_max.fno + g_conf->newstore_fid_prealloc,
- g_conf->newstore_max_dir_size);
- assert(fid_max.fno >= fid_last.fno);
- bufferlist bl;
- ::encode(fid_max, bl);
- txc->t->set(PREFIX_SUPER, "fid_max", bl);
- dout(10) << __func__ << " fid_max now " << fid_max << dendl;
- }
- } else {
- // new fset
- ++fid_last.fset;
- fid_last.fno = 1;
- dout(10) << __func__ << " creating " << fid_last.fset << dendl;
- char s[32];
- snprintf(s, sizeof(s), "%u", fid_last.fset);
- int r = ::mkdirat(frag_fd, s, 0755);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " cannot create " << path << "/fragments/"
- << s << ": " << cpp_strerror(r) << dendl;
- return r;
- }
- if (fset_fd >= 0)
- VOID_TEMP_FAILURE_RETRY(::close(fset_fd));
- fset_fd = ::openat(frag_fd, s, O_DIRECTORY, 0644);
- if (fset_fd < 0) {
- r = -errno;
- derr << __func__ << " cannot open created " << path << "/fragments/"
- << s << ": " << cpp_strerror(r) << dendl;
- }
-
- fid_max = fid_last;
- fid_max.fno = g_conf->newstore_fid_prealloc;
- bufferlist bl;
- ::encode(fid_max, bl);
- txc->t->set(PREFIX_SUPER, "fid_max", bl);
- dout(10) << __func__ << " fid_max now " << fid_max << dendl;
- }
- *fid = fid_last;
- }
-
- dout(10) << __func__ << " " << fid_last << dendl;
- char s[32];
- snprintf(s, sizeof(s), "%u", fid->fno);
- int fd = ::openat(fset_fd, s, flags | O_CREAT, 0644);
- if (fd < 0) {
- int r = -errno;
- derr << __func__ << " cannot create " << path << "/fragments/"
- << *fid << ": " << cpp_strerror(r) << dendl;
- return r;
- }
-
- if (o->onode.expected_object_size) {
- unsigned hint = MIN(o->onode.expected_object_size,
- o->onode.frag_size);
- dout(20) << __func__ << " set alloc hint to " << hint << dendl;
- fs->set_alloc_hint(fd, hint);
- }
-
- if (g_conf->newstore_open_by_handle) {
- int r = fs->get_handle(fd, &fid->handle);
- if (r < 0) {
- dout(30) << __func__ << " get_handle got " << cpp_strerror(r) << dendl;
- } else {
- dout(30) << __func__ << " got handle: ";
- bufferlist bl;
- bl.append(fid->handle);
- bl.hexdump(*_dout);
- *_dout << dendl;
- }
- }
-
- dout(30) << __func__ << " " << *fid << " = " << fd << dendl;
- return fd;
-}
-
-int NewStore::_remove_fid(fid_t fid)
-{
- char fn[32];
- snprintf(fn, sizeof(fn), "%u/%u", fid.fset, fid.fno);
- int r = ::unlinkat(frag_fd, fn, 0);
- if (r < 0)
- return -errno;
- return 0;
-}
-
-NewStore::TransContext *NewStore::_txc_create(OpSequencer *osr)
-{
- TransContext *txc = new TransContext(osr);
- txc->t = db->get_transaction();
- osr->queue_new(txc);
- dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
- return txc;
-}
-
-void NewStore::_txc_state_proc(TransContext *txc)
-{
- while (true) {
- dout(10) << __func__ << " txc " << txc
- << " " << txc->get_state_name() << dendl;
- switch (txc->state) {
- case TransContext::STATE_PREPARE:
- if (!txc->pending_aios.empty()) {
- txc->state = TransContext::STATE_AIO_WAIT;
- _txc_aio_submit(txc);
- return;
+ while (true) {
+ dout(10) << __func__ << " txc " << txc
+ << " " << txc->get_state_name() << dendl;
+ switch (txc->state) {
+ case TransContext::STATE_PREPARE:
+ if (txc->ioc.has_aios()) {
+ txc->state = TransContext::STATE_AIO_WAIT;
+ _txc_aio_submit(txc);
+ return;
}
// ** fall-thru **
case TransContext::STATE_AIO_WAIT:
- if (!txc->sync_items.empty()) {
- txc->state = TransContext::STATE_FSYNC_WAIT;
- if (!g_conf->newstore_sync_io) {
- _txc_queue_fsync(txc);
- return;
- }
- _txc_do_sync_fsync(txc);
- }
_txc_finish_io(txc); // may trigger blocked txc's too
return;
break;
case TransContext::STATE_WAL_APPLYING:
- if (!txc->pending_aios.empty()) {
+ if (txc->ioc.has_aios()) {
txc->state = TransContext::STATE_WAL_AIO_WAIT;
_txc_aio_submit(txc);
return;
}
}
-void NewStore::_txc_process_fsync(fsync_item *i)
-{
- dout(20) << __func__ << " txc " << i->txc << dendl;
- int r = ::fdatasync(i->fd);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " error from fdatasync on " << i->fd
- << " txc " << i->txc
- << ": " << cpp_strerror(r) << dendl;
- assert(0 == "error from fdatasync");
- }
- VOID_TEMP_FAILURE_RETRY(::close(i->fd));
- if (i->txc->finish_fsync()) {
- _txc_finish_io(i->txc);
- }
- dout(20) << __func__ << " txc " << i->txc << " done" << dendl;
-}
-
void NewStore::_txc_finish_io(TransContext *txc)
{
dout(20) << __func__ << " " << txc << dendl;
/*
* we need to preserve the order of kv transactions,
- * even though fsyncs will complete in any order.
+ * even though aio will complete in any order.
*/
OpSequencer *osr = txc->osr.get();
return 0;
}
-void NewStore::_txc_queue_fsync(TransContext *txc)
-{
- dout(20) << __func__ << " txc " << txc << dendl;
- fsync_wq.lock();
- for (list<fsync_item>::iterator p = txc->sync_items.begin();
- p != txc->sync_items.end();
- ++p) {
- fsync_wq._enqueue(&*p);
- fsync_wq._wake();
- }
- fsync_wq.unlock();
-}
-
-void NewStore::_txc_do_sync_fsync(TransContext *txc)
-{
- dout(20) << __func__ << " txc " << txc << dendl;
- for (list<fsync_item>::iterator p = txc->sync_items.begin();
- p != txc->sync_items.end(); ++p) {
- dout(30) << __func__ << " fsync " << p->fd << dendl;
- int r = ::fdatasync(p->fd);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " fsync: " << cpp_strerror(r) << dendl;
- assert(0 == "fsync error");
- }
- VOID_TEMP_FAILURE_RETRY(::close(p->fd));
- }
-}
-
void NewStore::_txc_finish_kv(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << dendl;
osr->q.pop_front();
delete txc;
osr->qcond.Signal();
+ if (osr->q.empty())
+ dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
}
}
-void NewStore::_aio_thread()
-{
- dout(10) << __func__ << " start" << dendl;
- while (!aio_stop) {
- dout(40) << __func__ << " polling" << dendl;
- int max = 16;
- FS::aio_t *aio[max];
- int r = aio_queue.get_next_completed(g_conf->newstore_aio_poll_ms,
- aio, max);
- if (r < 0) {
- derr << __func__ << " got " << cpp_strerror(r) << dendl;
- }
- if (r > 0) {
- dout(30) << __func__ << " got " << r << " completed aios" << dendl;
- for (int i = 0; i < r; ++i) {
- TransContext *txc = static_cast<TransContext*>(aio[i]->priv);
- int left = txc->num_aio.dec();
- dout(10) << __func__ << " finished aio " << aio[i] << " txc " << txc
- << " state " << txc->get_state_name() << ", "
- << left << " aios left" << dendl;
- VOID_TEMP_FAILURE_RETRY(::close(aio[i]->fd));
- if (left == 0) {
- _txc_state_proc(txc);
- }
- }
- }
- }
- dout(10) << __func__ << " end" << dendl;
-}
-
void NewStore::_kv_sync_thread()
{
dout(10) << __func__ << " start" << dendl;
utime_t start = ceph_clock_now(NULL);
kv_lock.Unlock();
+ dout(30) << __func__ << " committing txc " << kv_committing << dendl;
+ dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl;
+
+ // one transaction to force a sync
+ KeyValueDB::Transaction t = db->get_transaction();
+
+ // allocations. consolidate before submitting to freelist so that
+ // we avoid redundant kv ops.
+ interval_set<uint64_t> allocated, released;
+ for (std::deque<TransContext *>::iterator it = kv_committing.begin();
+ it != kv_committing.end();
+ ++it) {
+ TransContext *txc = *it;
+ allocated.insert(txc->allocated);
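+ // a txc with wal work defers its releases until the wal ops have been
+ // applied and cleaned up (the wal_cleaning pass below), so those
+ // blocks cannot be reallocated while the wal may still depend on them.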
+ if (txc->wal_txn) {
+ dout(20) << __func__ << " txc " << txc
+ << " allocated " << txc->allocated
+ << " (will release " << txc->released << " after wal)"
+ << dendl;
+ txc->wal_txn->released.swap(txc->released);
+ assert(txc->released.empty());
+ } else {
+ dout(20) << __func__ << " txc " << *it
+ << " allocated " << txc->allocated
+ << " released " << txc->released
+ << dendl;
+ released.insert((*it)->released);
+ }
+ }
+ for (std::deque<TransContext *>::iterator it = wal_cleaning.begin();
+ it != wal_cleaning.end();
+ ++it) {
+ TransContext *txc = *it;
+ if (!txc->wal_txn->released.empty()) {
+ dout(20) << __func__ << " txc " << txc
+ << " (post-wal) released " << txc->wal_txn->released
+ << dendl;
+ released.insert(txc->wal_txn->released);
+ }
+ }
+ for (interval_set<uint64_t>::iterator p = allocated.begin();
+ p != allocated.end();
+ ++p) {
+ dout(20) << __func__ << " alloc " << p.get_start() << "~" << p.get_len()
+ << dendl;
+ fm->allocate(p.get_start(), p.get_len(), t);
+ }
+ for (interval_set<uint64_t>::iterator p = released.begin();
+ p != released.end();
+ ++p) {
+ dout(20) << __func__ << " release " << p.get_start()
+ << "~" << p.get_len() << dendl;
+ fm->release(p.get_start(), p.get_len(), t);
+ alloc->release(p.get_start(), p.get_len());
+ }
+
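+ // blocks released above become allocatable again only at
+ // commit_finish(), once the kv sync below has made the freelist
+ // update durable.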
+ alloc->commit_start();
+
+ // flush/barrier on block device
+ bdev->flush();
+
if (!g_conf->newstore_sync_submit_transaction) {
for (std::deque<TransContext *>::iterator it = kv_committing.begin();
it != kv_committing.end();
}
}
- // one transaction to force a sync. clean up wal keys while we
- // are at it.
- KeyValueDB::Transaction txc_cleanup_sync = db->get_transaction();
+ // cleanup sync wal keys
for (std::deque<TransContext *>::iterator it = wal_cleaning.begin();
it != wal_cleaning.end();
++it) {
wal_transaction_t& wt =*(*it)->wal_txn;
// cleanup the data in overlays
for (list<wal_op_t>::iterator p = wt.ops.begin(); p != wt.ops.end(); ++p) {
- for (vector<overlay_t>::iterator q = p->overlays.begin();
- q != p->overlays.end(); ++q) {
+ for (vector<uint64_t>::iterator q = p->removed_overlays.begin();
+ q != p->removed_overlays.end();
+ ++q) {
string key;
- get_overlay_key(p->nid, q->key, &key);
- txc_cleanup_sync->rmkey(PREFIX_OVERLAY, key);
+ get_overlay_key(p->nid, *q, &key);
+ t->rmkey(PREFIX_OVERLAY, key);
}
}
- // cleanup the shared overlays. this may double delete something we
- // did above, but that's less work than doing careful ref counting
- // of the overlay key/value pairs.
- for (vector<string>::iterator p = wt.shared_overlay_keys.begin();
- p != wt.shared_overlay_keys.end(); ++p) {
- txc_cleanup_sync->rmkey(PREFIX_OVERLAY, *p);
- }
// cleanup the wal
string key;
get_wal_key(wt.seq, &key);
- txc_cleanup_sync->rmkey(PREFIX_WAL, key);
+ t->rmkey(PREFIX_WAL, key);
}
- db->submit_transaction_sync(txc_cleanup_sync);
+ db->submit_transaction_sync(t);
utime_t finish = ceph_clock_now(NULL);
utime_t dur = finish - start;
dout(20) << __func__ << " committed " << kv_committing.size()
wal_cleaning.pop_front();
}
+ alloc->commit_finish();
+
// this is as good a place as any ...
_reap_collections();
dout(10) << __func__ << " finish" << dendl;
}
-wal_op_t *NewStore::_get_wal_op(TransContext *txc)
+wal_op_t *NewStore::_get_wal_op(TransContext *txc, OnodeRef o)
{
if (!txc->wal_txn) {
txc->wal_txn = new wal_transaction_t;
}
txc->wal_txn->ops.push_back(wal_op_t());
+ txc->wal_op_onodes.push_back(o);
return &txc->wal_txn->ops.back();
}
dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
txc->state = TransContext::STATE_WAL_APPLYING;
- assert(txc->pending_aios.empty());
- int r = _do_wal_transaction(wt, txc);
- assert(r == 0);
+ assert(txc->ioc.pending_aios.empty());
+ vector<OnodeRef>::iterator q = txc->wal_op_onodes.begin();
+ for (list<wal_op_t>::iterator p = wt.ops.begin();
+ p != wt.ops.end();
+ ++p, ++q) {
+ int r = _do_wal_op(*p, &txc->ioc);
+ assert(r == 0);
+ }
_txc_state_proc(txc);
return 0;
return 0;
}
-int NewStore::_do_wal_transaction(wal_transaction_t& wt,
- TransContext *txc)
+int NewStore::_do_wal_op(wal_op_t& wo, IOContext *ioc)
{
- vector<int> sync_fds;
- sync_fds.reserve(wt.ops.size());
+ const uint64_t block_size = bdev->get_block_size();
+ const uint64_t block_mask = ~(block_size - 1);
// read all the overlay data first for apply
- _do_read_all_overlays(wt);
+ _do_read_all_overlays(wo);
- for (list<wal_op_t>::iterator p = wt.ops.begin(); p != wt.ops.end(); ++p) {
- switch (p->op) {
- case wal_op_t::OP_WRITE:
- {
- dout(20) << __func__ << " write " << p->fid << " "
- << p->offset << "~" << p->length << dendl;
- unsigned flags = O_RDWR;
- if (g_conf->newstore_o_direct &&
- (p->offset & ~CEPH_PAGE_MASK) == 0 &&
- (p->length & ~CEPH_PAGE_MASK) == 0) {
- dout(20) << __func__ << " page-aligned io, using O_DIRECT, "
- << p->data.buffers().size() << " buffers" << dendl;
- flags |= O_DIRECT | O_DSYNC;
- if (!p->data.is_page_aligned()) {
- dout(20) << __func__ << " rebuilding buffer to be page-aligned"
- << dendl;
- p->data.rebuild();
- }
- }
- int fd = _open_fid(p->fid, flags);
- if (fd < 0)
- return fd;
-#ifdef HAVE_LIBAIO
- if (g_conf->newstore_aio && txc && (flags & O_DIRECT)) {
- txc->pending_aios.push_back(FS::aio_t(txc, fd));
- FS::aio_t& aio = txc->pending_aios.back();
- p->data.prepare_iov(&aio.iov);
- aio.pwritev(p->offset);
- dout(2) << __func__ << " prepared aio " << &aio << dendl;
- } else
-#endif
- {
- int r = ::lseek64(fd, p->offset, SEEK_SET);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " lseek64 on " << fd << " got: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- r = p->data.write_fd(fd);
- if (r < 0) {
- derr << __func__ << " write_fd on " << fd << " got: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- if (!(flags & O_DIRECT))
- sync_fds.push_back(fd);
- else
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- }
- }
- break;
- case wal_op_t::OP_ZERO:
- {
- dout(20) << __func__ << " zero " << p->fid << " "
- << p->offset << "~" << p->length << dendl;
- int fd = _open_fid(p->fid, O_RDWR);
- if (fd < 0)
- return fd;
- int r = fs->zero(fd, p->offset, p->length);
- if (r < 0) {
- derr << __func__ << " zero on " << fd << " got: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- // FIXME: do aio fdatasync?
- sync_fds.push_back(fd);
- }
- break;
- case wal_op_t::OP_TRUNCATE:
- {
- dout(20) << __func__ << " truncate " << p->fid << " "
- << p->offset << dendl;
- int fd = _open_fid(p->fid, O_RDWR);
- if (fd < 0)
- return fd;
- int r = ::ftruncate(fd, p->offset);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " truncate on " << fd << " got: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- // note: we are not syncing this truncate. instead, we are
- // careful about only reading as much of the fragment as we
- // know is valid, and truncating to expected size before
- // extending the file.
+ switch (wo.op) {
+ case wal_op_t::OP_WRITE:
+ {
+ dout(20) << __func__ << " write " << wo.extent << dendl;
+ // FIXME: do the reads async?
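+ // the block device only does block-aligned, block-sized io, so merge a
+ // partial first and/or last block with the existing on-disk data via
+ // read-modify-write before queueing the aio write.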
+ bufferlist bl;
+ bl.claim(wo.data);
+ uint64_t offset = wo.extent.offset;
+ bufferlist first;
+ uint64_t first_len = offset & ~block_mask;
+ if (first_len) {
+ offset = offset & block_mask;
+ dout(20) << __func__ << " reading initial partial block "
+ << offset << "~" << block_size << dendl;
+ bdev->read(offset, block_size, &first, ioc);
+ bufferlist t;
+ t.substr_of(first, 0, first_len);
+ t.claim_append(bl);
+ bl.swap(t);
+ }
+ if (wo.extent.end() & ~block_mask) {
+ uint64_t last_offset = wo.extent.end() & block_mask;
+ bufferlist last;
+ if (last_offset == offset && first.length()) {
+ last.claim(first); // same block we read above
+ } else {
+ dout(20) << __func__ << " reading trailing partial block "
+ << last_offset << "~" << block_size << dendl;
+ bdev->read(last_offset, block_size, &last, ioc);
}
- break;
-
- case wal_op_t::OP_REMOVE:
- dout(20) << __func__ << " remove " << p->fid << dendl;
- _remove_fid(p->fid);
- // note: we do not fsync the directory. instead, we tolerate
- // leaked fragments in a crash. in practice, this will be
- // exceedingly rare.
- break;
+ bufferlist t;
+ uint64_t endoff = wo.extent.end() & ~block_mask;
+ t.substr_of(last, endoff, block_size - endoff);
+ bl.claim_append(t);
+ }
+ assert((bl.length() & ~block_mask) == 0);
+ bdev->aio_write(offset, bl, ioc);
+ }
+ break;
- default:
- assert(0 == "unrecognized wal op");
+ case wal_op_t::OP_ZERO:
+ {
+ dout(20) << __func__ << " zero " << wo.extent << dendl;
+ uint64_t offset = wo.extent.offset;
+ uint64_t length = wo.extent.length;
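+ // zero a partial head block via read-modify-write, the aligned middle
+ // with aio_zero, and a partial tail block via read-modify-write.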
+ bufferlist first;
+ uint64_t first_len = offset & ~block_mask;
+ if (first_len) {
+ uint64_t first_offset = offset & block_mask;
+ dout(20) << __func__ << " reading initial partial block "
+ << first_offset << "~" << block_size << dendl;
+ bdev->read(first_offset, block_size, &first, ioc);
+ size_t z_len = MIN(block_size - first_len, length);
+ memset(first.c_str() + first_len, 0, z_len);
+ bdev->aio_write(first_offset, first, ioc);
+ offset += block_size - first_len;
+ length -= z_len;
+ }
+ assert(offset % block_size == 0);
+ if (length >= block_size) {
+ uint64_t middle_len = length & block_mask;
+ dout(20) << __func__ << " zero " << offset << "~" << length << dendl;
+ bdev->aio_zero(offset, middle_len, ioc);
+ offset += middle_len;
+ length -= middle_len;
+ }
+ assert(offset % block_size == 0);
+ if (length > 0) {
+ assert(length < block_size);
+ bufferlist last;
+ dout(20) << __func__ << " reading trailing partial block "
+ << offset << "~" << block_size << dendl;
+ bdev->read(offset, block_size, &last, ioc);
+ memset(last.c_str(), 0, length);
+ bdev->aio_write(offset, last, ioc);
}
}
+ break;
- for (vector<int>::iterator p = sync_fds.begin();
- p != sync_fds.end();
- ++p) {
- int r = ::fdatasync(*p);
- assert(r == 0);
- VOID_TEMP_FAILURE_RETRY(::close(*p));
+ default:
+ assert(0 == "unrecognized wal op");
}
return 0;
int NewStore::_wal_replay()
{
dout(10) << __func__ << " start" << dendl;
- KeyValueDB::Iterator it = db->get_iterator(PREFIX_WAL);
- it->lower_bound(string());
- KeyValueDB::Transaction cleanup = db->get_transaction();
+ OpSequencerRef osr = new OpSequencer;
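+ // requeue each recorded wal transaction as a TransContext that has
+ // already committed to kv (STATE_KV_DONE) and let the normal state
+ // machine apply and clean it up.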
int count = 0;
- while (it->valid()) {
+ KeyValueDB::Iterator it = db->get_iterator(PREFIX_WAL);
+ for (it->lower_bound(string()); it->valid(); it->next(), ++count) {
+ dout(20) << __func__ << " replay " << pretty_binary_string(it->key())
+ << dendl;
+ TransContext *txc = _txc_create(osr.get());
+ txc->wal_txn = new wal_transaction_t;
bufferlist bl = it->value();
bufferlist::iterator p = bl.begin();
- wal_transaction_t wt;
try {
- ::decode(wt, p);
+ ::decode(*txc->wal_txn, p);
} catch (buffer::error& e) {
- derr << __func__ << " failed to decode wal txn " << it->key() << dendl;
+ derr << __func__ << " failed to decode wal txn "
+ << pretty_binary_string(it->key()) << dendl;
return -EIO;
}
-
- // Get the overlay data of the WAL for replay
- _do_read_all_overlays(wt);
- dout(20) << __func__ << " replay " << it->key() << dendl;
- int r = _do_wal_transaction(wt, NULL); // don't bother with aio here
- if (r < 0)
- return r;
- cleanup->rmkey(PREFIX_WAL, it->key());
- ++count;
- it->next();
- }
- if (count) {
- dout(10) << __func__ << " cleanup" << dendl;
- db->submit_transaction_sync(cleanup);
+ txc->state = TransContext::STATE_KV_DONE;
+ _txc_state_proc(txc);
}
+ dout(20) << __func__ << " flushing osr" << dendl;
+ osr->flush();
dout(10) << __func__ << " completed " << count << " events" << dendl;
return 0;
}
void NewStore::_txc_aio_submit(TransContext *txc)
{
- int num = txc->pending_aios.size();
- dout(10) << __func__ << " txc " << txc << " submitting " << num << dendl;
- assert(num > 0);
- txc->num_aio.set(num);
-
- // move these aside, and get our end iterator position now, as the
- // aios might complete as soon as they are submitted and queue more
- // wal aio's.
- list<FS::aio_t>::iterator e = txc->submitted_aios.begin();
- txc->submitted_aios.splice(e, txc->pending_aios);
- list<FS::aio_t>::iterator p = txc->submitted_aios.begin();
- assert(p != e);
- bool done = false;
- while (!done) {
- FS::aio_t& aio = *p;
- dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd << dendl;
- for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
- dout(30) << __func__ << " iov " << (void*)q->iov_base
- << " len " << q->iov_len << dendl;
- dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR)
- << dendl;
-
- // be careful: as soon as we submit aio we race with completion.
- // since we are holding a ref take care not to dereference txc at
- // all after that point.
- list<FS::aio_t>::iterator cur = p;
- ++p;
- done = (p == e);
-
- // do not dereference txc (or it's contents) after we submit (if
- // done == true and we don't loop)
- int retries = 0;
- int r = aio_queue.submit(*cur, &retries);
- if (retries)
- derr << __func__ << " retries " << retries << dendl;
- if (r) {
- derr << " aio submit got " << cpp_strerror(r) << dendl;
- assert(r == 0);
- }
- }
+ dout(10) << __func__ << " txc " << txc << dendl;
+ bdev->aio_submit(&txc->ioc);
}
int NewStore::_txc_add_transaction(TransContext *txc, Transaction *t)
txc->t->rmkey(PREFIX_OVERLAY, key);
o->onode.overlay_map.erase(p++);
}
- o->onode.shared_overlays.clear();
+ o->onode.overlay_refs.clear();
return 0;
}
p->first + p->second.length <= offset + length) {
dout(20) << __func__ << " rm " << p->first << " " << p->second
<< dendl;
- if (o->onode.shared_overlays.count(p->second.key) == 0) {
+ if (o->onode.put_overlay_ref(p->second.key)) {
string key;
get_overlay_key(o->onode.nid, p->first, &key);
txc->t->rmkey(PREFIX_OVERLAY, key);
uint64_t by = offset + length - p->first;
nov.value_offset += by;
nov.length -= by;
- o->onode.shared_overlays.insert(p->second.key);
+ o->onode.get_overlay_ref(p->second.key);
++p;
++changed;
}
{
_do_overlay_trim(txc, o, offset, length);
+ // let's avoid considering how overlay interacts with cached tail
+ // blocks for now.
+ o->clear_tail();
+
dout(10) << __func__ << " " << o->oid << " "
<< offset << "~" << length << dendl;
overlay_t& ov = o->onode.overlay_map[offset] =
return 0;
}
-int NewStore::_do_write_all_overlays(TransContext *txc,
- OnodeRef o)
+int NewStore::_do_write_overlays(TransContext *txc,
+ OnodeRef o,
+ uint64_t orig_offset,
+ uint64_t orig_length)
{
if (o->onode.overlay_map.empty())
return 0;
- for (map<uint64_t,overlay_t>::iterator p = o->onode.overlay_map.begin();
- p != o->onode.overlay_map.end(); ) {
- dout(10) << __func__ << " overlay " << p->first
- << "~" << p->second.length << " " << p->second << dendl;
-
- // find or create frag
- uint64_t frag_first = 0;
- fragment_t *f;
- map<uint64_t,fragment_t>::iterator fp = o->onode.find_fragment(p->first);
- if (fp == o->onode.data_map.end() ||
- fp->first >= p->first + p->second.length ||
- fp->first + fp->second.length <= p->first) {
- dout(20) << __func__ << " frag " << fp->first << " " << fp->second
- << dendl;
- frag_first = p->first - p->first % o->onode.frag_size;
- if (frag_first == fp->first) {
- // extend existing frag
- f = &fp->second;
- int r = _clean_fid_tail(txc, *f);
- if (r < 0)
- return r;
- f->length = (p->first + p->second.length) - frag_first;
- dout(20) << __func__ << " extended " << f->fid << " to "
- << frag_first << "~" << f->length << dendl;
- } else {
- // new frag
- f = &o->onode.data_map[frag_first];
- f->offset = 0;
- f->length = p->first + p->second.length - frag_first;
- assert(f->length <= o->onode.frag_size);
- int fd = _create_fid(txc, o, &f->fid, O_RDWR);
- if (fd < 0) {
- return fd;
+ uint64_t min_alloc_size = g_conf->newstore_min_alloc_size;
+
+ uint64_t offset = 0;
+ uint64_t length = 0;
+ wal_op_t *op = NULL;
+
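+ // coalesce runs of overlays that fall within the write range and the
+ // same min_alloc_size unit into single wal writes, dropping their kv
+ // copies as we go.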
+ map<uint64_t,overlay_t>::iterator p =
+ o->onode.overlay_map.lower_bound(orig_offset);
+ while (true) {
+ if (p != o->onode.overlay_map.end() && p->first < orig_offset + orig_length) {
+ if (!op) {
+ dout(10) << __func__ << " overlay " << p->first
+ << "~" << p->second.length << " " << p->second
+ << " (first)" << dendl;
+ op = _get_wal_op(txc, o);
+ op->nid = o->onode.nid;
+ op->op = wal_op_t::OP_WRITE;
+ op->overlays.push_back(p->second);
+ offset = p->first;
+ length = p->second.length;
+
+ if (o->onode.put_overlay_ref(p->second.key)) {
+ string key;
+ get_overlay_key(o->onode.nid, p->first, &key);
+ txc->t->rmkey(PREFIX_OVERLAY, key);
}
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- dout(20) << __func__ << " create " << f->fid << dendl;
+ o->onode.overlay_map.erase(p++);
+ continue;
}
- } else {
- frag_first = fp->first;
- f = &fp->second;
- }
- assert(frag_first <= p->first);
- assert(p->first + p->second.length <= frag_first + f->length);
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_WRITE;
- op->offset = p->first - frag_first;
- op->length = p->second.length;
- op->fid = f->fid;
- // The overlays will be removed from the db after applying the WAL
- op->nid = o->onode.nid;
- op->overlays.push_back(p->second);
-
- // Combine with later overlays if contiguous
- map<uint64_t,overlay_t>::iterator prev = p, next = p;
- ++next;
- while (next != o->onode.overlay_map.end()) {
- if (prev->first + prev->second.length == next->first &&
- next->first < frag_first + o->onode.frag_size) {
- dout(10) << __func__ << " combining overlay " << next->first
- << "~" << next->second << dendl;
- op->length += next->second.length;
- op->overlays.push_back(next->second);
-
- if (next->first + next->second.length > frag_first + f->length) {
- f->length = next->first + next->second.length - frag_first;
- dout(20) << __func__ << " extended fragment " << f->fid
- << " to " << frag_first << "~" << f->length << dendl;
+ // contiguous? and in the same allocation unit?
+ if (offset + length == p->first &&
+ p->first % min_alloc_size) {
+ dout(10) << __func__ << " overlay " << p->first
+ << "~" << p->second.length << " " << p->second
+ << " (contiguous)" << dendl;
+ op->overlays.push_back(p->second);
+ length += p->second.length;
+
+ if (o->onode.put_overlay_ref(p->second.key)) {
+ string key;
+ get_overlay_key(o->onode.nid, p->first, &key);
+ txc->t->rmkey(PREFIX_OVERLAY, key);
}
-
- ++prev;
- ++next;
- } else {
- break;
+ o->onode.overlay_map.erase(p++);
+ continue;
}
}
- p = next;
- }
+ if (!op) {
+ break;
+ }
+ assert(length <= min_alloc_size);
- // put the shared overlay keys into the WAL transaction, so that we
- // can cleanup them later after applying the WAL
- for (set<uint64_t>::iterator p = o->onode.shared_overlays.begin();
- p != o->onode.shared_overlays.end();
- ++p) {
- dout(10) << __func__ << " shared overlay " << *p << dendl;
- string key;
- get_overlay_key(o->onode.nid, *p, &key);
- txc->wal_txn->shared_overlay_keys.push_back(key);
+ // emit
+ map<uint64_t, extent_t>::iterator bp = o->onode.find_extent(offset);
+ if (bp == o->onode.block_map.end() ||
+ length == min_alloc_size) {
+ int r = _do_allocate(txc, o, offset, length, 0, false);
+ if (r < 0)
+ return r;
+ bp = o->onode.find_extent(offset);
+ if (bp->second.has_flag(extent_t::FLAG_UNWRITTEN)) {
+ dout(10) << __func__ << " zero new allocation " << bp->second << dendl;
+ bdev->aio_zero(bp->second.offset, bp->second.length, &txc->ioc);
+ bp->second.clear_flag(extent_t::FLAG_UNWRITTEN);
+ }
+ }
+ uint64_t x_off = offset - bp->first;
+ dout(10) << __func__ << " wal write " << offset << "~" << length
+ << " to extent " << bp->first << ": " << bp->second
+ << " x_off " << x_off << " overlay data from "
+ << offset << "~" << length << dendl;
+ op->extent.offset = bp->second.offset + x_off;
+ op->extent.length = length;
+ op = NULL;
+
+ if (p == o->onode.overlay_map.end() || p->first >= orig_offset + orig_length) {
+ break;
+ }
+ ++p;
}
- o->onode.overlay_map.clear();
- o->onode.shared_overlays.clear();
txc->write_onode(o);
return 0;
}
-void NewStore::_do_read_all_overlays(wal_transaction_t& wt)
+void NewStore::_do_read_all_overlays(wal_op_t& wo)
{
- for (list<wal_op_t>::iterator p = wt.ops.begin(); p != wt.ops.end(); ++p) {
- for (vector<overlay_t>::iterator q = p->overlays.begin();
- q != p->overlays.end(); ++q) {
- string key;
- get_overlay_key(p->nid, q->key, &key);
- bufferlist bl, bl_data;
- db->get(PREFIX_OVERLAY, key, &bl);
- bl_data.substr_of(bl, q->value_offset, q->length);
- p->data.claim_append(bl_data);
- }
+ for (vector<overlay_t>::iterator q = wo.overlays.begin();
+ q != wo.overlays.end(); ++q) {
+ string key;
+ get_overlay_key(wo.nid, q->key, &key);
+ bufferlist bl, bl_data;
+ db->get(PREFIX_OVERLAY, key, &bl);
+ bl_data.substr_of(bl, q->value_offset, q->length);
+ wo.data.claim_append(bl_data);
}
return;
}
dout(30) << __func__ << " " << o
<< " nid " << o->onode.nid
<< " size " << o->onode.size
- << " frag_size " << o->onode.frag_size
<< " expected_object_size " << o->onode.expected_object_size
<< " expected_write_size " << o->onode.expected_write_size
<< dendl;
dout(30) << __func__ << " attr " << p->first
<< " len " << p->second.length() << dendl;
}
- for (map<uint64_t,fragment_t>::iterator p = o->onode.data_map.begin();
- p != o->onode.data_map.end();
+ uint64_t pos = 0;
+ for (map<uint64_t,extent_t>::iterator p = o->onode.block_map.begin();
+ p != o->onode.block_map.end();
++p) {
- dout(30) << __func__ << " fragment " << p->first << " " << p->second
+ dout(30) << __func__ << " extent " << p->first << " " << p->second
<< dendl;
+ assert(p->first >= pos);
+ pos = p->first + p->second.length;
}
+ pos = 0;
for (map<uint64_t,overlay_t>::iterator p = o->onode.overlay_map.begin();
p != o->onode.overlay_map.end();
++p) {
dout(30) << __func__ << " overlay " << p->first << " " << p->second
<< dendl;
+ assert(p->first >= pos);
+ pos = p->first + p->second.length;
+ }
+ if (!o->onode.overlay_refs.empty()) {
+ dout(30) << __func__ << " overlay_refs " << o->onode.overlay_refs << dendl;
+ }
+}
+
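+// Pad a write out to block_size boundaries so it can be issued directly to
+// the block device.  For example (illustrative, block_size = 4096): a write
+// of 100~200 becomes 0~4096, with bytes 0..99 and 300..4095 zero-filled
+// around the original payload.  If the padded write extends the object, the
+// partial tail block may also be cached in o->tail_bl (newstore_cache_tails)
+// for use by a later append.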
+void NewStore::_pad_zeros(
+ OnodeRef o,
+ bufferlist *bl, uint64_t *offset, uint64_t *length,
+ uint64_t block_size)
+{
+  dout(40) << __func__ << " before:\n";
+ bl->hexdump(*_dout);
+ *_dout << dendl;
+ // front
+ size_t front_pad = *offset % block_size;
+ size_t back_pad = 0;
+ if (front_pad) {
+ size_t front_copy = MIN(block_size - front_pad, *length);
+ bufferptr z = buffer::create_page_aligned(block_size);
+ memset(z.c_str(), 0, front_pad);
+ memcpy(z.c_str() + front_pad, bl->get_contiguous(0, front_copy), front_copy);
+ if (front_copy + front_pad < block_size) {
+ back_pad = block_size - (*length + front_pad);
+ memset(z.c_str() + front_pad + *length, 0, back_pad);
+ }
+ bufferlist old, t;
+ old.swap(*bl);
+ t.substr_of(old, front_copy, *length - front_copy);
+ bl->append(z);
+ bl->claim_append(t);
+ *offset -= front_pad;
+ *length += front_pad + back_pad;
+ }
+
+ // back
+ uint64_t end = *offset + *length;
+ unsigned back_copy = end % block_size;
+ if (back_copy) {
+ assert(back_pad == 0);
+ back_pad = block_size - back_copy;
+ assert(back_copy <= *length);
+ bufferptr tail(block_size);
+ memcpy(tail.c_str(), bl->get_contiguous(*length - back_copy, back_copy),
+ back_copy);
+ memset(tail.c_str() + back_copy, 0, back_pad);
+ bufferlist old;
+ old.swap(*bl);
+ bl->substr_of(old, 0, *length - back_copy);
+ bl->append(tail);
+ *length += back_pad;
+ if (end > o->onode.size && g_conf->newstore_cache_tails) {
+ o->tail_bl.clear();
+ o->tail_bl.append(tail, 0, back_copy);
+ o->tail_offset = end - back_copy;
+ dout(20) << __func__ << " cached "<< back_copy << " of tail block at "
+ << o->tail_offset << dendl;
+ }
}
- if (!o->onode.shared_overlays.empty()) {
- dout(30) << __func__ << " shared_overlays " << o->onode.shared_overlays
- << dendl;
+ dout(20) << __func__ << " pad " << front_pad << " + " << back_pad
+ << " on front/back, now " << *offset << "~" << *length << dendl;
+  dout(40) << __func__ << " after:\n";
+ bl->hexdump(*_dout);
+ *_dout << dendl;
+}
+
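+// Decide which parts of [orig_offset, orig_offset+orig_length) need fresh
+// block allocations, reserve and allocate them (flagged FLAG_UNWRITTEN for
+// _do_write to clear), and release any existing extents they displace.
+// Partial head/tail blocks that already have backing extents are left alone;
+// they will be handled as overlay or WAL writes.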
+int NewStore::_do_allocate(TransContext *txc,
+ OnodeRef o,
+ uint64_t orig_offset, uint64_t orig_length,
+ uint32_t fadvise_flags,
+ bool allow_overlay)
+{
+ dout(20) << __func__
+ << " " << o->oid << " " << orig_offset << "~" << orig_length
+ << " - have " << o->onode.size
+ << " bytes in " << o->onode.block_map.size()
+ << " extents" << dendl;
+ uint64_t min_alloc_size = g_conf->newstore_min_alloc_size;
+
+ // start with any full blocks we will write
+ uint64_t offset = orig_offset;
+ uint64_t length = orig_length;
+ uint64_t head = 0;
+ uint64_t tail = 0;
+ if (offset % min_alloc_size) {
+ head = min_alloc_size - (offset % min_alloc_size);
+ offset += head;
+ if (length >= head)
+ length -= head;
+ }
+ if ((offset + length) % min_alloc_size) {
+ tail = (offset + length) % min_alloc_size;
+ if (length >= tail)
+ length -= tail;
+ }
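+  // e.g. (illustrative, min_alloc_size = 64K): a write of 60K~72K leaves
+  // head = 4K, tail = 4K, and a block-aligned middle of 64K~64K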
+
+ map<uint64_t, extent_t>::iterator bp;
+
+ uint64_t orig_end = orig_offset + orig_length;
+ if (orig_offset / min_alloc_size == orig_end / min_alloc_size) {
+ // we fall within the same block
+ offset = orig_offset - orig_offset % min_alloc_size;
+ length = 0;
+ assert(offset <= orig_offset);
+ dout(20) << " io falls within " << offset << "~" << min_alloc_size << dendl;
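+    // e.g. (illustrative, min_alloc_size = 64K): a 4K write at offset 8K
+    // touches only the 0~64K block, so at most that one block is allocated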
+ if (allow_overlay && _can_overlay_write(o, orig_length)) {
+ dout(20) << " entire write will be captured by overlay" << dendl;
+ } else {
+ bp = o->onode.find_extent(offset);
+ if (bp == o->onode.block_map.end()) {
+ dout(20) << " not yet allocated" << dendl;
+ length = min_alloc_size;
+ } else {
+ dout(20) << " will presumably WAL" << dendl;
+ }
+ }
+ } else {
+ dout(20) << " initial full " << offset << "~" << length
+ << ", head " << head << " tail " << tail << dendl;
+
+ // include tail?
+ if (tail) {
+ if (allow_overlay && _can_overlay_write(o, tail)) {
+        dout(20) << " tail " << tail << " will be captured by overlay" << dendl;
+ } else {
+ bp = o->onode.find_extent(orig_offset + orig_length - 1);
+ if (bp == o->onode.block_map.end()) {
+ dout(20) << " tail " << tail << " not yet allocated" << dendl;
+ length += min_alloc_size;
+ } else {
+ dout(20) << " tail " << tail << " will presumably WAL" << dendl;
+ }
+ }
+ }
+
+ // include head?
+ bp = o->onode.find_extent(orig_offset);
+ if (head) {
+ if (allow_overlay && _can_overlay_write(o, head)) {
+ dout(20) << " head " << head << " will be captured by overlay" << dendl;
+ } else if (bp == o->onode.block_map.end()) {
+ dout(20) << " head " << head << " not yet allocated" << dendl;
+ offset -= min_alloc_size;
+ length += min_alloc_size;
+ } else {
+ dout(20) << " head " << head << " will presumably WAL" << dendl;
+ }
+ }
}
+
+ if (length) {
+ dout(20) << " must alloc " << offset << "~" << length << dendl;
+
+ // positional hint
+ uint64_t hint = 0;
+
+ int r = alloc->reserve(length);
+ if (r < 0) {
+ derr << __func__ << " failed to reserve " << length << dendl;
+ return r;
+ }
+
+ // deallocate existing extents
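+    // any extent overlapping [offset, offset+length) is trimmed, split, or
+    // removed outright, and the displaced blocks are handed to _txc_release
+    // so the space is freed with the transaction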
+ bp = o->onode.seek_extent(offset);
+ while (bp != o->onode.block_map.end() &&
+ bp->first < offset + length &&
+ bp->first + bp->second.length > offset) {
+ dout(30) << " bp " << bp->first << ": " << bp->second << dendl;
+ if (bp->first < offset) {
+ uint64_t left = offset - bp->first;
+ if (bp->first + bp->second.length <= offset + length) {
+ dout(20) << " trim tail " << bp->first << ": " << bp->second << dendl;
+ _txc_release(txc,
+ bp->second.offset + left,
+ bp->second.length - left);
+ bp->second.length = left;
+ dout(20) << " now " << bp->first << ": " << bp->second << dendl;
+ hint = bp->first + bp->second.length;
+ ++bp;
+ } else {
+ dout(20) << " split " << bp->first << ": " << bp->second << dendl;
+ _txc_release(txc, bp->second.offset + left, length);
+ o->onode.block_map[offset + length] =
+ extent_t(bp->second.offset + left + length,
+ bp->second.length - (left + length));
+ bp->second.length = left;
+ dout(20) << " left " << bp->first << ": " << bp->second << dendl;
+ ++bp;
+ dout(20) << " right " << bp->first << ": " << bp->second << dendl;
+ assert(bp->first == offset + length);
+ hint = bp->first + bp->second.length;
+ }
+ } else {
+ assert(bp->first >= offset);
+ if (bp->first + bp->second.length > offset + length) {
+ uint64_t overlap = offset + length - bp->first;
+ dout(20) << " trim head " << bp->first << ": " << bp->second
+ << " (overlap " << overlap << ")" << dendl;
+ _txc_release(txc, bp->second.offset, overlap);
+ o->onode.block_map[bp->first + overlap] =
+ extent_t(bp->second.offset + overlap,
+ bp->second.length - overlap);
+ o->onode.block_map.erase(bp++);
+ dout(20) << " now " << bp->first << ": " << bp->second << dendl;
+ assert(bp->first == offset + length);
+ hint = bp->first;
+ } else {
+ dout(20) << " dealloc " << bp->first << ": " << bp->second << dendl;
+ _txc_release(txc, bp->second.offset, bp->second.length);
+ hint = bp->first + bp->second.length;
+ o->onode.block_map.erase(bp++);
+ }
+ }
+ }
+
+ // allocate our new extent(s)
+ while (length > 0) {
+ extent_t e;
+      // mark the new extent UNWRITTEN; _do_write must clear this flag once it
+      // writes or zeroes the extent, otherwise the sanity check at the end of
+      // _do_write will trip on a leaked unwritten extent
+ e.flags |= extent_t::FLAG_UNWRITTEN;
+ int r = alloc->allocate(length, min_alloc_size, hint,
+ &e.offset, &e.length);
+ assert(r == 0);
+ assert(e.length <= length); // bc length is a multiple of min_alloc_size
+ txc->allocated.insert(e.offset, e.length);
+ o->onode.block_map[offset] = e;
+ dout(10) << __func__ << " alloc " << offset << ": " << e << dendl;
+ length -= e.length;
+ offset += e.length;
+ hint = e.end();
+ }
+ }
+
+ return 0;
+}
+
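+// Small writes may be kept in the kv store as "overlays" instead of going to
+// the block device: allowed while the onode has fewer than
+// newstore_overlay_max overlays and the write is at most
+// newstore_overlay_max_length bytes.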
+bool NewStore::_can_overlay_write(OnodeRef o, uint64_t length)
+{
+ return
+ (int)o->onode.overlay_map.size() < g_conf->newstore_overlay_max &&
+ (int)length <= g_conf->newstore_overlay_max_length;
}
int NewStore::_do_write(TransContext *txc,
bufferlist& orig_bl,
uint32_t fadvise_flags)
{
- int fd = -1;
int r = 0;
- unsigned flags;
dout(20) << __func__
<< " " << o->oid << " " << orig_offset << "~" << orig_length
<< " - have " << o->onode.size
- << " bytes in " << o->onode.data_map.size()
- << " fragments" << dendl;
+ << " bytes in " << o->onode.block_map.size()
+ << " extents" << dendl;
_dump_onode(o);
o->exists = true;
- if (!o->onode.frag_size && o->onode.data_map.empty() &&
- o->onode.overlay_map.empty()) {
- o->onode.frag_size = g_conf->newstore_min_frag_size;
- dout(20) << __func__ << " set frag_size " << o->onode.frag_size << dendl;
+ if (orig_length == 0) {
+ return 0;
}
- if (orig_offset + orig_length > o->onode.size) {
- dout(20) << __func__ << " extending size to " << orig_offset + orig_length
- << dendl;
- o->onode.size = orig_offset + orig_length;
+ uint64_t block_size = bdev->get_block_size();
+ const uint64_t block_mask = ~(block_size - 1);
+ uint64_t min_alloc_size = g_conf->newstore_min_alloc_size;
+ map<uint64_t, extent_t>::iterator bp;
+ uint64_t length;
+
+ r = _do_allocate(txc, o, orig_offset, orig_length, fadvise_flags, true);
+ if (r < 0) {
+ derr << __func__ << " allocate failed, " << cpp_strerror(r) << dendl;
+ goto out;
}
- uint64_t frag_size = o->onode.frag_size;
- uint64_t length;
+ bp = o->onode.seek_extent(orig_offset);
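+  // walk the write in chunks: each chunk either lies wholly inside one
+  // allocated extent or is unallocated, and is dispatched as an overlay
+  // write, a direct aio write (append, fresh extent, cached tail), or a WAL
+  // write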
for (uint64_t offset = orig_offset;
offset < orig_offset + orig_length;
offset += length) {
- // calc length for this fragment / chunk
+ // cut to extent
length = orig_offset + orig_length - offset;
- if (offset / frag_size != (offset + length) / frag_size) {
- length = (offset / frag_size + 1) * frag_size - offset;
+ if (bp == o->onode.block_map.end() ||
+ bp->first > offset) {
+ // no allocation; crop at alloc boundary (this will be an overlay write)
+ uint64_t end = ROUND_UP_TO(offset + 1, min_alloc_size);
+ if (offset + length > end)
+ length = end - offset;
+ } else {
+ // we are inside this extent; don't go past it
+ if (bp->first + bp->second.length < offset + length) {
+ assert(bp->first <= offset);
+ length = bp->first + bp->second.length - offset;
+ }
}
- dout(20) << __func__ << " chunk " << offset << "~" << length
- << " (frag_size " << frag_size << ")"
- << dendl;
bufferlist bl;
bl.substr_of(orig_bl, offset - orig_offset, length);
-
- if ((int)o->onode.overlay_map.size() < g_conf->newstore_overlay_max &&
- (int)length <= g_conf->newstore_overlay_max_length) {
- // write an overlay
+ if (bp == o->onode.block_map.end())
+ dout(20) << __func__ << " chunk " << offset << "~" << length
+ << " (no extent)" << dendl;
+ else
+ dout(20) << __func__ << " chunk " << offset << "~" << length
+ << " extent " << bp->first << ": " << bp->second << dendl;
+
+ if (_can_overlay_write(o, length)) {
r = _do_overlay_write(txc, o, offset, length, bl);
if (r < 0)
goto out;
- txc->write_onode(o);
+ if (bp != o->onode.block_map.end() &&
+ bp->first < offset + length)
+ ++bp;
continue;
}
- flags = O_RDWR;
- if (g_conf->newstore_o_direct &&
- (offset & ~CEPH_PAGE_MASK) == 0 &&
- (length & ~CEPH_PAGE_MASK) == 0) {
- dout(20) << __func__ << " page-aligned, can use O_DIRECT, "
- << bl.buffers().size() << " buffers" << dendl;
- flags |= O_DIRECT | O_DSYNC;
- if (!bl.is_page_aligned()) {
- dout(20) << __func__ << " rebuilding buffer to be page-aligned" << dendl;
- bl.rebuild();
- }
- }
-
- map<uint64_t, fragment_t>::iterator fp =
- o->onode.data_map.lower_bound(offset);
-
- if ((fp == o->onode.data_map.end() || fp->first > offset) &&
- fp != o->onode.data_map.begin()) {
- --fp;
- if (offset < fp->first + frag_size &&
- offset >= fp->first + fp->second.length) {
- // -- append (possibly with gap) --
- fragment_t &f = fp->second;
- fd = _open_fid(f.fid, flags);
- if (fd < 0) {
- r = fd;
- goto out;
- }
- r = _clean_fid_tail_fd(f, fd); // in case there is trailing crap
- if (r < 0) {
- goto out;
- }
- f.length = (offset + length) - fp->first;
- uint64_t x_offset = offset - fp->first;
- dout(20) << __func__ << " append " << f.fid << " writing "
- << offset << "~" << length << " to "
- << x_offset << "~" << length << dendl;
-#ifdef HAVE_LIBAIO
- if (g_conf->newstore_aio && (flags & O_DIRECT)) {
- txc->pending_aios.push_back(FS::aio_t(txc, fd));
- FS::aio_t& aio = txc->pending_aios.back();
- bl.prepare_iov(&aio.iov);
- txc->aio_bl.append(bl);
- aio.pwritev(x_offset);
- dout(2) << __func__ << " prepared aio " << &aio << dendl;
- } else
-#endif
- {
- ::lseek64(fd, x_offset, SEEK_SET);
- r = bl.write_fd(fd);
- if (r < 0) {
- derr << __func__ << " bl.write_fd error: " << cpp_strerror(r)
- << dendl;
- goto out;
- }
- txc->sync_fd(fd);
- }
- continue;
+ assert(bp != o->onode.block_map.end());
+ assert(offset >= bp->first);
+ assert(offset + length <= bp->first + bp->second.length);
+
+ // (pad and) overwrite unused portion of extent for an append?
+ if (offset > bp->first &&
+ offset >= o->onode.size && // past eof +
+ (offset / block_size != (o->onode.size - 1) / block_size)) {// diff block
+ dout(20) << __func__ << " append" << dendl;
+ _pad_zeros(o, &bl, &offset, &length, block_size);
+ assert(offset % block_size == 0);
+ assert(length % block_size == 0);
+      // the partial block at the old EOF was zero-filled out to its end when
+      // it was written, so we only need to zero from the next block boundary
+      // (or the extent start, if later) up to the start of this padded write
+ uint64_t from = MAX(ROUND_UP_TO(o->onode.size, block_size), bp->first);
+ if (offset > from) {
+ uint64_t x_off = from - bp->first;
+ uint64_t z_len = offset - from;
+ dout(20) << __func__ << " zero " << from << "~" << z_len
+ << " x_off " << x_off << dendl;
+ bdev->aio_zero(bp->second.offset + x_off, z_len, &txc->ioc);
}
- ++fp;
+ uint64_t x_off = offset - bp->first;
+ dout(20) << __func__ << " write " << offset << "~" << length
+ << " x_off " << x_off << dendl;
+ bdev->aio_write(bp->second.offset + x_off, bl, &txc->ioc);
+ bp->second.clear_flag(extent_t::FLAG_UNWRITTEN);
+ ++bp;
+ continue;
}
- if (fp == o->onode.data_map.end() ||
- fp->first >= offset + length) {
- // -- new frag --
- // Note: unless we are at EOF, allocate the full frag_size,
- // even though we only write to part of it.
- uint64_t frag_first = offset - (offset % frag_size);
- fragment_t &f = o->onode.data_map[frag_first];
- f.offset = 0;
- f.length = MIN(frag_size, o->onode.size - frag_first);
- assert(f.length <= frag_size);
- fd = _create_fid(txc, o, &f.fid, flags);
- if (fd < 0) {
- r = fd;
- goto out;
- }
- uint64_t x_offset = offset - frag_first;
- dout(20) << __func__ << " create " << f.fid << " writing "
- << offset << "~" << length
- << " to " << x_offset << "~" << length << dendl;
-#ifdef HAVE_LIBAIO
- if (g_conf->newstore_aio && (flags & O_DIRECT)) {
- txc->pending_aios.push_back(FS::aio_t(txc, fd));
- FS::aio_t& aio = txc->pending_aios.back();
- bl.prepare_iov(&aio.iov);
- txc->aio_bl.append(bl);
- aio.pwritev(x_offset);
- dout(2) << __func__ << " prepared aio " << &aio << dendl;
- } else
-#endif
- {
- ::lseek64(fd, x_offset, SEEK_SET);
- r = bl.write_fd(fd);
- if (r < 0) {
- derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
- goto out;
+ // use cached tail block?
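+    // o->tail_bl (if present) caches the current partial tail block, so an
+    // append landing in the same block can rebuild the whole block in memory
+    // and rewrite it in place rather than read-modify-write through the WAL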
+ uint64_t tail_start = o->onode.size - o->onode.size % block_size;
+ if (offset >= bp->first &&
+ offset > tail_start &&
+ offset + length >= o->onode.size &&
+ o->tail_bl.length() &&
+ (offset / block_size == (o->onode.size - 1) / block_size)) {
+ dout(20) << __func__ << " using cached tail" << dendl;
+ assert((offset & block_mask) == (o->onode.size & block_mask));
+ uint64_t tail_off = offset % block_size;
+ if (tail_off >= o->tail_bl.length()) {
+ bufferlist t;
+ t = o->tail_bl;
+ if (tail_off > t.length()) {
+ bufferptr z(tail_off - t.length());
+ z.zero();
+ t.append(z);
}
- txc->sync_fd(fd);
+ offset -= t.length();
+ length += t.length();
+ t.claim_append(bl);
+ bl.swap(t);
+ } else {
+ bufferlist t;
+ t.substr_of(o->tail_bl, 0, tail_off);
+ offset -= t.length();
+ length += t.length();
+ t.claim_append(bl);
+ bl.swap(t);
}
+ assert(offset == tail_start);
+ assert(!bp->second.has_flag(extent_t::FLAG_UNWRITTEN));
+ _pad_zeros(o, &bl, &offset, &length, block_size);
+ uint64_t x_off = offset - bp->first;
+ dout(20) << __func__ << " write " << offset << "~" << length
+ << " x_off " << x_off << dendl;
+ bdev->aio_write(bp->second.offset + x_off, bl, &txc->ioc);
+ ++bp;
continue;
}
- if (fp != o->onode.data_map.end() &&
- fp->first == offset &&
- fp->second.length <= length) {
- // -- overwrite/replace entire frag --
- fragment_t& f = fp->second;
-
- _do_overlay_trim(txc, o, offset, length);
+ if (offset + length > (o->onode.size & block_mask) &&
+ o->tail_bl.length()) {
+ dout(20) << __func__ << " clearing cached tail" << dendl;
+ o->clear_tail();
+ }
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_REMOVE;
- op->fid = fp->second.fid;
+ if (offset % min_alloc_size == 0 &&
+ length % min_alloc_size == 0) {
+ assert(bp->second.has_flag(extent_t::FLAG_UNWRITTEN));
+ }
- f.length = length;
- fd = _create_fid(txc, o, &f.fid, O_RDWR);
- if (fd < 0) {
- r = fd;
- goto out;
+ if (bp->second.has_flag(extent_t::FLAG_UNWRITTEN)) {
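+      // freshly allocated extent: pad to block boundaries, zero the rest of
+      // the extent as needed, write the payload with aio, and clear
+      // FLAG_UNWRITTEN now that its contents are defined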
+ _pad_zeros(o, &bl, &offset, &length, block_size);
+ if (offset > bp->first) {
+ uint64_t z_len = offset - bp->first;
+ dout(20) << __func__ << " zero " << bp->first << "~" << z_len << dendl;
+ bdev->aio_zero(bp->second.offset, z_len, &txc->ioc);
}
- dout(20) << __func__ << " replace old fid " << op->fid
- << " with new fid " << f.fid
- << ", writing " << offset << "~" << length
- << " to 0~" << length << dendl;
-
-#ifdef HAVE_LIBAIO
- if (g_conf->newstore_aio && (flags & O_DIRECT)) {
- txc->pending_aios.push_back(FS::aio_t(txc, fd));
- FS::aio_t& aio = txc->pending_aios.back();
- bl.prepare_iov(&aio.iov);
- txc->aio_bl.append(bl);
- aio.pwritev(0);
- dout(2) << __func__ << " prepared aio " << &aio << dendl;
- } else
-#endif
- {
- r = bl.write_fd(fd);
- if (r < 0) {
- derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
- goto out;
- }
- txc->sync_fd(fd);
+ uint64_t x_off = offset - bp->first;
+ dout(20) << __func__ << " write " << offset << "~" << length
+ << " x_off " << x_off << dendl;
+ bdev->aio_write(bp->second.offset + x_off, bl, &txc->ioc);
+ if (offset + length < bp->first + bp->second.length &&
+ offset + length <= o->onode.size) {
+ uint64_t end = offset + length;
+ uint64_t z_len = bp->first + bp->second.length - end;
+ uint64_t x_off = end - bp->first;
+ dout(20) << __func__ << " zero " << end << "~" << z_len
+ << " x_off " << x_off << dendl;
+ bdev->aio_zero(bp->second.offset + x_off, z_len, &txc->ioc);
}
+ bp->second.clear_flag(extent_t::FLAG_UNWRITTEN);
+ ++bp;
continue;
}
- if (true) {
- // -- WAL --
- r = _do_write_all_overlays(txc, o);
- if (r < 0)
- goto out;
- fragment_t& f = fp->second;
- assert(fp->first <= offset);
- assert(offset + length <= fp->first + fp->second.length);
- r = _clean_fid_tail(txc, f);
- if (r < 0)
- goto out;
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_WRITE;
- op->offset = offset - fp->first;
- op->length = length;
- op->fid = f.fid;
- op->data = bl;
- if (offset + length - fp->first > f.length) {
- f.length = offset + length - fp->first;
- }
- dout(20) << __func__ << " wal " << f.fid << " write "
- << offset << "~" << length << " to "
- << op->offset << "~" << op->length
- << dendl;
- continue;
+ // WAL.
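+    // overwrite of blocks that already hold data: flush any overlays covering
+    // this extent, then queue the data as an OP_WRITE in the key/value WAL so
+    // it is applied to the extent only after the kv transaction commits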
+ r = _do_write_overlays(txc, o, bp->first, bp->second.length);
+ if (r < 0)
+ goto out;
+ assert(bp->first <= offset);
+ assert(offset + length <= bp->first + bp->second.length);
+ wal_op_t *op = _get_wal_op(txc, o);
+ op->op = wal_op_t::OP_WRITE;
+ op->extent.offset = bp->second.offset + offset - bp->first;
+ op->extent.length = length;
+ op->data = bl;
-
- assert(0 == "don't get here");
+ dout(20) << __func__ << " wal write "
+ << offset << "~" << length << " to " << op->extent
+ << dendl;
+ ++bp;
+ continue;
}
r = 0;
- out:
- return r;
-}
-
-int NewStore::_clean_fid_tail_fd(const fragment_t& f, int fd)
-{
- struct stat st;
- int r = ::fstat(fd, &st);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " failed to fstat " << f.fid << ": "
- << cpp_strerror(r) << dendl;
- return r;
+ if (orig_offset + orig_length > o->onode.size) {
+ dout(20) << __func__ << " extending size to " << orig_offset + orig_length
+ << dendl;
+ o->onode.size = orig_offset + orig_length;
}
- if (st.st_size > f.offset + f.length) {
- dout(20) << __func__ << " frag " << f.fid << " is long (" << st.st_size
- << "), truncating to " << (f.offset + f.length) << dendl;
- r = ::ftruncate(fd, f.offset + f.length);
- if (r < 0) {
- derr << __func__ << " failed to ftruncate " << f.fid << ": "
- << cpp_strerror(r) << dendl;
- return r;
+
+ // make sure we didn't leave unwritten extents behind
+ for (map<uint64_t,extent_t>::iterator p = o->onode.block_map.begin();
+ p != o->onode.block_map.end();
+ ++p) {
+ if (p->second.has_flag(extent_t::FLAG_UNWRITTEN)) {
+ derr << __func__ << " left behind an unwritten extent, out of sync with "
+ << "_do_allocate" << dendl;
+ _dump_onode(o);
+ assert(0 == "leaked unwritten extent");
}
- return 1;
}
- return 0;
-}
-int NewStore::_clean_fid_tail(TransContext *txc, const fragment_t& f)
-{
- int fd = _open_fid(f.fid, O_RDWR);
- if (fd < 0) {
- return fd;
- }
- int r = _clean_fid_tail_fd(f, fd);
- if (r < 0) {
- return r;
- }
- if (r > 0) {
- txc->sync_fd(fd);
- } else {
- // all good!
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- }
- return 0;
+ out:
+ return r;
}
-
int NewStore::_write(TransContext *txc,
CollectionRef& c,
const ghobject_t& oid,
// overlay
_do_overlay_trim(txc, o, offset, length);
- map<uint64_t,fragment_t>::iterator fp = o->onode.find_fragment(offset);
- while (fp != o->onode.data_map.end()) {
- if (fp->first >= offset + length)
+ map<uint64_t,extent_t>::iterator bp = o->onode.seek_extent(offset);
+ while (bp != o->onode.block_map.end()) {
+ if (bp->first >= offset + length)
break;
- fragment_t& f = fp->second;
- if (offset <= fp->first &&
- offset + length >= fp->first + fp->second.length) {
+ if (offset <= bp->first &&
+ (offset + length >= bp->first + bp->second.length ||
+ offset >= o->onode.size)) {
-      // remove fragment
+      // remove extent
- dout(20) << __func__ << " wal rm fragment " << fp->first << " "
- << f << dendl;
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_REMOVE;
- op->fid = f.fid;
- o->onode.data_map.erase(fp++);
+ dout(20) << __func__ << " dealloc " << bp->first << ": "
+ << bp->second << dendl;
+ _txc_release(txc, bp->second.offset, bp->second.length);
+ o->onode.block_map.erase(bp++);
continue;
}
- // start,end are offsets in the fragment
- uint64_t start = 0;
- if (offset > fp->first) {
- start = offset - fp->first;
+    // x_off/x_len select the portion of this extent that we zero via the WAL
+ uint64_t x_off = 0;
+ if (offset > bp->first) {
+ x_off = offset - bp->first;
}
- assert(o->onode.frag_size);
- uint64_t end = MIN(offset + length - fp->first,
- o->onode.frag_size);
-
- if (end >= f.length) {
- // truncate fragment
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_TRUNCATE;
- op->fid = f.fid;
- op->offset = start;
- dout(20) << __func__ << " wal truncate fragment " << fp->first << " "
- << f << " to " << start << dendl;
- } else {
- // WAL
- r = _clean_fid_tail(txc, f);
- if (r < 0)
- goto out;
+    // clamp to both the end of the zero range and the end of the extent
+    uint64_t x_len = MIN(offset + length - bp->first, bp->second.length) - x_off;
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_ZERO;
- op->offset = start;
- op->length = end - start;
- op->fid = f.fid;
- }
+ // WAL
+ wal_op_t *op = _get_wal_op(txc, o);
+ op->op = wal_op_t::OP_ZERO;
+ op->extent.offset = bp->second.offset + x_off;
+ op->extent.length = x_len;
+ dout(20) << __func__ << " wal zero " << x_off << "~" << x_len
+ << " " << op->extent << dendl;
- // size fragment up?
- if (end > f.length) {
- f.length = end;
- dout(20) << __func__ << " frag " << f.fid << " sized up to "
- << f.length << dendl;
- }
- fp++;
+ bp++;
}
if (offset + length > o->onode.size) {
}
txc->write_onode(o);
- out:
dout(10) << __func__ << " " << c->cid << " " << oid
<< " " << offset << "~" << length
<< " = " << r << dendl;
int NewStore::_do_truncate(TransContext *txc, OnodeRef o, uint64_t offset)
{
+ uint64_t block_size = bdev->get_block_size();
+ uint64_t min_alloc_size = g_conf->newstore_min_alloc_size;
+ uint64_t alloc_end = ROUND_UP_TO(offset, min_alloc_size);
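+  // e.g. (illustrative, min_alloc_size = 64K): truncating to 10K gives
+  // alloc_end = 64K, so the block containing the new EOF stays allocated and
+  // only extents (or extent tails) from 64K onward are released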
+
+ // ensure any wal IO has completed before we truncate off any extents
+ // they may touch.
+ o->flush();
+
-  // trim down fragments
+  // trim down extents
- map<uint64_t,fragment_t>::iterator fp = o->onode.data_map.end();
- if (fp != o->onode.data_map.begin())
- --fp;
- while (fp != o->onode.data_map.end()) {
- if (fp->first + fp->second.length <= offset) {
+ map<uint64_t,extent_t>::iterator bp = o->onode.block_map.end();
+ if (bp != o->onode.block_map.begin())
+ --bp;
+ while (bp != o->onode.block_map.end()) {
+ if (bp->first + bp->second.length <= alloc_end) {
break;
}
- if (fp->first >= offset) {
- dout(20) << __func__ << " wal rm fragment " << fp->first << " "
- << fp->second << dendl;
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_REMOVE;
- op->fid = fp->second.fid;
- if (fp != o->onode.data_map.begin()) {
- o->onode.data_map.erase(fp--);
+ if (bp->first >= alloc_end) {
+ dout(20) << __func__ << " dealloc " << bp->first << ": "
+ << bp->second << dendl;
+ _txc_release(txc, bp->second.offset, bp->second.length);
+ if (bp != o->onode.block_map.begin()) {
+ o->onode.block_map.erase(bp--);
continue;
} else {
- o->onode.data_map.erase(fp);
+ o->onode.block_map.erase(bp);
break;
}
} else {
- assert(fp->first + fp->second.length > offset);
- assert(fp->first < offset);
- uint64_t newlen = offset - fp->first;
- dout(20) << __func__ << " wal truncate fragment " << fp->first << " "
- << fp->second << " to " << newlen << dendl;
- fragment_t& f = fp->second;
- f.length = newlen;
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_TRUNCATE;
- op->offset = offset;
- op->fid = f.fid;
+ assert(bp->first + bp->second.length > alloc_end);
+ assert(bp->first < alloc_end);
+ uint64_t newlen = alloc_end - bp->first;
+ assert(newlen % min_alloc_size == 0);
+ dout(20) << __func__ << " trunc " << bp->first << ": " << bp->second
+ << " to " << newlen << dendl;
+ _txc_release(txc, bp->second.offset + newlen, bp->second.length - newlen);
+ bp->second.length = newlen;
break;
}
}
- // truncate up trailing fragment?
+  // truncating up: zero any allocated space between the old EOF and the new size
+ if (offset > o->onode.size) {
+ map<uint64_t,extent_t>::iterator bp = o->onode.block_map.end();
+ if (bp != o->onode.block_map.begin())
+ --bp;
+ if (bp != o->onode.block_map.end() &&
+ bp->first + bp->second.length > o->onode.size) {
+ // we need to zero from onode.size to offset.
+ assert(offset > bp->first); // else we would have trimmed it above
+ assert(o->onode.size > bp->first); // we do no preallocation (yet)
+ uint64_t x_off = o->onode.size - bp->first;
+ uint64_t x_len = ROUND_UP_TO(offset, block_size) - o->onode.size;
+ wal_op_t *op = _get_wal_op(txc, o);
+ op->op = wal_op_t::OP_ZERO;
+ op->extent.offset = bp->second.offset + x_off;
+ op->extent.length = x_len;
+ dout(20) << __func__ << " wal zero " << x_off << "~" << x_len
+ << " " << op->extent << dendl;
+ }
+ } else if (offset < o->onode.size &&
+ offset % block_size != 0) {
+ // zero trailing block?
+ map<uint64_t,extent_t>::iterator bp = o->onode.find_extent(offset);
+ if (bp != o->onode.block_map.end()) {
+ wal_op_t *op = _get_wal_op(txc, o);
+ op->op = wal_op_t::OP_ZERO;
+      uint64_t z_len = block_size - offset % block_size;
+      op->extent.offset = bp->second.offset + offset - bp->first;
+      op->extent.length = z_len;
+ dout(20) << __func__ << " wal zero tail " << offset << "~" << z_len
+ << " at " << op->extent << dendl;
+ }
+ }
// trim down overlays
map<uint64_t,overlay_t>::iterator op = o->onode.overlay_map.end();
break;
}
if (op->first >= offset) {
- if (!o->onode.shared_overlays.count(op->second.key)) {
+ if (o->onode.put_overlay_ref(op->second.key)) {
dout(20) << __func__ << " rm overlay " << op->first << " "
<< op->second << dendl;
string key;
txc->t->rmkey(PREFIX_OVERLAY, key);
} else {
dout(20) << __func__ << " rm overlay " << op->first << " "
- << op->second << " (shared)" << dendl;
+ << op->second << " (put ref)" << dendl;
}
if (op != o->onode.overlay_map.begin()) {
o->onode.overlay_map.erase(op--);
}
}
+ // trim down cached tail
+ if (o->tail_bl.length()) {
+ if (offset / block_size != o->onode.size / block_size) {
+ dout(20) << __func__ << " clear cached tail" << dendl;
+ o->clear_tail();
+ }
+ }
+
o->onode.size = offset;
txc->write_onode(o);
return 0;
{
string key;
o->exists = false;
- if (!o->onode.data_map.empty()) {
- for (map<uint64_t,fragment_t>::iterator p = o->onode.data_map.begin();
- p != o->onode.data_map.end();
+ if (!o->onode.block_map.empty()) {
+ for (map<uint64_t,extent_t>::iterator p = o->onode.block_map.begin();
+ p != o->onode.block_map.end();
++p) {
- dout(20) << __func__ << " will wal remove " << p->second.fid << dendl;
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_REMOVE;
- op->fid = p->second.fid;
+ dout(20) << __func__ << " dealloc " << p->second << dendl;
+ _txc_release(txc, p->second.offset, p->second.length);
}
}
- o->onode.data_map.clear();
+ o->onode.block_map.clear();
+ _do_overlay_clear(txc, o);
o->onode.size = 0;
if (o->onode.omap_head) {
_do_omap_clear(txc, o->onode.omap_head);
break;
}
txc->t->rmkey(PREFIX_OMAP, it->key());
- dout(30) << __func__ << " rm " << it->key() << dendl;
+ dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
it->next();
}
}
break;
}
txc->t->rmkey(PREFIX_OMAP, it->key());
- dout(30) << __func__ << " rm " << it->key() << dendl;
+ dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
it->next();
}
r = 0;
o->onode.expected_write_size = expected_write_size;
txc->write_onode(o);
- if (o->onode.data_map.empty() && o->onode.overlay_map.empty()) {
- // FIXME: we could do something clever with onode.frag_size here.
- }
-
out:
dout(10) << __func__ << " " << c->cid << " " << oid
<< " object_size " << expected_object_size
goto out;
// truncate any old data
- while (!newo->onode.data_map.empty()) {
- wal_op_t *op = _get_wal_op(txc);
- op->op = wal_op_t::OP_REMOVE;
- op->fid = newo->onode.data_map.rbegin()->second.fid;
- newo->onode.data_map.erase(newo->onode.data_map.rbegin()->first);
- }
+ r = _do_truncate(txc, newo, 0);
+ if (r < 0)
+ goto out;
r = _do_write(txc, newo, 0, oldo->onode.size, bl, 0);