// for more than filestore_max_inline_xattrs attrs
OPTION(filestore_max_inline_xattrs, OPT_U32, 2)
+OPTION(filestore_sloppy_crc, OPT_BOOL, false) // track sloppy crcs
+OPTION(filestore_sloppy_crc_block_size, OPT_INT, 65536)
+
OPTION(filestore_max_sync_interval, OPT_DOUBLE, 5) // seconds
OPTION(filestore_min_sync_interval, OPT_DOUBLE, .01) // seconds
OPTION(filestore_btrfs_snap, OPT_BOOL, true)
int FileStore::lfn_truncate(coll_t cid, const ghobject_t& oid, off_t length)
{
IndexedPath path;
- int r = lfn_find(cid, oid, &path);
+ FDRef fd;
+ int r = lfn_open(cid, oid, false, &fd, &path);
if (r < 0)
return r;
- r = ::truncate(path->path(), length);
+ r = ::ftruncate(**fd, length);
if (r < 0)
r = -errno;
+ if (r >= 0 && m_filestore_sloppy_crc) {
+ int rc = backend->_crc_update_truncate(**fd, length);
+ assert(rc >= 0);
+ }
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
m_filestore_queue_committing_max_ops(g_conf->filestore_queue_committing_max_ops),
m_filestore_queue_committing_max_bytes(g_conf->filestore_queue_committing_max_bytes),
m_filestore_do_dump(false),
- m_filestore_dump_fmt(true)
+ m_filestore_dump_fmt(true),
+ m_filestore_sloppy_crc(g_conf->filestore_sloppy_crc),
+ m_filestore_sloppy_crc_block_size(g_conf->filestore_sloppy_crc_block_size)
{
m_filestore_kill_at.set(g_conf->filestore_kill_at);
}
bptr.set_length(got); // properly size the buffer
bl.push_back(bptr); // put it in the target bufferlist
+
+ if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
+ ostringstream ss;
+ int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
+ if (errors > 0) {
+ dout(0) << "FileStore::read " << cid << "/" << oid << " " << offset << "~"
+ << got << " ... BAD CRC:\n" << ss.str() << dendl;
+ assert(0 == "bad crc on read");
+ }
+ }
+
lfn_close(fd);
dout(10) << "FileStore::read " << cid << "/" << oid << " " << offset << "~"
if (r == 0)
r = bl.length();
+ if (r >= 0 && m_filestore_sloppy_crc) {
+ int rc = backend->_crc_update_write(**fd, offset, len, bl);
+ assert(rc >= 0);
+ }
+
// flush?
if (!replaying &&
g_conf->filestore_wbthrottle_enable)
ret = -errno;
lfn_close(fd);
+ if (ret >= 0 && m_filestore_sloppy_crc) {
+ int rc = backend->_crc_update_zero(**fd, offset, len);
+ assert(rc >= 0);
+ }
+
if (ret == 0)
goto out; // yay!
if (ret != -EOPNOTSUPP)
break;
pos += r;
}
+ if (r >= 0 && m_filestore_sloppy_crc) {
+ int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
+ assert(rc >= 0);
+ }
dout(20) << "_do_copy_range " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
return r;
}
"filestore_kill_at",
"filestore_fail_eio",
"filestore_replica_fadvise",
+ "filestore_sloppy_crc",
+ "filestore_sloppy_crc_block_size",
NULL
};
return KEYS;
changed.count("filestore_queue_committing_max_bytes") ||
changed.count("filestore_kill_at") ||
changed.count("filestore_fail_eio") ||
+ changed.count("filestore_sloppy_crc") ||
+ changed.count("filestore_sloppy_crc_block_size") ||
changed.count("filestore_replica_fadvise")) {
Mutex::Locker l(lock);
m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
m_filestore_kill_at.set(conf->filestore_kill_at);
m_filestore_fail_eio = conf->filestore_fail_eio;
m_filestore_replica_fadvise = conf->filestore_replica_fadvise;
+ m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
+ m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
}
if (changed.count("filestore_commit_timeout")) {
Mutex::Locker l(sync_entry_timeo_lock);
std::ofstream m_filestore_dump;
JSONFormatter m_filestore_dump_fmt;
atomic_t m_filestore_kill_at;
+ bool m_filestore_sloppy_crc;
+ int m_filestore_sloppy_crc_block_size;
FSSuperblock superblock;
/**
int _copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff) {
return filestore->_do_copy_range(from, to, srcoff, len, dstoff);
}
+ int get_crc_block_size() {
+ return filestore->m_filestore_sloppy_crc_block_size;
+ }
public:
FileStoreBackend(FileStore *fs) : filestore(fs) {}
virtual ~FileStoreBackend() {};
virtual bool has_fiemap() = 0;
virtual int do_fiemap(int fd, off_t start, size_t len, struct fiemap **pfiemap) = 0;
virtual int clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff) = 0;
+
+ // hooks for (sloppy) crc tracking
+ virtual int _crc_update_write(int fd, loff_t off, size_t len, const bufferlist& bl) = 0;
+ virtual int _crc_update_truncate(int fd, loff_t off) = 0;
+ virtual int _crc_update_zero(int fd, loff_t off, size_t len) = 0;
+ virtual int _crc_update_clone_range(int srcfd, int destfd,
+ loff_t srcoff, size_t len, loff_t dstoff) = 0;
+ virtual int _crc_verify_read(int fd, loff_t off, size_t len, const bufferlist& bl,
+ ostream *out) = 0;
};
#endif
#include "common/config.h"
#include "common/sync_filesystem.h"
+#include "common/SloppyCRCMap.h"
+#include "os/chain_xattr.h"
+
+#define SLOPPY_CRC_XATTR "user.cephos.scrc"
+
+
#define dout_subsys ceph_subsys_filestore
#undef dout_prefix
#define dout_prefix *_dout << "genericfilestorebackend(" << get_basedir_path() << ") "
free(fiemap);
return ret;
}
+
+
+int GenericFileStoreBackend::_crc_load_or_init(int fd, SloppyCRCMap *cm)
+{
+ char buf[100];
+ bufferptr bp;
+ int l = chain_fgetxattr(fd, SLOPPY_CRC_XATTR, buf, sizeof(buf));
+ if (l == -ENODATA) {
+ return 0;
+ }
+ if (l >= 0) {
+ bp = buffer::create(l);
+ memcpy(bp.c_str(), buf, l);
+ } else if (l == -ERANGE) {
+ l = chain_fgetxattr(fd, SLOPPY_CRC_XATTR, 0, 0);
+ if (l > 0) {
+ bp = buffer::create(l);
+ l = chain_fgetxattr(fd, SLOPPY_CRC_XATTR, bp.c_str(), l);
+ }
+ }
+ bufferlist bl;
+ bl.append(bp);
+ bufferlist::iterator p = bl.begin();
+ try {
+ ::decode(*cm, p);
+ }
+ catch (buffer::error &e) {
+ return -EIO;
+ }
+ return 0;
+}
+
+int GenericFileStoreBackend::_crc_save(int fd, SloppyCRCMap *cm)
+{
+ bufferlist bl;
+ ::encode(*cm, bl);
+ return chain_fsetxattr(fd, SLOPPY_CRC_XATTR, bl.c_str(), bl.length());
+}
+
+int GenericFileStoreBackend::_crc_update_write(int fd, loff_t off, size_t len, const bufferlist& bl)
+{
+ SloppyCRCMap scm(get_crc_block_size());
+ int r = _crc_load_or_init(fd, &scm);
+ if (r < 0)
+ return r;
+ ostringstream ss;
+ scm.write(off, len, bl, &ss);
+ dout(30) << __func__ << "\n" << ss.str() << dendl;
+ r = _crc_save(fd, &scm);
+ return r;
+}
+
+int GenericFileStoreBackend::_crc_update_truncate(int fd, loff_t off)
+{
+ SloppyCRCMap scm(get_crc_block_size());
+ int r = _crc_load_or_init(fd, &scm);
+ if (r < 0)
+ return r;
+ scm.truncate(off);
+ r = _crc_save(fd, &scm);
+ return r;
+}
+
+int GenericFileStoreBackend::_crc_update_zero(int fd, loff_t off, size_t len)
+{
+ SloppyCRCMap scm(get_crc_block_size());
+ int r = _crc_load_or_init(fd, &scm);
+ if (r < 0)
+ return r;
+ scm.zero(off, len);
+ r = _crc_save(fd, &scm);
+ return r;
+}
+
+int GenericFileStoreBackend::_crc_update_clone_range(int srcfd, int destfd,
+ loff_t srcoff, size_t len, loff_t dstoff)
+{
+ SloppyCRCMap scm_src(get_crc_block_size());
+ SloppyCRCMap scm_dst(get_crc_block_size());
+ int r = _crc_load_or_init(srcfd, &scm_src);
+ if (r < 0)
+ return r;
+ r = _crc_load_or_init(destfd, &scm_dst);
+ if (r < 0)
+ return r;
+ ostringstream ss;
+ scm_dst.clone_range(srcoff, len, dstoff, scm_src, &ss);
+ dout(30) << __func__ << "\n" << ss.str() << dendl;
+ r = _crc_save(destfd, &scm_dst);
+ return r;
+}
+
+int GenericFileStoreBackend::_crc_verify_read(int fd, loff_t off, size_t len, const bufferlist& bl,
+ ostream *out)
+{
+ SloppyCRCMap scm(get_crc_block_size());
+ int r = _crc_load_or_init(fd, &scm);
+ if (r < 0)
+ return r;
+ return scm.read(off, len, bl, out);
+}
#include "FileStore.h"
+class SloppyCRCMap;
+
class GenericFileStoreBackend : public FileStoreBackend {
private:
bool ioctl_fiemap;
public:
GenericFileStoreBackend(FileStore *fs);
virtual ~GenericFileStoreBackend() {};
+
virtual int detect_features();
virtual int create_current();
virtual bool can_checkpoint() { return false; };
virtual int clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff) {
return _copy_range(from, to, srcoff, len, dstoff);
}
+
+private:
+ int _crc_load_or_init(int fd, SloppyCRCMap *cm);
+ int _crc_save(int fd, SloppyCRCMap *cm);
+public:
+ virtual int _crc_update_write(int fd, loff_t off, size_t len, const bufferlist& bl);
+ virtual int _crc_update_truncate(int fd, loff_t off);
+ virtual int _crc_update_zero(int fd, loff_t off, size_t len);
+ virtual int _crc_update_clone_range(int srcfd, int destfd,
+ loff_t srcoff, size_t len, loff_t dstoff);
+ virtual int _crc_verify_read(int fd, loff_t off, size_t len, const bufferlist& bl,
+ ostream *out);
};
#endif