From b293d451146513fabf1a059ffb14781d0299a622 Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Thu, 7 Apr 2011 12:24:06 -0700 Subject: [PATCH] osbdb: Remove dead code. Nothing has used the BerkeleyDB-backed object storage in years. Signed-off-by: Tommi Virtanen --- debian/copyright | 4 - src/Makefile.am | 3 - src/common/config.h | 10 - src/doc/bdb.txt | 48 - src/os/BDBMap.h | 138 -- src/os/FakeStoreBDBCollections.h | 167 --- src/os/FileStore.h | 1 - src/osbdb/OSBDB.cc | 2169 ------------------------------ src/osbdb/OSBDB.h | 482 ------- src/osd/OSD.cc | 5 - src/test/old/testos.cc | 10 - src/test/old/testosbdb.cc | 347 ----- 12 files changed, 3384 deletions(-) delete mode 100644 src/doc/bdb.txt delete mode 100644 src/os/BDBMap.h delete mode 100644 src/os/FakeStoreBDBCollections.h delete mode 100644 src/osbdb/OSBDB.cc delete mode 100644 src/osbdb/OSBDB.h delete mode 100644 src/test/old/testosbdb.cc diff --git a/debian/copyright b/debian/copyright index 95d227b5ccbc4..33a936e351481 100644 --- a/debian/copyright +++ b/debian/copyright @@ -17,10 +17,6 @@ Copyright: Copyright (C) 2010 Canonical License: GPL2 -Files: src/osbdb/OSBDB.h -Copyright: Copyright (C) 2007 Casey Marshall -License: LGPL2.1 - Files: src/mount/canonicalize.c Copyright: Copyright (C) 1993 Rick Sladkey License: LGPL2 or later diff --git a/src/Makefile.am b/src/Makefile.am index f123f677bbb72..4b87b3f071b74 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1024,15 +1024,12 @@ noinst_HEADERS = \ msg/tcp.h\ objclass/objclass.h\ os/btrfs_ioctl.h\ - os/BDBMap.h\ os/Fake.h\ - os/FakeStoreBDBCollections.h\ os/FileJournal.h\ os/FileStore.h\ os/Journal.h\ os/JournalingObjectStore.h\ os/ObjectStore.h\ - osbdb/OSBDB.h\ osd/Ager.h\ osd/OSD.h\ osd/OSDCaps.h\ diff --git a/src/common/config.h b/src/common/config.h index 295b66cb6a444..0f51d118b4fde 100644 --- a/src/common/config.h +++ b/src/common/config.h @@ -488,16 +488,6 @@ public: int bdev_fake_mb; int bdev_fake_max_mb; -#ifdef USE_OSBDB - bool bdbstore; - int debug_bdbstore; - bool bdbstore_btree; - int bdbstore_ffactor; - int bdbstore_nelem; - int bdbstore_pagesize; - int bdbstore_cachesize; - bool bdbstore_transactional; -#endif // USE_OSBDB }; extern md_config_t g_conf; diff --git a/src/doc/bdb.txt b/src/doc/bdb.txt deleted file mode 100644 index 63e647f5bb3cc..0000000000000 --- a/src/doc/bdb.txt +++ /dev/null @@ -1,48 +0,0 @@ -OBJECT STORE ON BERKELEY DB ---------------------------- - -OSBDB is an implementation of an object store that uses Berkeley DB as -the underlying storage. It is meant to be an alternative to EBOFS. - -BUILDING --------- - -You will need to have Berkeley DB installed, including the developent -packages. We've tested this with Berkeley DB 4.4.20 on Ubuntu 6.10. - -To compile OSBDB support, you need to pass the argument "want_bdb=yes" -to "make." If you don't specify this, OSBDB and all its associated -support is not included in the executables. - -RUNNING -------- - -To use OSBDB in Ceph, simply pass the --bdbstore flag to programs. You -don't need to create a "device" for OSBDB ahead of time; Berkeley DB -will take care of creating the files. You also *cannot* use a raw -device as your store -- it must be regular file. - -OSBDB additionally accepts the following flags: - - --bdbstore-btree Configures OSBDB to use the "Btree" - database type for Berkeley DB. The default - database type is "Hash". - - --bdbstore-hash-ffactor Sets the "fill factor" for the hash - database type. Takes an integer argument. - - --bdbstore-hash-nelem Sets the "nelem" parameter for the hash - database type. Takes an integer argument. - - --bdbstore-hash-pagesize Sets the page size for the hash database - type. Takes an integer argument. - - --bdbstore-cachesize Sets the cache size. Takes an integer - argument, which must be a power of two, and - no less than 20 KiB. - - --bdbstore-transactional Enable (in-memory-only) transactions for - all operations in the OSBDB store. - - --debug-bdbstore Set the debug level. Takes an integer - argument. diff --git a/src/os/BDBMap.h b/src/os/BDBMap.h deleted file mode 100644 index 434c87f3c3c3c..0000000000000 --- a/src/os/BDBMap.h +++ /dev/null @@ -1,138 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef CEPH_BERKELEYDB_H -#define CEPH_BERKELEYDB_H - -#include -#include -#include - -#include "common/debug.h" - -using namespace std; - -template -class BDBMap { - private: - DB *dbp; - - public: - BDBMap() : dbp(0) {} - ~BDBMap() { - close(); - } - - bool is_open() { return dbp ? true:false; } - - // open/close - int open(const char *fn) { - //cout << "open " << fn << endl; - - int r; - if ((r = db_create(&dbp, NULL, 0)) != 0) { - derr << "db_create: " << db_strerror(r) << dendl; - assert(0); - } - - dbp->set_errfile(dbp, stderr); - dbp->set_errpfx(dbp, "bdbmap"); - - r = dbp->open(dbp, NULL, fn, NULL, DB_BTREE, DB_CREATE, 0644); - if (r != 0) { - dbp->err(dbp, r, "%s", fn); - } - assert(r == 0); - return 0; - } - void close() { - if (dbp) { - dbp->close(dbp,0); - dbp = 0; - } - } - void remove(const char *fn) { - if (!dbp) open(fn); - if (dbp) { - dbp->remove(dbp, fn, 0, 0); - dbp = 0; - } else { - ::unlink(fn); - } - } - - // accessors - int put(K key, - D data) { - DBT k; - memset(&k, 0, sizeof(k)); - k.data = &key; - k.size = sizeof(K); - DBT d; - memset(&d, 0, sizeof(d)); - d.data = &data; - d.size = sizeof(data); - return dbp->put(dbp, NULL, &k, &d, 0); - } - - int get(K key, - D& data) { - DBT k; - memset(&k, 0, sizeof(k)); - k.data = &key; - k.size = sizeof(key); - DBT d; - memset(&d, 0, sizeof(d)); - d.data = &data; - d.size = sizeof(data); - int r = dbp->get(dbp, NULL, &k, &d, 0); - return r; - } - - int del(K key) { - DBT k; - memset(&k, 0, sizeof(k)); - k.data = &key; - k.size = sizeof(key); - return dbp->del(dbp, NULL, &k, 0); - } - - int list_keys(list& ls) { - DBC *cursor = 0; - int r = dbp->cursor(dbp, NULL, &cursor, 0); - assert(r == 0); - - DBT k,d; - memset(&k, 0, sizeof(k)); - memset(&d, 0, sizeof(d)); - - while ((r = cursor->c_get(cursor, &k, &d, DB_NEXT)) == 0) { - K key; - assert(k.size == sizeof(key)); - memcpy(&key, k.data, k.size); - ls.push_back(key); - } - if (r != DB_NOTFOUND) { - dbp->err(dbp, r, "DBcursor->get"); - assert(r == DB_NOTFOUND); - } - - cursor->c_close(cursor); - return 0; - } - -}; - -#endif diff --git a/src/os/FakeStoreBDBCollections.h b/src/os/FakeStoreBDBCollections.h deleted file mode 100644 index 34b7e43301e3f..0000000000000 --- a/src/os/FakeStoreBDBCollections.h +++ /dev/null @@ -1,167 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef CEPH_FAKESTOREBDBCOLLECTIONS_H -#define CEPH_FAKESTOREBDBCOLLECTIONS_H - -#include "BDBMap.h" -#include "ObjectStore.h" -#include "common/Mutex.h" - -#define BDBHASH_DIRS 128LL -#define BDBHASH_FUNC(x) (((x) ^ ((x)>>30) ^ ((x)>>18) ^ ((x)>>45) ^ 0xdead1234) * 884811 % BDBHASH_DIRS) - -class FakeStoreBDBCollections { - private: - int whoami; - string basedir; - - Mutex bdblock; - - // collection dbs - BDBMap collections; - map*> collection_map; - - // dirs - void get_dir(string& dir) { - dir = basedir + "/" + string(whoami); - } - void get_collfn(coll_t c, string &fn) { - char s[100]; - snprintf(s, sizeof(s), "%d/%02llx/%016llx.co", whoami, BDBHASH_FUNC(c), c); - fn = basedir + "/" + s; - } - - void open_collections() { - string cfn; - get_dir(cfn); - cfn += "/collections"; - collections.open(cfn.c_str()); - list ls; - collections.list_keys(ls); - } - void close_collections() { - if (collections.is_open()) - collections.close(); - - for (map*>::iterator it = collection_map.begin(); - it != collection_map.end(); - it++) { - it->second->close(); - } - collection_map.clear(); - } - - int open_collection(coll_t c) { - if (collection_map.count(c)) - return 0; // already open. - - string fn; - get_collfn(c,fn); - collection_map[c] = new BDBMap; - int r = collection_map[c]->open(fn.c_str()); - if (r != 0) - collection_map.erase(c); // failed - return r; - } - - public: - FakeStoreBDBCollections(int w, string& bd) : whoami(w), basedir(bd) {} - ~FakeStoreBDBCollections() { - close_collections(); - } - - int list_collections(list& ls) { - bdblock.Lock(); - if (!collections.is_open()) open_collections(); - - ls.clear(); - collections.list_keys(ls); - bdblock.Unlock(); - return 0; - } - int create_collection(coll_t c) { - bdblock.Lock(); - if (!collections.is_open()) open_collections(); - - collections.put(c, 1); - open_collection(c); - bdblock.Unlock(); - return 0; - } - int destroy_collection(coll_t c) { - bdblock.Lock(); - if (!collections.is_open()) open_collections(); - - collections.del(c); - - open_collection(c); - collection_map[c]->close(); - - string fn; - get_collfn(c,fn); - collection_map[c]->remove(fn.c_str()); - delete collection_map[c]; - collection_map.erase(c); - bdblock.Unlock(); - return 0; - } - int collection_stat(coll_t c, struct stat *st) { - bdblock.Lock(); - if (!collections.is_open()) open_collections(); - - string fn; - get_collfn(c,fn); - int r = ::stat(fn.c_str(), st); - bdblock.Unlock(); - return r; - } - bool collection_exists(coll_t c) { - bdblock.Lock(); - struct stat st; - int r = collection_stat(c, &st) == 0; - bdblock.Unlock(); - return r; - } - int collection_add(coll_t c, object_t o) { - bdblock.Lock(); - if (!collections.is_open()) open_collections(); - - open_collection(c); - collection_map[c]->put(o,1); - bdblock.Unlock(); - return 0; - } - int collection_remove(coll_t c, object_t o) { - bdblock.Lock(); - if (!collections.is_open()) open_collections(); - - open_collection(c); - collection_map[c]->del(o); - bdblock.Unlock(); - return 0; - } - int collection_list(coll_t c, list& o) { - bdblock.Lock(); - if (!collections.is_open()) open_collections(); - - open_collection(c); - collection_map[c]->list_keys(o); - bdblock.Unlock(); - return 0; - } -}; - -#endif diff --git a/src/os/FileStore.h b/src/os/FileStore.h index bfe1bc35975e5..a052b1a03452d 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -25,7 +25,6 @@ #include "common/Mutex.h" #include "Fake.h" -//#include "FakeStoreBDBCollections.h" #include #include diff --git a/src/osbdb/OSBDB.cc b/src/osbdb/OSBDB.cc deleted file mode 100644 index 2adc2d5fb1c7b..0000000000000 --- a/src/osbdb/OSBDB.cc +++ /dev/null @@ -1,2169 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* OSBDB.cc -- ObjectStore on top of Berkeley DB. - Copyright (C) 2007 Casey Marshall - -Ceph - scalable distributed file system - -This is free software; you can redistribute it and/or -modify it under the terms of the GNU Lesser General Public -License version 2.1, as published by the Free Software -Foundation. See file COPYING. */ - - -#include -#include -#include -#include "OSBDB.h" -#include "common/Timer.h" - -using namespace std; - -#define dout_prefix *_dout << "bdbstore(" << device << ")@" << __LINE__ << "." -#define derr(x) dout(x) - -#define CLEANUP(onsafe) do { \ - dout(6) << "DELETE " << hex << onsafe << dec << dendl; \ - delete onsafe; \ - } while (0) -#define COMMIT(onsafe) do { \ - dout(6) << "COMMIT " << hex << onsafe << dec << dendl; \ - sync(onsafe); \ - } while (0) - - // Have a lock, already. - -class scoped_lock -{ -private: - Mutex *m; -public: - scoped_lock(Mutex *m) : m(m) { m->Lock(); } - ~scoped_lock() { m->Unlock(); } -}; - - // Utilities. - -// Starting off with my own bsearch; mail reader to follow... - -// Perform a binary search on a sorted array, returning the insertion -// point for key, or key if it is exactly found. In other words, this -// will return a pointer to the element that will come after key if -// key were to be inserted into the sorted array. -// -// Requires that T have < and > operators defined. -template -uint32_t binary_search (T *array, size_t size, T key) -{ - int low = 0; - int high = size; - int p = (low + high) / 2; - - while (low < high - 1) - { - if (array[p] > key) - { - high = p; - } - else if (array[p] < key) - { - low = p; - } - else - return p; - - p = (low + high) / 2; - } - - if (array[p] < key) - p++; - else if (array[p] > key && p > 0) - p--; - return p; -} - - // Management. - -DbEnv *OSBDB::getenv () -{ - DbEnv *envp = new DbEnv (DB_CXX_NO_EXCEPTIONS); - if (g_conf.debug > 1 || g_conf.debug_bdbstore > 1) - envp->set_error_stream (&std::cerr); - if (g_conf.debug > 2 || g_conf.debug_bdbstore > 2) - envp->set_message_stream (&std::cout); - envp->set_flags (DB_LOG_INMEMORY, 1); - //env->set_flags (DB_DIRECT_DB, 1); - int env_flags = (DB_CREATE - | DB_THREAD - //| DB_INIT_LOCK - | DB_INIT_MPOOL - //| DB_INIT_TXN - //| DB_INIT_LOG - | DB_PRIVATE); - if (envp->open (NULL, env_flags, 0) != 0) - { - std::cerr << "failed to open environment " << std::endl; - assert(0); - } - return envp; -} - -int OSBDB::opendb(DBTYPE type, int flags, bool new_env) -{ - env = getenv(); - db = new Db(env, 0); - db->set_error_stream (&std::cerr); - db->set_message_stream (&std::cout); - db->set_flags (0); - if (!g_conf.bdbstore_btree) - { - if (g_conf.bdbstore_pagesize > 0) - db->set_pagesize (g_conf.bdbstore_pagesize); - if (g_conf.bdbstore_ffactor > 0 && g_conf.bdbstore_nelem > 0) - { - db->set_h_ffactor (g_conf.bdbstore_ffactor); - db->set_h_nelem (g_conf.bdbstore_nelem); - } - } - if (g_conf.bdbstore_cachesize > 0) - { - db->set_cachesize (0, g_conf.bdbstore_cachesize, 0); - } - - flags = flags | DB_THREAD; - if (transactional) - flags = flags | DB_AUTO_COMMIT; - - int ret; - if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0) - { - derr(1) << "failed to open database: " << device << ": " - << db_strerror(ret) << std::dendl; - return -EINVAL; - } - opened = true; - return 0; -} - -int OSBDB::mount() -{ - dout(2) << "mount " << device << dendl; - - if (mounted) - { - dout(4) << "..already mounted" << dendl; - return 0; - } - - if (!opened) - { - int ret; - if ((ret = opendb ()) != 0) - { - dout(4) << "..returns " << ret << dendl; - return ret; - } - } - - // XXX Do we want anything else in the superblock? - - Dbt key (OSBDB_SUPERBLOCK_KEY, 1); - stored_superblock super; - Dbt value (&super, sizeof (super)); - value.set_dlen (sizeof (super)); - value.set_ulen (sizeof (super)); - value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); - - if (db->get (NULL, &key, &value, 0) != 0) - { - dout(4) << "..get superblock fails" << dendl; - return -EINVAL; // XXX how to say "badly formed fs?" - } - - dout(3) << ".mount " << super << dendl; - - if (super.version != OSBDB_THIS_VERSION) - { - dout(4) << "version mismatch (" << super.version << ")" << dendl; - return -EINVAL; - } - - DBTYPE t; - db->get_type (&t); - - if (t == DB_BTREE) - { - u_int32_t minkey; - u_int32_t flags; - db->get_bt_minkey (&minkey); - db->get_flags (&flags); - dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Btree; " - << "min keys per page: " << minkey << "; flags: " - << hex << flags << dec << dendl; - cout << dec; - } - else - { - u_int32_t ffactor; - u_int32_t nelem; - u_int32_t flags; - db->get_h_ffactor (&ffactor); - db->get_h_nelem (&nelem); - db->get_flags (&flags); - dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Hash; " - << "fill factor: " << ffactor - << " table size: " << nelem << "; flags: " - << hex << flags << dec << dendl; - cout << dec; - } - - mounted = true; - dout(4) << "..mounted" << dendl; - return 0; -} - -int OSBDB::umount() -{ - if (!mounted) - return -EINVAL; - - dout(2) << "umount" << dendl; - - int ret; - if (opened) - { - if (transactional) - { - env->log_flush (NULL); - if ((ret = env->lsn_reset (device.c_str(), 0)) != 0) - { - derr(1) << "lsn_reset: " << db_strerror (ret) << dendl; - } - } - - db->sync (0); - - if ((ret = db->close (0)) != 0) - { - derr(1) << "close: " << db_strerror(ret) << dendl; - return -EINVAL; - } - delete db; - db = NULL; - - if (env) - { - env->close (0); - delete env; - env = NULL; - } - } - mounted = false; - opened = false; - dout(4) << "..unmounted" << dendl; - return 0; -} - -int OSBDB::mkfs() -{ - if (mounted) - return -EINVAL; - - dout(2) << "mkfs" << dendl; - - string d = env_dir; - d += device; - unlink (d.c_str()); - - int ret; - if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH), - DB_CREATE, true)) != 0) - { - derr(1) << "failed to open database: " << device << ": " - << db_strerror(ret) << std::dendl; - return -EINVAL; - } - opened = true; - dout(3) << "..opened " << device << dendl; - - uint32_t c; - ret = db->truncate (NULL, &c, 0); - if (ret != 0) - { - derr(1) << "db truncate failed: " << db_strerror (ret) << dendl; - return -EIO; // ??? - } - - Dbt key (OSBDB_SUPERBLOCK_KEY, 1); - struct stored_superblock sb; - sb.version = OSBDB_THIS_VERSION; - Dbt value (&sb, sizeof (sb)); - - dout(3) << "..writing superblock" << dendl; - if ((ret = db->put (NULL, &key, &value, 0)) != 0) - { - derr(1) << "failed to write superblock: " << db_strerror (ret) - << dendl; - return -EIO; - } - dout(3) << "..wrote superblock" << dendl; - dout(4) << "..mkfs done" << dendl; - return 0; -} - - // Objects. - -int OSBDB::pick_object_revision_lt(object_t& oid) -{ - // Not really needed. - dout(0) << "pick_object_revision_lt " << oid << dendl; - return -ENOSYS; -} - -bool OSBDB::exists(object_t oid) -{ - dout(2) << "exists " << oid << dendl; - struct stat st; - bool ret = (stat (oid, &st) == 0); - dout(4) << "..returns " << ret << dendl; - return ret; -} - -int OSBDB::statfs (struct statfs *st) -{ - // Hacky? - if (::statfs (device.c_str(), st) != 0) - { - int ret = -errno; - derr(1) << "statfs returns " << ret << dendl; - return ret; - } - st->f_type = OSBDB_MAGIC; - dout(4) << "..statfs OK" << dendl; - return 0; -} - -int OSBDB::stat(object_t oid, struct stat *st) -{ - if (!mounted) - { - dout(4) << "not mounted!" << dendl; - return -EINVAL; - } - - dout(2) << "stat " << oid << dendl; - - object_inode_key ikey = new_object_inode_key(oid); - stored_object obj; - Dbt key (&ikey, sizeof_object_inode_key()); - Dbt value (&obj, sizeof (obj)); - value.set_flags (DB_DBT_USERMEM); - value.set_ulen (sizeof (obj)); - - dout(3) << " lookup " << ikey << dendl; - int ret; - if ((ret = db->get (NULL, &key, &value, 0)) != 0) - { - derr(1) << " get returned " << ret << dendl; - return -ENOENT; - } - - st->st_size = obj.length; - dout(3) << "stat length:" << obj.length << dendl; - dout(4) << "..stat OK" << dendl; - return 0; -} - -int OSBDB::remove(object_t oid, Context *onsafe) -{ - if (!mounted) - { - derr(1) << "not mounted!" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - dout(6) << "Context " << hex << onsafe << dec << dendl; - scoped_lock __lock(&lock); - dout(2) << "remove " << oid << dendl; - - DbTxn *txn = NULL; - if (transactional) - env->txn_begin (NULL, &txn, 0); - - oid_t id; - mkoid(id, oid); - Dbt key (&id, sizeof (oid_t)); - int ret; - if ((ret = db->del (txn, &key, 0)) != 0) - { - derr(1) << ".del returned error: " << db_strerror (ret) << dendl; - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - object_inode_key _ikey = new_object_inode_key (oid); - Dbt ikey (&_ikey, sizeof_object_inode_key()); - if ((ret = db->del (txn, &ikey, 0)) != 0) - { - derr(1) << ".del returned error: " << db_strerror (ret) << dendl; - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - attrs_id aids = new_attrs_id (oid); - Dbt askey (&aids, sizeof_attrs_id()); - Dbt asval; - asval.set_flags (DB_DBT_MALLOC); - if (db->get (txn, &askey, &asval, 0) == 0) - { - // We have attributes; remove them. - stored_attrs *sap = (stored_attrs *) asval.get_data(); - auto_ptr sa (sap); - for (unsigned i = 0; i < sap->count; i++) - { - attr_id aid = new_attr_id (oid, sap->names[i].name); - Dbt akey (&aid, sizeof (aid)); - if ((ret = db->del (txn, &akey, 0)) != 0) - { - derr(1) << ".del returns error: " << db_strerror (ret) << dendl; - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - } - if ((ret = db->del (txn, &askey, 0)) != 0) - { - derr(1) << ".del returns error: " << db_strerror (ret) << dendl; - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - } - - // XXX check del return value - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - dout(4) << "..remove OK" << dendl; - return 0; -} - -int OSBDB::truncate(object_t oid, off_t size, Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - - if (!mounted) - { - derr(1) << "not mounted!" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "truncate " << size << dendl; - - if (size > 0xFFFFFFFF) - { - derr(1) << "object size too big!" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -ENOSPC; - } - - DbTxn *txn = NULL; - - if (transactional) - env->txn_begin (NULL, &txn, 0); - - object_inode_key ikey = new_object_inode_key (oid); - stored_object obj; - Dbt key (&ikey, sizeof_object_inode_key()); - Dbt value (&obj, sizeof (obj)); - value.set_dlen (sizeof (obj)); - value.set_ulen (sizeof (obj)); - value.set_flags (DB_DBT_USERMEM); - - if (db->get (txn, &key, &value, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - dout(4) << "..returns -ENOENT" << dendl; - return -ENOENT; - } - - if (obj.length < size) - { - oid_t id; - mkoid (id, oid); - Dbt okey (&id, sizeof (oid_t)); - char b[] = { '\0' }; - Dbt newVal (b, 1); - newVal.set_doff ((size_t) size); - newVal.set_dlen (1); - newVal.set_ulen (1); - newVal.set_flags (DB_DBT_PARTIAL); - if (db->put (txn, &okey, &newVal, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".updating object failed" << dendl; - return -EIO; - } - - obj.length = size; - value.set_ulen (sizeof (obj)); - if (db->put (txn, &key, &value, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".updating object info failed" << dendl; - return -EIO; - } - } - else if (obj.length > size) - { - obj.length = size; - Dbt tval (&obj, sizeof (obj)); - tval.set_ulen (sizeof (obj)); - tval.set_flags (DB_DBT_USERMEM); - if (db->put (txn, &key, &tval, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".updating object info failed" << dendl; - return -EIO; - } - if (size == 0) - { - char x[1]; - oid_t id; - mkoid (id, oid); - Dbt okey (&id, sizeof (oid_t)); - Dbt oval (&x, 0); - if (db->put (txn, &okey, &oval, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".updating object failed" << dendl; - return -EIO; - } - } - else - { - oid_t id; - mkoid (id, oid); - Dbt okey (&id, sizeof (oid_t)); - Dbt oval; - oval.set_flags (DB_DBT_MALLOC); - if (db->get (txn, &okey, &oval, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".getting old object failed" << dendl; - return -EIO; - } - auto_ptr ovalPtr ((char *) oval.get_data()); - oval.set_size ((size_t) size); - oval.set_ulen ((size_t) size); - if (db->put (txn, &okey, &oval, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".putting new object failed" << dendl; - return -EIO; - } - } - } - - if (txn) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - - dout(4) << "..truncate OK" << dendl; - return 0; -} - -int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl) -{ - if (!mounted) - { - derr(1) << "not mounted!" << dendl; - return -EINVAL; - } - - dout(2) << "read " << oid << " " << offset << " " - << len << dendl; - - if (bl.length() < len) - { - int remain = len - bl.length(); - bufferptr ptr (remain); - bl.push_back(ptr); - } - - DbTxn *txn = NULL; - if (transactional) - env->txn_begin (NULL, &txn, 0); - - object_inode_key _ikey = new_object_inode_key (oid); - stored_object obj; - Dbt ikey (&_ikey, sizeof_object_inode_key()); - Dbt ival (&obj, sizeof (obj)); - ival.set_flags (DB_DBT_USERMEM); - ival.set_ulen (sizeof(obj)); - - dout(3) << "..get " << _ikey << dendl; - int ret; - if ((ret = db->get (txn, &ikey, &ival, 0)) != 0) - { - if (txn) - txn->abort(); - derr(1) << "get returned " << db_strerror (ret) << dendl; - return -ENOENT; - } - - dout(3) << "..object has size " << obj.length << dendl; - - if (offset == 0 && len >= obj.length) - { - len = obj.length; - dout(3) << "..doing full read of " << len << dendl; - oid_t id; - mkoid (id, oid); - Dbt key (&id, sizeof (oid_t)); - Dbt value (bl.c_str(), len); - value.set_ulen (len); - value.set_flags (DB_DBT_USERMEM); - dout(3) << "..getting " << oid << dendl; - if ((ret = db->get (txn, &key, &value, 0)) != 0) - { - derr(1) << ".get returned " << db_strerror (ret) << dendl; - if (txn) - txn->abort(); - return -EIO; - } - } - else - { - if (offset > obj.length) - { - dout(2) << "..offset out of range" << dendl; - return 0; - } - if (offset + len > obj.length) - len = obj.length - (size_t) offset; - dout(3) << "..doing partial read of " << len << dendl; - oid_t id; - mkoid (id, oid); - Dbt key (&id, sizeof (oid)); - Dbt value; - char *data = bl.c_str(); - dout(3) << ".bufferlist c_str returned " << ((void*) data) << dendl; - value.set_data (data); - value.set_doff ((size_t) offset); - value.set_dlen (len); - value.set_ulen (len); - value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); - dout(3) << "..getting " << oid << dendl; - if ((ret = db->get (txn, &key, &value, 0)) != 0) - { - derr(1) << ".get returned " << db_strerror (ret) << dendl; - if (txn) - txn->abort(); - return -EIO; - } - } - - if (txn) - txn->commit (0); - dout(4) << "..read OK, returning " << len << dendl; - return len; -} - -int OSBDB::write(object_t oid, off_t offset, size_t len, - bufferlist& bl, Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - derr(1) << "not mounted!" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "write " << oid << " " << offset << " " - << len << dendl; - - if (offset > 0xFFFFFFFFL || offset + len > 0xFFFFFFFFL) - { - derr(1) << "object too big" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -ENOSPC; - } - - DbTxn *txn = NULL; - if (transactional) - env->txn_begin (txn, &txn, 0); - - object_inode_key _ikey = new_object_inode_key (oid); - stored_object obj; - Dbt ikey (&_ikey, sizeof_object_inode_key()); - Dbt ival (&obj, sizeof (obj)); - ival.set_ulen (sizeof (obj)); - ival.set_flags (DB_DBT_USERMEM); - - int ret; - dout(3) << "..getting " << _ikey << dendl; - if (db->get (txn, &ikey, &ival, 0) != 0) - { - dout(3) << "..writing new object" << dendl; - - // New object. - obj.length = (size_t) offset + len; - dout(3) << "..mapping " << _ikey << " => " - << obj << dendl; - if ((ret = db->put (txn, &ikey, &ival, 0)) != 0) - { - derr(1) << "..put returned " << db_strerror (ret) << dendl; - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - oid_t id; - mkoid (id, oid); - Dbt key (&id, sizeof (oid_t)); - Dbt value (bl.c_str(), len); - if (offset == 0) // whole object - { - value.set_flags (DB_DBT_USERMEM); - value.set_ulen (len); - } - else - { - value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); - value.set_ulen (len); - value.set_doff ((size_t) offset); - value.set_dlen (len); - } - dout(3) << "..mapping " << oid << " => (" - << obj.length << " bytes)" << dendl; - if ((ret = db->put (txn, &key, &value, 0)) != 0) - { - derr(1) << "..put returned " << db_strerror (ret) << dendl; - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - - dout(4) << "..write OK, returning " << len << dendl; - return len; - } - - if (offset == 0 && len >= obj.length) - { - if (len != obj.length) - { - obj.length = len; - if ((ret = db->put (txn, &ikey, &ival, 0)) != 0) - { - derr(1) << " put returned " << db_strerror (ret) << dendl; - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - } - oid_t id; - mkoid(id, oid); - Dbt key (&id, sizeof (oid_t)); - Dbt value (bl.c_str(), len); - if (db->put (txn, &key, &value, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << "..writing object failed!" << dendl; - return -EIO; - } - } - else - { - if (offset + len > obj.length) - { - obj.length = (size_t) offset + len; - if (db->put (txn, &ikey, &ival, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << "..writing object info failed!" << dendl; - return -EIO; - } - } - oid_t id; - mkoid(id, oid); - Dbt key (&id, sizeof (oid_t)); - Dbt value (bl.c_str(), len); - value.set_doff ((size_t) offset); - value.set_dlen (len); - value.set_ulen (len); - value.set_flags (DB_DBT_PARTIAL); - if (db->put (txn, &key, &value, 0) != 0) - { - if (txn) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << "..writing object failed!" << dendl; - return -EIO; - } - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - - dout(4) << "..write OK, returning " << len << dendl; - return len; -} - -int OSBDB::clone(object_t oid, object_t noid) -{ - if (!mounted) - { - derr(1) << "not mounted!" << dendl; - return -EINVAL; - } - - dout(2) << "clone " << oid << ", " << noid << dendl; - - if (exists (noid)) - { - dout(4) << "..target exists; returning -EEXIST" << dendl; - return -EEXIST; - } - - DbTxn *txn = NULL; - if (transactional) - env->txn_begin (NULL, &txn, 0); - - object_inode_key _ikey = new_object_inode_key (oid); - object_inode_key _nikey = new_object_inode_key (noid); - stored_object obj; - Dbt ikey (&_ikey, sizeof_object_inode_key()); - Dbt ival (&obj, sizeof (obj)); - Dbt nikey (&_nikey, sizeof_object_inode_key()); - ival.set_ulen (sizeof (obj)); - ival.set_flags (DB_DBT_USERMEM); - - oid_t id, nid; - mkoid(id, oid); - mkoid(nid, noid); - Dbt key (&id, sizeof (oid_t)); - Dbt nkey (&oid, sizeof (oid_t)); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - - if (db->get (txn, &ikey, &ival, 0) != 0) - { - if (txn) - txn->abort(); - derr(1) << "..getting object info failed!" << dendl; - return -ENOENT; - } - if (db->get (txn, &key, &value, 0) != 0) - { - if (txn) - txn->abort(); - derr(1) << "..getting original object failed" << dendl; - return -ENOENT; - } - auto_ptr valueptr ((char *) value.get_data()); - - if (db->put (txn, &nikey, &ival, 0) != 0) - { - if (txn) - txn->abort(); - derr(1) << "..putting object info failed" << dendl; - return -EIO; - } - if (db->put (txn, &nkey, &value, 0) != 0) - { - if (txn) - txn->abort(); - derr(1) << "..putting new object failed" << dendl; - return -EIO; - } - - if (txn) - txn->commit (0); - - dout(4) << "..clone OK" << dendl; - return 0; -} - - // Collections - -int OSBDB::list_collections(list& ls) -{ - if (!mounted) - { - derr(1) << "not mounted!" << dendl; - return -EINVAL; - } - - dout(2) << "list_collections" << dendl; - - Dbt key (COLLECTIONS_KEY, 1); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - - if (db->get (NULL, &key, &value, 0) != 0) - { - dout(4) << "..no collections" << dendl; - return 0; // no collections. - } - - auto_ptr sc ((stored_colls *) value.get_data()); - stored_colls *scp = sc.get(); - for (uint32_t i = 0; i < sc->count; i++) - ls.push_back (scp->colls[i]); - - dout(4) << "..list_collections returns " << scp->count << dendl; - return scp->count; -} - -int OSBDB::create_collection(coll_t c, Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - derr(1) << "not mounted" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "create_collection " << hex << c << dec << dendl; - - Dbt key (COLLECTIONS_KEY, 1); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - - DbTxn *txn = NULL; - if (transactional) - env->txn_begin (NULL, &txn, 0); - - stored_colls *scp = NULL; - size_t sz = 0; - bool created = false; - if (db->get (txn, &key, &value, 0) != 0) - { - sz = sizeof (stored_colls) + sizeof (coll_t); - scp = (stored_colls *) malloc (sz); - scp->count = 0; - created = true; - } - else - { - scp = (stored_colls *) value.get_data(); - sz = value.get_size(); - } - - auto_ptr sc (scp); - int ins = 0; - if (scp->count > 0) - ins = binary_search (scp->colls, scp->count, c); - if (ins < scp->count && scp->colls[ins] == c) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".collection " << c << " already exists " << dendl; - return -EEXIST; - } - - dout(3) << "..insertion point: " << ins << dendl; - - // Make room for a new collection ID. - if (!created) - { - sz += sizeof (coll_t); - dout(3) << "..increase size to " << sz << dendl; - stored_colls *scp2 = (stored_colls *) realloc (scp, sz); - sc.release (); - sc.reset (scp2); - scp = scp2; - } - - int n = (scp->count - ins) * sizeof (coll_t); - if (n > 0) - { - dout(3) << "..moving " << n << " bytes up" << dendl; - memmove (&scp->colls[ins + 1], &scp->colls[ins], n); - } - scp->count++; - scp->colls[ins] = c; - - dout(3) << "..collections: " << scp << dendl; - - // Put the modified collection list back. - { - Dbt value2 (scp, sz); - if (db->put (txn, &key, &value2, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".writing new collections list failed" << dendl; - return -EIO; - } - } - - // Create the new collection. - { - stored_coll new_coll; - new_coll.count = 0; - Dbt coll_key (&c, sizeof (coll_t)); - Dbt coll_value (&new_coll, sizeof (stored_coll)); - if (db->put (txn, &coll_key, &coll_value, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".writing new collection failed" << dendl; - return -EIO; - } - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - - dout(4) << "..create_collection OK" << dendl; - return 0; -} - -int OSBDB::destroy_collection(coll_t c, Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - derr(1) << "not mounted" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "destroy_collection " << hex << c << dec << dendl; - - Dbt key (COLLECTIONS_KEY, 1); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - DbTxn *txn = NULL; - - if (transactional) - env->txn_begin (NULL, &txn, 0); - - if (db->get (txn, &key, &value, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".collection list doesn't exist" << dendl; - return -ENOENT; // XXX - } - - stored_colls *scp = (stored_colls *) value.get_data(); - auto_ptr valueBuf (scp); - if (scp->count == 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".collection " << c << " not listed" << dendl; - return -ENOENT; - } - uint32_t ins = binary_search (scp->colls, scp->count, c); - dout(4) << "..insertion point is " << ins << dendl; - if (ins >= scp->count || scp->colls[ins] != c) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".collection " << c << " not listed" << dendl; - return -ENOENT; - } - - dout(4) << "..collections list is " << scp << dendl; - - // Move the rest of the list down in memory, if needed. - if (ins < scp->count) - { - size_t n = scp->count - ins - 1; - dout(4) << "..shift list down " << n << dendl; - memmove (&scp->colls[ins], &scp->colls[ins + 1], n); - } - - dout(4) << "..collections list is " << scp << dendl; - - // Modify the record size to be one less. - Dbt nvalue (scp, value.get_size() - sizeof (coll_t)); - nvalue.set_flags (DB_DBT_USERMEM); - if (db->put (txn, &key, &nvalue, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".putting modified collection list failed" << dendl; - return -EIO; - } - - // Delete the collection. - Dbt collKey (&c, sizeof (coll_t)); - if (db->del (txn, &collKey, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".deleting collection failed" << dendl; - return -EIO; - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - dout(4) << "..destroy_collection OK" << dendl; - return 0; -} - -bool OSBDB::collection_exists(coll_t c) -{ - if (!mounted) - { - derr(1) << "not mounted" << dendl; - return -EINVAL; - } - - dout(2) << "collection_exists " << hex << c << dec << dendl; - - /*Dbt key (COLLECTIONS_KEY, 1); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - - if (db->get (NULL, &key, &value, 0) != 0) - { - dout(4) << "..no collection list; return false" << dendl; - return false; - } - - stored_colls *scp = (stored_colls *) value.get_data(); - auto_ptr sc (scp); - dout(5) << "..collection list is " << scp << dendl; - if (scp->count == 0) - { - dout(4) << "..empty collection list; return false" << dendl; - return false; - } - uint32_t ins = binary_search (scp->colls, scp->count, c); - dout(4) << "..insertion point is " << ins << dendl; - - int ret = (scp->colls[ins] == c); - dout(4) << "..returns " << ret << dendl; - return ret;*/ - - Dbt key (&c, sizeof (coll_t)); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - if (db->get (NULL, &key, &value, 0) != 0) - { - dout(4) << "..no collection, return false" << dendl; - return false; - } - void *val = value.get_data(); - free (val); - dout(4) << "..collection exists; return true" << dendl; - return true; -} - -int OSBDB::collection_stat(coll_t c, struct stat *st) -{ - if (!mounted) - { - derr(1) << "not mounted" << dendl; - return -EINVAL; - } - - dout(2) << "collection_stat " << c << dendl; - // XXX is this needed? - return -ENOSYS; -} - -int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - dout(2) << "not mounted" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "collection_add " << hex << c << dec << " " << o << dendl; - - Dbt key (&c, sizeof (coll_t)); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - DbTxn *txn = NULL; - - if (transactional) - env->txn_begin (NULL, &txn, 0); - - if (db->get (txn, &key, &value, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << "failed to find collection" << dendl; - return -ENOENT; - } - - size_t sz = value.get_size(); - stored_coll *scp = (stored_coll *) value.get_data(); - auto_ptr sc (scp); - - // Find the insertion point for the new object ID. - uint32_t ins = 0; - if (scp->count > 0) - { - ins = binary_search (scp->objects, scp->count, o); - // Already there? - if (ins < scp->count && scp->objects[ins] == o) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << "collection already has object" << dendl; - return -EEXIST; - } - } - - // Make room for the new value, and add it. - sz += sizeof (object_t); - scp = (stored_coll *) realloc (scp, sz); - sc.release(); - sc.reset (scp); - dout(3) << "..current collection: " << scp << dendl; - if (ins < scp->count - 1) - { - size_t n = (scp->count - ins) * sizeof (object_t); - dout(3) << "..move up " << n << " bytes" << dendl; - memmove (&scp->objects[ins + 1], &scp->objects[ins], n); - } - scp->count++; - scp->objects[ins] = o; - - dout(3) << "..collection: " << scp << dendl; - - Dbt nvalue (scp, sz); - if (db->put (txn, &key, &nvalue, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << "..putting modified collection failed" << dendl; - return -EIO; - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - dout(4) << "..collection add OK" << dendl; - return 0; -} - -int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - derr(1) << "not mounted" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "collection_remove " << hex << c << dec << " " << o << dendl; - - Dbt key (&c, sizeof (coll_t)); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - DbTxn *txn = NULL; - - if (transactional) - env->txn_begin (NULL, &txn, 0); - - if (db->get (txn, &key, &value, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - dout(1) << "..collection doesn't exist" << dendl; - return -ENOENT; - } - - stored_coll *scp = (stored_coll *) value.get_data(); - auto_ptr sc (scp); - - dout(5) << "..collection is " << scp << dendl; - if (scp->count == 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - dout(1) << "..collection is empty" << dendl; - return -ENOENT; - } - uint32_t ins = binary_search (scp->objects, scp->count, o); - dout(4) << "..insertion point is " << ins << dendl; - if (ins >= scp->count || scp->objects[ins] != o) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - dout(1) << "..object not in collection" << dendl; - return -ENOENT; - } - - if (ins < scp->count - 1) - { - size_t n = (scp->count - ins - 1) * sizeof (object_t); - dout(5) << "..moving " << n << " bytes down" << dendl; - memmove (&scp->objects[ins], &scp->objects[ins + 1], n); - } - scp->count--; - - dout(3) << "..collection " << scp << dendl; - - Dbt nval (scp, value.get_size() - sizeof (object_t)); - if (db->put (txn, &key, &nval, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << "..putting modified collection failed" << dendl; - return -EIO; - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - dout(4) << "..collection remove OK" << dendl; - return 0; -} - -int OSBDB::collection_list(coll_t c, list& o) -{ - if (!mounted) - { - derr(1) << "not mounted" << dendl; - return -EINVAL; - } - - Dbt key (&c, sizeof (coll_t)); - Dbt value; - DbTxn *txn = NULL; - - if (transactional) - env->txn_begin (NULL, &txn, 0); - - if (db->get (txn, &key, &value, 0) != 0) - { - if (txn != NULL) - txn->abort(); - return -ENOENT; - } - - stored_coll *scp = (stored_coll *) value.get_data(); - auto_ptr sc (scp); - for (uint32_t i = 0; i < scp->count; i++) - o.push_back (scp->objects[i]); - - if (txn != NULL) - txn->commit (0); - return 0; -} - - // Attributes - -int OSBDB::_setattr(object_t oid, const char *name, - const void *value, size_t size, Context *onsafe, - DbTxn *txn) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - if (strlen (name) >= OSBDB_MAX_ATTR_LEN) - { - derr(1) << "name too long: " << name << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -ENAMETOOLONG; - } - - scoped_lock __lock(&lock); - - // Add name to attribute list, if needed. - attrs_id aids = new_attrs_id (oid); - Dbt attrs_key (&aids, sizeof_attrs_id()); - Dbt attrs_val; - attrs_val.set_flags (DB_DBT_MALLOC); - stored_attrs *sap = NULL; - size_t sz = 0; - - dout(3) << " getting " << aids << dendl; - if (db->get (txn, &attrs_key, &attrs_val, 0) != 0) - { - dout(2) << " first attribute" << dendl; - sz = sizeof (stored_attrs); - sap = (stored_attrs *) malloc(sz); - sap->count = 0; - } - else - { - sz = attrs_val.get_size(); - sap = (stored_attrs *) attrs_val.get_data(); - dout(2) << "..add to list of " << sap->count << " attrs" << dendl; - } - auto_ptr sa (sap); - - attr_name _name; - strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN); - - int ins = 0; - if (sap->count > 0) - ins = binary_search (sap->names, sap->count, _name); - dout(3) << "..insertion point is " << ins << dendl; - if (sap->count == 0 || - (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0)) - { - sz += sizeof (attr_name); - dout(3) << "..realloc " << ((void *) sap) << " to " - << dec << sz << dendl; - sap = (stored_attrs *) realloc (sap, sz); - dout(3) << "..returns " << ((void *) sap) << dendl; - sa.release (); - sa.reset (sap); - int n = (sap->count - ins) * sizeof (attr_name); - if (n > 0) - { - dout(3) << "..move " << n << " bytes from 0x" - << hex << (&sap->names[ins]) << " to 0x" - << hex << (&sap->names[ins+1]) << dec << dendl; - memmove (&sap->names[ins+1], &sap->names[ins], n); - } - memset (&sap->names[ins], 0, sizeof (attr_name)); - strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN); - sap->count++; - - Dbt newAttrs_val (sap, sz); - newAttrs_val.set_ulen (sz); - newAttrs_val.set_flags (DB_DBT_USERMEM); - dout(3) << "..putting " << aids << dendl; - if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0) - { - derr(1) << ".writing attributes list failed" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - } - else - { - dout(3) << "..attribute " << name << " already exists" << dendl; - } - - dout(5) << "..attributes list: " << sap << dendl; - - // Add the attribute. - attr_id aid = new_attr_id (oid, name); - Dbt attr_key (&aid, sizeof (aid)); - Dbt attr_val ((void *) value, size); - dout(3) << "..writing attribute key " << aid << dendl; - if (db->put (txn, &attr_key, &attr_val, 0) != 0) - { - derr(1) << ".writing attribute key failed" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - dout(4) << "..setattr OK" << dendl; - if (onsafe != NULL) - COMMIT(onsafe); - return 0; -} - -int OSBDB::setattr(object_t oid, const char *name, - const void *value, size_t size, - Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - DbTxn *txn = NULL; - if (transactional) - env->txn_begin (NULL, &txn, 0); - - dout(2) << "setattr " << oid << ":" << name << " => (" - << size << " bytes)" << dendl; - int ret = _setattr (oid, name, value, size, onsafe, txn); - if (ret == 0) - { - if (txn != NULL) - txn->commit (0); - } - else - { - if (txn != NULL) - txn->abort(); - } - return ret; -} - -int OSBDB::setattrs(object_t oid, map& aset, - Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - DbTxn *txn = NULL; - - if (transactional) - env->txn_begin (NULL, &txn, 0); - - map::iterator it; - for (it = aset.begin(); it != aset.end(); it++) - { - string name = it->first; - bufferptr value = it->second; - int ret = _setattr (oid, name.c_str(), value.c_str(), - value.length(), onsafe, txn); - if (ret != 0) - { - if (txn != NULL) - txn->abort(); - return ret; - } - } - - if (txn != NULL) - txn->commit (0); - return 0; -} - -int OSBDB::_getattr (object_t oid, const char *name, void *value, size_t size) -{ - if (!mounted) - return -EINVAL; - - dout(2) << "_getattr " << oid << " " << name << " " << size << dendl; - - attr_id aid = new_attr_id (oid, name); - Dbt key (&aid, sizeof (aid)); - Dbt val (value, size); - val.set_ulen (size); - val.set_doff (0); - val.set_dlen (size); - val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); - - int ret; - if ((ret = db->get (NULL, &key, &val, 0)) != 0) - { - derr(1) << ".getting value failed: " << db_strerror (ret) << dendl; - return -ENOENT; - } - - dout(4) << ".._getattr OK; returns " << val.get_size() << dendl; - return val.get_size(); -} - -int OSBDB::getattr(object_t oid, const char *name, void *value, size_t size) -{ - if (!mounted) - return -EINVAL; - - return _getattr (oid, name, value, size); -} - -int OSBDB::getattrs(object_t oid, map& aset) -{ - if (!mounted) - return -EINVAL; - - for (map::iterator it = aset.begin(); - it != aset.end(); it++) - { - int ret = _getattr (oid, (*it).first.c_str(), - (*it).second.c_str(), - (*it).second.length()); - if (ret < 0) - return ret; - } - return 0; -} - -int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "rmattr " << oid << " " << name << dendl; - - attrs_id aids = new_attrs_id (oid); - Dbt askey (&aids, sizeof_attrs_id()); - Dbt asvalue; - asvalue.set_flags (DB_DBT_MALLOC); - - DbTxn *txn = NULL; - - if (transactional) - env->txn_begin (NULL, &txn, 0); - - if (db->get (txn, &askey, &asvalue, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -ENOENT; - } - - stored_attrs *sap = (stored_attrs *) asvalue.get_data(); - auto_ptr sa (sap); - - dout(5) << "..attributes list " << sap << dendl; - - if (sap->count == 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".empty attribute list" << dendl; - return -ENOENT; - } - - attr_name _name; - memset(&_name, 0, sizeof (_name)); - strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN); - int ins = binary_search (sap->names, sap->count, _name); - dout(4) << "..insertion point is " << ins << dendl; - if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".attribute not found in list" << dendl; - return -ENOENT; - } - - // Shift the later elements down by one, if needed. - int n = (sap->count - ins) * sizeof (attr_name); - if (n > 0) - { - dout(4) << "..shift down by " << n << dendl; - memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n); - } - sap->count--; - dout(5) << "..attributes list now " << sap << dendl; - - asvalue.set_size(asvalue.get_size() - sizeof (attr_name)); - int ret; - if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0) - { - derr(1) << "put stored_attrs " << db_strerror (ret) << dendl; - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - // Remove the attribute. - attr_id aid = new_attr_id (oid, name); - Dbt key (&aid, sizeof (aid)); - if ((ret = db->del (txn, &key, 0)) != 0) - { - derr(1) << "deleting " << aid << ": " << db_strerror(ret) << dendl; - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - dout(4) << "..rmattr OK" << dendl; - return 0; -} - -int OSBDB::listattr(object_t oid, char *attrs, size_t size) -{ - if (!mounted) - return -EINVAL; - - dout(2) << "listattr " << oid << dendl; - - attrs_id aids = new_attrs_id (oid); - Dbt key (&aids, sizeof_attrs_id()); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - - // XXX Transactions for read atomicity??? - - int ret; - if ((ret = db->get (NULL, &key, &value, 0)) != 0) - { - derr(1) << "fetching " << aids << ": " << db_strerror (ret) - << dendl; - return -ENOENT; - } - - stored_attrs *attrsp = (stored_attrs *) value.get_data(); - auto_ptr _attrs (attrsp); - size_t s = 0; - char *p = attrs; - for (unsigned i = 0; i < attrsp->count && s < size; i++) - { - int n = MIN (OSBDB_MAX_ATTR_LEN, - MIN (strlen (attrsp->names[i].name), size - s - 1)); - strncpy (p, attrsp->names[i].name, n); - p[n] = '\0'; - p = p + n + 1; - } - - dout(4) << "listattr OK" << dendl; - return 0; -} - - // Collection attributes. - -int OSBDB::collection_setattr(coll_t cid, const char *name, - const void *value, size_t size, - Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "collection_setattr " << hex << cid << dec << " " << name - << " (" << size << " bytes)" << dendl; - if (strlen (name) >= OSBDB_MAX_ATTR_LEN) - { - derr(1) << "name too long" << dendl; - if (onsafe != NULL) - CLEANUP(onsafe); - return -ENAMETOOLONG; - } - - // Add name to attribute list, if needed. - coll_attrs_id aids = new_coll_attrs_id (cid); - Dbt attrs_key (&aids, sizeof_coll_attrs_id()); - Dbt attrs_val; - attrs_val.set_flags (DB_DBT_MALLOC); - stored_attrs *sap = NULL; - size_t sz = 0; - - DbTxn *txn = NULL; - if (transactional) - env->txn_begin (NULL, &txn, 0); - - dout(3) << " getting " << aids << dendl; - if (db->get (txn, &attrs_key, &attrs_val, 0) != 0) - { - dout(2) << " first attribute" << dendl; - sz = sizeof (stored_attrs); - sap = (stored_attrs *) malloc(sz); - sap->count = 0; - } - else - { - sz = attrs_val.get_size(); - sap = (stored_attrs *) attrs_val.get_data(); - dout(2) << " add to list of " << sap->count << " attrs" << dendl; - } - auto_ptr sa (sap); - - attr_name _name; - strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN); - - int ins = 0; - if (sap->count > 0) - ins = binary_search (sap->names, sap->count, _name); - dout(3) << " insertion point is " << ins << dendl; - if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0) - { - sz += sizeof (attr_name); - dout(3) << " realloc " << hex << ((void *) sap) << " to " - << dec << sz << dendl; - sap = (stored_attrs *) realloc (sap, sz); - dout(3) << " returns " << hex << ((void *) sap) << dec << dendl; - sa.release (); - sa.reset (sap); - int n = (sap->count - ins) * sizeof (attr_name); - if (n > 0) - { - dout(3) << " move " << n << " bytes from 0x" - << hex << (&sap->names[ins]) << " to 0x" - << hex << (&sap->names[ins+1]) << dec << dendl; - memmove (&sap->names[ins+1], &sap->names[ins], n); - } - memset (&sap->names[ins], 0, sizeof (attr_name)); - strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN); - sap->count++; - - Dbt newAttrs_val (sap, sz); - newAttrs_val.set_ulen (sz); - newAttrs_val.set_flags (DB_DBT_USERMEM); - dout(3) << " putting " << aids << dendl; - if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".putting new attributes failed" << dendl; - return -EIO; - } - } - else - { - dout(3) << "..attribute " << name << " already exists" << dendl; - } - - dout(3) << "..attributes list: " << sap << dendl; - - // Add the attribute. - coll_attr_id aid = new_coll_attr_id (cid, name); - Dbt attr_key (&aid, sizeof (aid)); - Dbt attr_val ((void *) value, size); - dout(3) << " writing attribute key " << aid << dendl; - if (db->put (txn, &attr_key, &attr_val, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".putting attribute failed" << dendl; - return -EIO; - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - - dout(4) << "..collection setattr OK" << dendl; - return 0; -} - -int OSBDB::collection_rmattr(coll_t cid, const char *name, - Context *onsafe) -{ - dout(6) << "Context " << hex << onsafe << dec << dendl; - if (!mounted) - { - if (onsafe != NULL) - CLEANUP(onsafe); - return -EINVAL; - } - - scoped_lock __lock(&lock); - dout(2) << "collection_rmattr " << hex << cid << dec - << " " << name << dendl; - - coll_attrs_id aids = new_coll_attrs_id (cid); - Dbt askey (&aids, sizeof_coll_attrs_id()); - Dbt asvalue; - asvalue.set_flags (DB_DBT_MALLOC); - - DbTxn *txn = NULL; - if (transactional) - env->txn_begin (NULL, &txn, 0); - - if (db->get (txn, &askey, &asvalue, 0) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".no attributes list" << dendl; - return -ENOENT; - } - - stored_attrs *sap = (stored_attrs *) asvalue.get_data(); - auto_ptr sa (sap); - - dout(5) << "..attributes list " << sap << dendl; - if (sap->count == 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".empty attributes list" << dendl; - return -ENOENT; - } - - attr_name _name; - memset(&_name, 0, sizeof (_name)); - strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN); - int ins = binary_search (sap->names, sap->count, _name); - if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0) - { - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - derr(1) << ".attribute not listed" << dendl; - return -ENOENT; - } - - // Shift the later elements down by one, if needed. - int n = (sap->count - ins) * sizeof (attr_name); - if (n > 0) - { - dout(4) << "..shift down by " << n << dendl; - memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n); - } - sap->count--; - dout(5) << "..attributes list now " << sap << dendl; - - asvalue.set_size(asvalue.get_size() - sizeof (attr_name)); - int ret; - if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0) - { - derr(1) << "put stored_attrs " << db_strerror (ret) << dendl; - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - // Remove the attribute. - coll_attr_id aid = new_coll_attr_id (cid, name); - Dbt key (&aid, sizeof (aid)); - if ((ret = db->del (txn, &key, 0)) != 0) - { - derr(1) << "deleting " << aid << ": " << db_strerror(ret) << dendl; - if (txn != NULL) - txn->abort(); - if (onsafe != NULL) - CLEANUP(onsafe); - return -EIO; - } - - if (txn != NULL) - txn->commit (0); - if (onsafe != NULL) - COMMIT(onsafe); - - dout(4) << "..collection rmattr OK" << dendl; - return 0; -} - -int OSBDB::collection_getattr(coll_t cid, const char *name, - void *value, size_t size) -{ - if (!mounted) - return -EINVAL; - - dout(2) << "collection_getattr " << hex << cid << dec - << " " << name << dendl; - - // XXX transactions/read isolation? - - coll_attr_id caid = new_coll_attr_id (cid, name); - Dbt key (&caid, sizeof (caid)); - Dbt val (value, size); - val.set_ulen (size); - val.set_dlen (size); - val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); - - if (db->get (NULL, &key, &val, 0) != 0) - { - derr(1) << ".no attribute entry" << dendl; - return -ENOENT; - } - - dout(4) << "..collection getattr OK; returns " << val.get_size() << dendl; - return val.get_size(); -} - -int OSBDB::collection_listattr(coll_t cid, char *attrs, size_t size) -{ - if (!mounted) - return -EINVAL; - - dout(2) << "collection_listattr " << hex << cid << dec << dendl; - - // XXX transactions/read isolation? - - coll_attrs_id caids = new_coll_attrs_id (cid); - Dbt key (&caids, sizeof_coll_attrs_id()); - Dbt value; - value.set_flags (DB_DBT_MALLOC); - - int ret; - if ((ret = db->get (NULL, &key, &value, 0)) != 0) - { - derr(1) << "fetching " << caids << ": " << db_strerror (ret) - << dendl; - return -ENOENT; - } - - stored_attrs *attrsp = (stored_attrs *) value.get_data(); - auto_ptr _attrs (attrsp); - size_t s = 0; - char *p = attrs; - for (unsigned i = 0; i < attrsp->count && s < size; i++) - { - int n = MIN (OSBDB_MAX_ATTR_LEN, - MIN (strlen (attrsp->names[i].name), size - s - 1)); - strncpy (p, attrsp->names[i].name, n); - p[n] = '\0'; - p = p + n + 1; - } - return 0; -} - - // Sync. - -void OSBDB::sync (Context *onsync) -{ - if (!mounted) - return; - - sync(); - - if (onsync != NULL) - { - g_timer.add_event_after(0.1, onsync); - } -} - -void OSBDB::sync() -{ - if (!mounted) - return; - - if (transactional) - { - env->log_flush (NULL); - env->lsn_reset (device.c_str(), 0); - } - db->sync(0); -} diff --git a/src/osbdb/OSBDB.h b/src/osbdb/OSBDB.h deleted file mode 100644 index 8eb2004d3903f..0000000000000 --- a/src/osbdb/OSBDB.h +++ /dev/null @@ -1,482 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* OSBDB.h -- ObjectStore on Berkeley DB. -*- c++ -*- - Copyright (C) 2007 Casey Marshall - -Ceph - scalable distributed file system - -This is free software; you can redistribute it and/or -modify it under the terms of the GNU Lesser General Public -License version 2.1, as published by the Free Software -Foundation. See file COPYING. */ - - -#include -#include "osd/ObjectStore.h" - -#define OSBDB_MAGIC 0x05BDB - -/* - * Maximum length of an attribute name. - */ -#define OSBDB_MAX_ATTR_LEN 256 - -#define OSBDB_THIS_VERSION 1 - -#define OSBDB_SUPERBLOCK_KEY ((void *) "s") - -/* - * The "superblock" of the BDB object store. We store one of these in - * the DB, to store version and other information. We don't record - * anything special here, just the version number the database was - * written with. - * - * In principle, this structure is variable-length, depending on the - * software version writing the superblock. - */ -struct stored_superblock -{ - uint32_t version; -}; - -inline ostream& operator<<(ostream& out, const stored_superblock sb) -{ - out << "osbdb.super(" << sb.version << ")" << endl; - return out; -} - -/** - * An object identifier; we define this so we can have a POD object to - * work with. - */ -struct oid_t // POD -{ - char id[16]; -}; - -inline void mkoid (oid_t& id, object_t& oid) -{ - // XXX byte order? - memcpy (id.id, &oid, sizeof (oid_t)); -} - -inline ostream& operator<<(ostream& out, const oid_t id) -{ - for (int i = 0; i < 16; i++) - { - out.fill('0'); - out << setw(2) << hex << (id.id[i] & 0xFF); - if ((i & 3) == 3) - out << ':'; - } - out.unsetf(ios::right); - out << dec; - return out; -} - -/** - * An "inode" key. We map a 'stored_object' struct to this key for - * every object. - */ -struct object_inode_key // POD -{ - oid_t oid; - char tag; -}; - -/** - * "Constructor" for an object_inode_key. - */ -inline object_inode_key new_object_inode_key (object_t& oid) -{ - object_inode_key key; - memset(&key, 0, sizeof (object_inode_key)); - mkoid (key.oid, oid); - key.tag = 'i'; - return key; -} - -/* - * We use this, instead of sizeof(), to try and guarantee that we - * don't include the structure padding, if any. - * - * This *should* return 17: sizeof (oid_t) == 16; sizeof (char) == 1. - */ -inline size_t sizeof_object_inode_key() -{ - return offsetof(object_inode_key, tag) + sizeof (char); -} - - // Frank Poole: Unfortunately, that sounds a little - // like famous last words. - // -- 2001: A Space Odyssey - -inline ostream& operator<<(ostream& out, const object_inode_key o) -{ - out << o.tag << "/" << o.oid; - return out; -} - -/** - * A stored object. This is essentially the "inode" of the object, - * containing things like the object's length. The object itself is - * stored as-is, mapped by the 128-bit object ID. - */ -struct stored_object -{ - uint32_t length; -}; - -inline ostream& operator<<(ostream& out, const stored_object s) -{ - out << "inode(l:" << s.length << ")"; - return out; -} - -/* - * Key referencing the list of attribute names for an object. This is - * simply the object's ID, with an additional character 'a' appended. - */ -struct attrs_id // POD -{ - oid_t oid; - char tag; -}; - -/* - * "Construtor" for attrs_id. - */ -inline struct attrs_id new_attrs_id (object_t& oid) -{ - attrs_id aid; - memset (&aid, 0, sizeof (attrs_id)); - mkoid(aid.oid, oid); - aid.tag = 'a'; - return aid; -} - -/* - * See explanation for sizeof_object_inode_id. - */ -inline size_t sizeof_attrs_id() -{ - return offsetof(struct attrs_id, tag) + sizeof (char); -} - -inline ostream& operator<<(ostream& out, const attrs_id id) -{ - out << id.tag << "/" << id.oid; - return out; -} - -/* - * Encapsulation of a single attribute name. - */ -struct attr_name // POD -{ - char name[OSBDB_MAX_ATTR_LEN]; -}; - -inline ostream& operator<<(ostream& out, const attr_name n) -{ - out << n.name; - return out; -} - -inline bool operator<(const attr_name n1, const attr_name n2) -{ - return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) < 0); -} - -inline bool operator>(const attr_name n1, const attr_name n2) -{ - return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) > 0); -} - -inline bool operator==(const attr_name n1, const attr_name n2) -{ - std::cerr << n1.name << " == " << n2.name << "?" << endl; - return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) == 0); -} - -inline bool operator!=(const attr_name n1, const attr_name n2) -{ - return !(n1 == n2); -} - -inline bool operator>=(const attr_name n1, const attr_name n2) -{ - return !(n1 < n2); -} - -inline bool operator<=(const attr_name n1, const attr_name n2) -{ - return !(n1 > n2); -} - -/* - * A list of an object or collection's attribute names. - */ -struct stored_attrs -{ - uint32_t count; - attr_name names[0]; // actually variable-length -}; - -inline ostream& operator<<(ostream& out, const stored_attrs *sa) -{ - out << sa->count << " [ "; - for (unsigned i = 0; i < sa->count; i++) - out << sa->names[i] << (i == sa->count - 1 ? " " : ", "); - out << "]"; - return out; -} - -/* - * An object attribute key. An object attribute is mapped simply by - * the object ID appended with the attribute name. Attribute names - * may not be empty, and must be less than 256 characters, in this - * implementation. - */ -struct attr_id // POD -{ - oid_t oid; - attr_name name; -}; - -inline attr_id new_attr_id (object_t& oid, const char *name) -{ - attr_id aid; - memset(&aid, 0, sizeof (attr_id)); - mkoid (aid.oid, oid); - strncpy (aid.name.name, name, OSBDB_MAX_ATTR_LEN); - return aid; -} - -inline ostream& operator<<(ostream &out, const attr_id id) -{ - out << id.oid << ":" << id.name; - return out; -} - -/* - * A key for a collection attributes list. - */ -struct coll_attrs_id // POD -{ - coll_t cid; - char tag; -}; - -inline coll_attrs_id new_coll_attrs_id (coll_t cid) -{ - coll_attrs_id catts; - memset(&catts, 0, sizeof (coll_attrs_id)); - catts.cid = cid; - catts.tag = 'C'; - return catts; -} - -inline size_t sizeof_coll_attrs_id() -{ - return offsetof(coll_attrs_id, tag) + sizeof (char); -} - -inline ostream& operator<<(ostream& out, coll_attrs_id id) -{ - out << id.tag << "/" << id.cid; - return out; -} - -/* - * A collection attribute key. Similar to - */ -struct coll_attr_id // POD -{ - coll_t cid; - attr_name name; -}; - -inline coll_attr_id new_coll_attr_id (coll_t cid, const char *name) -{ - coll_attr_id catt; - memset(&catt, 0, sizeof (coll_attr_id)); - catt.cid = cid; - strncpy (catt.name.name, name, OSBDB_MAX_ATTR_LEN); - return catt; -} - -inline ostream& operator<<(ostream& out, coll_attr_id id) -{ - out << id.cid << ":" << id.name; - return out; -} - -/* - * This is the key we store the master collections list under. - */ -#define COLLECTIONS_KEY ((void *) "c") - -/* - * The master list of collections. There should be one of these per - * OSD. The sole reason for this structure is to have the ability - * to enumerate all collections stored on this OSD. - */ -struct stored_colls -{ - // The number of collections. - uint32_t count; - - // The collection identifiers. This is a sorted list of coll_t - // values. - coll_t colls[0]; // actually variable-length -}; - -inline ostream& operator<<(ostream& out, stored_colls *c) -{ - out << c->count << " [ "; - for (unsigned i = 0; i < c->count; i++) - { - out << hex << c->colls[i]; - if (i < c->count - 1) - out << ", "; - } - out << " ]" << dec; - return out; -} - -/* - * A stored collection (a bag of object IDs). These are referenced by - * the bare collection identifier type, a coll_t (thus, a 32-bit - * integer). Internally this is stored as a sorted list of object IDs. - * - * Note, this structure places all collection items in a single - * record; this may be a memory burden for large collections. - */ -struct stored_coll -{ - // The size of this collection. - uint32_t count; - - // The object IDs in this collection. This is a sorted list of all - // object ID's in this collection. - object_t objects[0]; // actually variable-length -}; - -inline ostream& operator<<(ostream& out, stored_coll *c) -{ - out << c->count << " [ "; - for (unsigned i = 0; i < c->count; i++) - { - out << c->objects[i]; - if (i < c->count - 1) - out << ", "; - } - out << " ]"; - return out; -} - -class OSBDBException : public std::exception -{ - const char *msg; - -public: - OSBDBException(const char *msg) : msg(msg) { } - const char *what() { return msg; } -}; - -/* - * The object store interface for Berkeley DB. - */ -class OSBDB : public ObjectStore -{ - private: - Mutex lock; - DbEnv *env; - Db *db; - string device; - string env_dir; - bool mounted; - bool opened; - bool transactional; - - public: - - OSBDB(const char *dev) throw(OSBDBException) - : lock(true), env(0), db (0), device (dev), mounted(false), opened(false), - transactional(g_conf.bdbstore_transactional) - { - } - - ~OSBDB() - { - if (mounted) - { - umount(); - } - } - - int mount(); - int umount(); - int mkfs(); - - int statfs(struct statfs *buf); - - int pick_object_revision_lt(object_t& oid); - - bool exists(object_t oid); - int stat(object_t oid, struct stat *st); - - int remove(object_t oid, Context *onsafe=0); - - int truncate(object_t oid, off_t size, Context *onsafe=0); - - int read(object_t oid, off_t offset, size_t len, - bufferlist& bl); - int write(object_t oid, off_t offset, size_t len, - bufferlist& bl, Context *onsafe); - - int setattr(object_t oid, const char *name, - const void *value, size_t size, Context *onsafe=0); - int setattrs(object_t oid, map& aset, - Context *onsafe=0); - int getattr(object_t oid, const char *name, - void *value, size_t size); - int getattrs(object_t oid, map& aset); - int rmattr(object_t oid, const char *name, - Context *onsafe=0); - int listattr(object_t oid, char *attrs, size_t size); - - int clone(object_t oid, object_t noid); - - // Collections. - - int list_collections(list& ls); - int create_collection(coll_t c, Context *onsafe=0); - int destroy_collection(coll_t c, Context *onsafe=0); - bool collection_exists(coll_t c); - int collection_stat(coll_t c, struct stat *st); - int collection_add(coll_t c, object_t o, Context *onsafe=0); - int collection_remove(coll_t c, object_t o, Context *onsafe=0); - int collection_list(coll_t c, list& o); - - int collection_setattr(coll_t cid, const char *name, - const void *value, size_t size, - Context *onsafe=0); - int collection_rmattr(coll_t cid, const char *name, - Context *onsafe=0); - int collection_getattr(coll_t cid, const char *name, - void *value, size_t size); - int collection_listattr(coll_t cid, char *attrs, size_t size); - - void sync(Context *onsync); - void sync(); - -private: - int opendb (DBTYPE type=DB_UNKNOWN, int flags=0, bool new_env=false); - - int _setattr(object_t oid, const char *name, const void *value, - size_t size, Context *onsync, DbTxn *txn); - int _getattr(object_t oid, const char *name, void *value, size_t size); - DbEnv *getenv(); -}; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 5906e8f942122..1079f7d7cf33f 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -23,11 +23,6 @@ #include "common/ceph_argparse.h" #include "os/FileStore.h" -#ifdef USE_OSBDB -#include "osbdb/OSBDB.h" -#endif // USE_OSBDB - - #include "ReplicatedPG.h" //#include "RAID4PG.h" diff --git a/src/test/old/testos.cc b/src/test/old/testos.cc index 24c81590d899c..83641f829d7c0 100644 --- a/src/test/old/testos.cc +++ b/src/test/old/testos.cc @@ -11,7 +11,6 @@ Foundation. See file COPYING. */ #include "osd/ObjectStore.h" #include "ebofs/Ebofs.h" -#include "osbdb/OSBDB.h" #include "include/buffer.h" #include @@ -140,15 +139,6 @@ int main (int argc, char **argv) } os = new Ebofs (osd_file); } - else if (strcasecmp (osd_name, "osbdb") == 0) - { - os = new OSBDB (osd_file); - } - else if (strcasecmp (osd_name, "osbdb-btree") == 0) - { - g_conf.bdbstore_btree = true; - os = new OSBDB (osd_file); - } else { cerr << "I don't know about object store \"" << osd_name << "\"" diff --git a/src/test/old/testosbdb.cc b/src/test/old/testosbdb.cc deleted file mode 100644 index 19268e7587531..0000000000000 --- a/src/test/old/testosbdb.cc +++ /dev/null @@ -1,347 +0,0 @@ -/* testosbdb.cc -- test OSBDB. - Copyright (C) 2007 Casey Marshall */ - - -#include -#include "osbdb/OSBDB.h" - -using namespace std; - -int -main (int argc, char **argv) -{ - vector args; - argv_to_vec (argc, argv, args); - parse_config_options (args); - - g_conf.debug_bdbstore = 10; - //g_conf.bdbstore_btree = true; - char dbfile[256]; - strncpy (dbfile, "/tmp/testosbdb/db.XXXXXX", 256); - mktemp (dbfile); - OSBDB *os = new OSBDB(dbfile); - auto_ptr osPtr (os); - os->mkfs(); - os->mount(); - - // Put an object. - object_t oid (0xDEADBEEF00000000ULL, 0xFEEDFACE); - - cout << "sizeof oid_t is " << sizeof (oid_t) << endl; - cout << "offsetof oid_t.id " << offsetof (oid_t, id) << endl; - - cout << sizeof (object_t) << endl; - cout << sizeof (oid.ino) << endl; - cout << sizeof (oid.bno) << endl; - cout << sizeof (oid.rev) << endl; - - // Shouldn't be there. - if (os->exists (oid)) - { - cout << "FAIL: oid shouldn't be there " << oid << endl; - } - - // Write an object. - char *x = (char *) malloc (1024); - memset(x, 0xaa, 1024); - bufferptr bp (x, 1024); - bufferlist bl; - bl.push_back (bp); - - if (os->write (oid, 0L, 1024, bl, NULL) != 1024) - { - cout << "FAIL: writing object" << endl; - } - - os->sync(); - - // Should be there. - if (!os->exists (oid)) - { - cout << "FAIL: oid should be there: " << oid << endl; - } - - memset(x, 0, 1024); - if (os->read (oid, 0, 1024, bl) != 1024) - { - cout << "FAIL: reading object" << endl; - } - - for (int i = 0; i < 1024; i++) - { - if ((x[i] & 0xFF) != 0xaa) - { - cout << "FAIL: data read out is different" << endl; - break; - } - } - - // Set some attributes - if (os->setattr (oid, "alpha", "value", strlen ("value")) != 0) - { - cout << "FAIL: set attribute" << endl; - } - if (os->setattr (oid, "beta", "value", strlen ("value")) != 0) - { - cout << "FAIL: set attribute" << endl; - } - if (os->setattr (oid, "gamma", "value", strlen ("value")) != 0) - { - cout << "FAIL: set attribute" << endl; - } - if (os->setattr (oid, "fred", "value", strlen ("value")) != 0) - { - cout << "FAIL: set attribute" << endl; - } - - char *attrs = (char *) malloc (1024); - if (os->listattr (oid, attrs, 1024) != 0) - { - cout << "FAIL: listing attributes" << endl; - } - else - { - char *p = attrs; - if (strcmp (p, "alpha") != 0) - { - cout << "FAIL: should be \"alpha:\" \"" << p << "\"" << endl; - } - p = p + strlen (p) + 1; - if (strcmp (p, "beta") != 0) - { - cout << "FAIL: should be \"beta:\" \"" << p << "\"" << endl; - } - p = p + strlen (p) + 1; - if (strcmp (p, "fred") != 0) - { - cout << "FAIL: should be \"fred:\" \"" << p << "\"" << endl; - } - p = p + strlen (p) + 1; - if (strcmp (p, "gamma") != 0) - { - cout << "FAIL: should be \"gamma:\" \"" << p << "\"" << endl; - } - } - - char attrvalue[256]; - memset(attrvalue, 0, sizeof (attrvalue)); - if (os->getattr (oid, "alpha", attrvalue, sizeof(attrvalue)) < 0) - { - cout << "FAIL: getattr alpha" << endl; - } - else if (strncmp ("value", attrvalue, strlen("value")) != 0) - { - cout << "FAIL: read attribute value differs" << endl; - } - memset(attrvalue, 0, sizeof (attrvalue)); - if (os->getattr (oid, "fred", attrvalue, sizeof(attrvalue)) < 0) - { - cout << "FAIL: getattr fred" << endl; - } - else if (strncmp ("value", attrvalue, strlen("value")) != 0) - { - cout << "FAIL: read attribute value differs" << endl; - } - memset(attrvalue, 0, sizeof (attrvalue)); - if (os->getattr (oid, "beta", attrvalue, sizeof(attrvalue)) < 0) - { - cout << "FAIL: getattr beta" << endl; - } - else if (strncmp ("value", attrvalue, strlen("value")) != 0) - { - cout << "FAIL: read attribute value differs" << endl; - } - memset(attrvalue, 0, sizeof (attrvalue)); - if (os->getattr (oid, "gamma", attrvalue, sizeof(attrvalue)) < 0) - { - cout << "FAIL: getattr gamma" << endl; - } - else if (strncmp ("value", attrvalue, strlen("value")) != 0) - { - cout << "FAIL: read attribute value differs" << endl; - } - - if (os->setattr (oid, "alpha", "different", strlen("different")) != 0) - cout << "FAIL: setattr overwrite" << endl; - memset(attrvalue, 0, sizeof (attrvalue)); - if (os->getattr (oid, "alpha", attrvalue, sizeof(attrvalue)) < 0) - { - cout << "FAIL: getattr alpha" << endl; - } - else if (strncmp ("different", attrvalue, strlen("different")) != 0) - { - cout << "FAIL: read attribute value differs" << endl; - } - - if (os->rmattr (oid, "alpha") != 0) - { - cout << "FAIL: rmattr alpha" << endl; - } - if (os->rmattr (oid, "fred") != 0) - { - cout << "FAIL: rmattr fred" << endl; - } - if (os->rmattr (oid, "beta") != 0) - { - cout << "FAIL: rmattr beta" << endl; - } - if (os->rmattr (oid, "gamma") != 0) - { - cout << "FAIL: rmattr gamma" << endl; - } - - coll_t cid = 0xCAFEBABE; - if (os->create_collection (cid) != 0) - { - cout << "FAIL: create_collection" << endl; - } - if (os->create_collection (cid + 10) != 0) - { - cout << "FAIL: create_collection" << endl; - } - if (os->create_collection (cid + 5) != 0) - { - cout << "FAIL: create_collection" << endl; - } - if (os->create_collection (42) != 0) - { - cout << "FAIL: create_collection" << endl; - } - - if (os->collection_add (cid, oid) != 0) - { - cout << "FAIL: collection_add" << endl; - } - - list ls; - if (os->list_collections (ls) < 0) - { - cout << "FAIL: list_collections" << endl; - } - cout << "collections: "; - for (list::iterator it = ls.begin(); it != ls.end(); it++) - { - cout << *it << ", "; - } - cout << endl; - - if (os->destroy_collection (0xCAFEBABE + 10) != 0) - { - cout << "FAIL: destroy_collection" << endl; - } - - if (os->destroy_collection (0xCAFEBADE + 10) == 0) - { - cout << "FAIL: destroy_collection" << endl; - } - - object_t oid2 (12345, 12345); - for (int i = 0; i < 8; i++) - { - oid2.rev++; - if (os->collection_add (cid, oid2) != 0) - { - cout << "FAIL: collection_add" << endl; - } - } - for (int i = 0; i < 8; i++) - { - if (os->collection_remove (cid, oid2) != 0) - { - cout << "FAIL: collection_remove" << endl; - } - oid2.rev--; - } - - if (os->collection_setattr (cid, "alpha", "value", 5) != 0) - cout << "FAIL: collection_setattr" << endl; - if (os->collection_setattr (cid, "beta", "value", 5) != 0) - cout << "FAIL: collection_setattr" << endl; - if (os->collection_setattr (cid, "gamma", "value", 5) != 0) - cout << "FAIL: collection_setattr" << endl; - if (os->collection_setattr (cid, "fred", "value", 5) != 0) - cout << "FAIL: collection_setattr" << endl; - - memset (attrvalue, 0, sizeof (attrvalue)); - if (os->collection_getattr (cid, "alpha", attrvalue, sizeof (attrvalue)) < 0) - cout << "FAIL: collection_getattr" << endl; - else if (strncmp (attrvalue, "value", 5) != 0) - cout << "FAIL: collection attribute value different" << endl; - memset (attrvalue, 0, sizeof (attrvalue)); - if (os->collection_getattr (cid, "beta", attrvalue, sizeof (attrvalue)) < 0) - cout << "FAIL: collection_getattr" << endl; - else if (strncmp (attrvalue, "value", 5) != 0) - cout << "FAIL: collection attribute value different" << endl; - memset (attrvalue, 0, sizeof (attrvalue)); - if (os->collection_getattr (cid, "gamma", attrvalue, sizeof (attrvalue)) < 0) - cout << "FAIL: collection_getattr" << endl; - else if (strncmp (attrvalue, "value", 5) != 0) - cout << "FAIL: collection attribute value different" << endl; - memset (attrvalue, 0, sizeof (attrvalue)); - if (os->collection_getattr (cid, "fred", attrvalue, sizeof (attrvalue)) < 0) - cout << "FAIL: collection_getattr" << endl; - else if (strncmp (attrvalue, "value", 5) != 0) - cout << "FAIL: collection attribute value different" << endl; - - if (os->collection_setattr (cid, "alpha", "eulavvalue", 10) != 0) - cout << "FAIL: collection setattr overwrite" << endl; - memset (attrvalue, 0, sizeof (attrvalue)); - if (os->collection_getattr (cid, "alpha", attrvalue, sizeof (attrvalue)) < 0) - cout << "FAIL: collection_getattr" << endl; - else if (strncmp (attrvalue, "eulavvalue", 10) != 0) - cout << "FAIL: collection attribute value different" << endl; - memset (attrvalue, 0, sizeof (attrvalue)); - if (os->collection_getattr (cid, "beta", attrvalue, sizeof (attrvalue)) < 0) - cout << "FAIL: collection_getattr" << endl; - else if (strncmp (attrvalue, "value", 5) != 0) - cout << "FAIL: collection attribute value different" << endl; - memset (attrvalue, 0, sizeof (attrvalue)); - if (os->collection_getattr (cid, "gamma", attrvalue, sizeof (attrvalue)) < 0) - cout << "FAIL: collection_getattr" << endl; - else if (strncmp (attrvalue, "value", 5) != 0) - cout << "FAIL: collection attribute value different" << endl; - memset (attrvalue, 0, sizeof (attrvalue)); - if (os->collection_getattr (cid, "fred", attrvalue, sizeof (attrvalue)) < 0) - cout << "FAIL: collection_getattr" << endl; - else if (strncmp (attrvalue, "value", 5) != 0) - cout << "FAIL: collection attribute value different" << endl; - - if (os->collection_rmattr (cid, "alpha") != 0) - cout << "FAIL: collection_rmattr" << endl; - if (os->collection_rmattr (cid, "fred") != 0) - cout << "FAIL: collection_rmattr" << endl; - if (os->collection_rmattr (cid, "beta") != 0) - cout << "FAIL: collection_rmattr" << endl; - if (os->collection_rmattr (cid, "gamma") != 0) - cout << "FAIL: collection_rmattr" << endl; - - if (os->collection_rmattr (cid, "alpha") == 0) - cout << "FAIL: collection_rmattr (nonexistent)" << endl; - - // Truncate the object. - if (os->truncate (oid, 512, NULL) != 0) - { - cout << "FAIL: truncate" << endl; - } - - // Expand the object. - if (os->truncate (oid, 1200, NULL) != 0) - { - cout << "FAIL: expand" << endl; - } - - // Delete the object. - if (os->remove (oid) != 0) - { - cout << "FAIL: could not remove object" << endl; - } - - // Shouldn't be there - if (os->exists (oid)) - { - cout << "FAIL: should not be there" << endl; - } - - os->sync(); - exit (0); -} -- 2.39.5