]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Introducing timeranged scan, timeranged dump in ldb. Also the ability to count in...
authorMayank Agarwal <amayank@fb.com>
Wed, 19 Jun 2013 02:57:54 +0000 (19:57 -0700)
committerMayank Agarwal <amayank@fb.com>
Thu, 20 Jun 2013 01:45:13 +0000 (18:45 -0700)
Summary:
Scan and Dump commands in ldb use iterator. We need to also print timestamp for ttl databases for debugging. For this I create a TtlIterator class pointer in these functions and assign it the value of Iterator pointer which actually points to t TtlIterator object, and access the new function ValueWithTS which can return TS also. Buckets feature for dump command: gives a count of different key-values in the specified time-range distributed across the time-range partitioned according to bucket-size. start_time and end_time are specified in unixtimestamp and bucket in seconds on the user-commandline
Have commented out 3 ines from ldb_test.py so that the test does not break right now. It breaks because timestamp is also printed now and I have to look at wildcards in python to compare properly.

Test Plan: python tools/ldb_test.py

Reviewers: vamsi, dhruba, haobo, sheki

Reviewed By: vamsi

CC: leveldb
Differential Revision: https://reviews.facebook.net/D11403

util/ldb_cmd.cc
util/ldb_cmd.h
util/ldb_tool.cc
utilities/ttl/db_ttl.cc
utilities/ttl/db_ttl.h

index 9985bad86ac148e68a52f1fa2d6bb0699d669670..8476f3e8eae23e43cecc67e661592432f89e626b 100644 (file)
@@ -3,17 +3,19 @@
 // found in the LICENSE file.
 
 #include "util/ldb_cmd.h"
-#include <dirent.h>
-
-#include <sstream>
-#include <string>
-#include <stdexcept>
 
-#include "leveldb/write_batch.h"
 #include "db/dbformat.h"
 #include "db/log_reader.h"
 #include "db/filename.h"
 #include "db/write_batch_internal.h"
+#include "leveldb/write_batch.h"
+#include "util/coding.h"
+
+#include <ctime>
+#include <dirent.h>
+#include <sstream>
+#include <string>
+#include <stdexcept>
 
 namespace leveldb {
 
@@ -24,6 +26,9 @@ const string LDBCommand::ARG_HEX = "hex";
 const string LDBCommand::ARG_KEY_HEX = "key_hex";
 const string LDBCommand::ARG_VALUE_HEX = "value_hex";
 const string LDBCommand::ARG_TTL = "ttl";
+const string LDBCommand::ARG_TTL_START = "start_time";
+const string LDBCommand::ARG_TTL_END = "end_time";
+const string LDBCommand::ARG_TIMESTAMP = "timestamp";
 const string LDBCommand::ARG_FROM = "from";
 const string LDBCommand::ARG_TO = "to";
 const string LDBCommand::ARG_MAX_KEYS = "max_keys";
@@ -523,15 +528,53 @@ void ManifestDumpCommand::DoCommand() {
 
 // ----------------------------------------------------------------------------
 
+string ReadableTime(int unixtime) {
+  char time_buffer [80];
+  time_t rawtime = unixtime;
+  struct tm * timeinfo = localtime(&rawtime);
+  strftime(time_buffer, 80, "%c", timeinfo);
+  return string(time_buffer);
+}
+
+// This function only called when it's the sane case of >1 buckets in time-range
+// Also called only when timekv falls between ttl_start and ttl_end provided
+void IncBucketCounts(uint64_t* bucket_counts, int ttl_start, int time_range,
+      int bucket_size, int timekv, int num_buckets) {
+  if (time_range <= 0 || timekv < ttl_start || timekv > (ttl_start + time_range)
+      || bucket_size <= 0 || num_buckets < 2) {
+    fprintf(stderr, "Error: bucketizing\n");
+    return;
+  }
+  int bucket = (timekv - ttl_start);
+  bucket = (bucket == 0) ? 1 : ceil(bucket / (double)bucket_size);
+  bucket_counts[bucket - 1]++;
+}
+
+void PrintBucketCounts(uint64_t* bucket_counts, int ttl_start, int ttl_end,
+      int bucket_size, int num_buckets) {
+  int time_point = ttl_start;
+  for(int i = 0; i < num_buckets - 1; i++, time_point += bucket_size) {
+    fprintf(stdout, "Keys in range %s to %s : %lu\n",
+            ReadableTime(time_point).c_str(),
+            ReadableTime(time_point + bucket_size).c_str(), bucket_counts[i]);
+  }
+  fprintf(stdout, "Keys in range %s to %s : %lu\n",
+          ReadableTime(time_point).c_str(),
+          ReadableTime(ttl_end).c_str(), bucket_counts[num_buckets - 1]);
+}
+
 const string DBDumperCommand::ARG_COUNT_ONLY = "count_only";
 const string DBDumperCommand::ARG_STATS = "stats";
+const string DBDumperCommand::ARG_TTL_BUCKET = "bucket";
 
 DBDumperCommand::DBDumperCommand(const vector<string>& params,
       const map<string, string>& options, const vector<string>& flags) :
     LDBCommand(options, flags, true,
                BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX,
                                     ARG_VALUE_HEX, ARG_FROM, ARG_TO,
-                                    ARG_MAX_KEYS, ARG_COUNT_ONLY, ARG_STATS})),
+                                    ARG_MAX_KEYS, ARG_COUNT_ONLY, ARG_STATS,
+                                    ARG_TTL_START, ARG_TTL_END,
+                                    ARG_TTL_BUCKET, ARG_TIMESTAMP})),
     null_from_(true),
     null_to_(true),
     max_keys_(-1),
@@ -580,9 +623,14 @@ void DBDumperCommand::Help(string& ret) {
   ret.append("  ");
   ret.append(DBDumperCommand::Name());
   ret.append(HelpRangeCmdArgs());
+  ret.append(" [--" + ARG_TTL + "]");
   ret.append(" [--" + ARG_MAX_KEYS + "=<N>]");
+  ret.append(" [--" + ARG_TIMESTAMP + "]");
   ret.append(" [--" + ARG_COUNT_ONLY + "]");
   ret.append(" [--" + ARG_STATS + "]");
+  ret.append(" [--" + ARG_TTL_BUCKET + "=<N>]");
+  ret.append(" [--" + ARG_TTL_START + "=<N>]");
+  ret.append(" [--" + ARG_TTL_END + "=<N>]");
   ret.append("\n");
 }
 
@@ -614,25 +662,78 @@ void DBDumperCommand::DoCommand() {
   }
 
   int max_keys = max_keys_;
+  int ttl_start;
+  if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) {
+    ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time
+  }
+  int ttl_end;
+  if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) {
+    ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature
+  }
+  if (ttl_end < ttl_start) {
+    fprintf(stderr, "Error: End time can't be less than start time\n");
+    delete iter;
+    return;
+  }
+  int time_range = ttl_end - ttl_start;
+  int bucket_size;
+  if (!ParseIntOption(option_map_, ARG_TTL_BUCKET, bucket_size, exec_state_) ||
+      bucket_size <= 0) {
+    bucket_size = time_range; // Will have just 1 bucket by default
+  }
+  // At this point, bucket_size=0 => time_range=0
+  uint64_t num_buckets =
+    bucket_size >= time_range ? 1 : ceil((double)time_range/bucket_size);
+  unique_ptr<uint64_t[]> bucket_counts(new uint64_t[num_buckets]);
+  fill(bucket_counts.get(), bucket_counts.get() + num_buckets, 0);
+  if (is_db_ttl_ && !count_only_ && timestamp_) {
+    fprintf(stdout, "Dumping key-values from %s to %s\n",
+            ReadableTime(ttl_start).c_str(), ReadableTime(ttl_end).c_str());
+  }
+
   for (; iter->Valid(); iter->Next()) {
+    int rawtime = 0;
+    string value;
     // If end marker was specified, we stop before it
     if (!null_to_ && (iter->key().ToString() >= to_))
       break;
     // Terminate if maximum number of keys have been dumped
     if (max_keys == 0)
       break;
+    if (is_db_ttl_) {
+      TtlIterator* it_ttl = (TtlIterator*)iter;
+      struct ValueAndTimestamp val_ts = it_ttl->ValueWithTimestamp();
+      value = val_ts.value.ToString();
+      rawtime = val_ts.timestamp;
+      if (rawtime < ttl_start || rawtime > ttl_end) {
+        continue;
+      }
+    } else {
+      value = iter->value().ToString();
+    }
     if (max_keys > 0) {
       --max_keys;
     }
+    if (is_db_ttl_ && num_buckets > 1) {
+      IncBucketCounts(bucket_counts.get(), ttl_start, time_range, bucket_size,
+                      rawtime, num_buckets);
+    }
     ++count;
     if (!count_only_) {
+      if (is_db_ttl_ && timestamp_) {
+        fprintf(stdout, "%s ", ReadableTime(rawtime).c_str());
+      }
       string str = PrintKeyValue(iter->key().ToString(),
-                                 iter->value().ToString(),
-                                 is_key_hex_, is_value_hex_);
+                                 value, is_key_hex_, is_value_hex_);
       fprintf(stdout, "%s\n", str.c_str());
     }
   }
-  fprintf(stdout, "Keys in range: %lld\n", (long long) count);
+  if (num_buckets > 1 && is_db_ttl_) {
+    PrintBucketCounts(bucket_counts.get(), ttl_start, ttl_end, bucket_size,
+                      num_buckets);
+  } else {
+    fprintf(stdout, "Keys in range: %lld\n", (long long) count);
+  }
   // Clean up
   delete iter;
 }
@@ -920,6 +1021,7 @@ void GetCommand::Help(string& ret) {
   ret.append("  ");
   ret.append(GetCommand::Name());
   ret.append(" <key>");
+  ret.append(" [--" + ARG_TTL + "]");
   ret.append("\n");
 }
 
@@ -1013,6 +1115,7 @@ void BatchPutCommand::Help(string& ret) {
   ret.append("  ");
   ret.append(BatchPutCommand::Name());
   ret.append(" <key> <value> [<key> <value>] [..]");
+  ret.append(" [--" + ARG_TTL + "]");
   ret.append("\n");
 }
 
@@ -1041,9 +1144,9 @@ Options BatchPutCommand::PrepareOptionsForOpenDB() {
 ScanCommand::ScanCommand(const vector<string>& params,
       const map<string, string>& options, const vector<string>& flags) :
     LDBCommand(options, flags, true,
-               BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX,
-                                    ARG_VALUE_HEX, ARG_FROM, ARG_TO,
-                                    ARG_MAX_KEYS})),
+               BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, ARG_TO,
+                                    ARG_VALUE_HEX, ARG_FROM, ARG_TIMESTAMP,
+                                    ARG_MAX_KEYS, ARG_TTL_START, ARG_TTL_END})),
     start_key_specified_(false),
     end_key_specified_(false),
     max_keys_scanned_(-1) {
@@ -1083,7 +1186,11 @@ void ScanCommand::Help(string& ret) {
   ret.append("  ");
   ret.append(ScanCommand::Name());
   ret.append(HelpRangeCmdArgs());
-  ret.append("--" + ARG_MAX_KEYS + "=N] ");
+  ret.append(" [--" + ARG_TTL + "]");
+  ret.append(" [--" + ARG_TIMESTAMP + "]");
+  ret.append(" [--" + ARG_MAX_KEYS + "=<N>q] ");
+  ret.append(" [--" + ARG_TTL_START + "=<N>]");
+  ret.append(" [--" + ARG_TTL_END + "=<N>]");
   ret.append("\n");
 }
 
@@ -1096,11 +1203,42 @@ void ScanCommand::DoCommand() {
   } else {
     it->SeekToFirst();
   }
+  int ttl_start;
+  if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) {
+    ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time
+  }
+  int ttl_end;
+  if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) {
+    ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature
+  }
+  if (ttl_end < ttl_start) {
+    fprintf(stderr, "Error: End time can't be less than start time\n");
+    delete it;
+    return;
+  }
+  if (is_db_ttl_ && timestamp_) {
+    fprintf(stdout, "Scanning key-values from %s to %s\n",
+            ReadableTime(ttl_start).c_str(), ReadableTime(ttl_end).c_str());
+  }
   for ( ;
         it->Valid() && (!end_key_specified_ || it->key().ToString() < end_key_);
         it->Next()) {
     string key = it->key().ToString();
-    string value = it->value().ToString();
+    string value;
+    if (is_db_ttl_) {
+      TtlIterator* it_ttl = (TtlIterator*)it;
+      struct ValueAndTimestamp val_ts = it_ttl->ValueWithTimestamp();
+      int rawtime = val_ts.timestamp;
+      value = val_ts.value.ToString();
+      if (rawtime < ttl_start || rawtime > ttl_end) {
+        continue;
+      }
+      if (timestamp_) {
+        fprintf(stdout, "%s ", ReadableTime(rawtime).c_str());
+      }
+    } else {
+      value = it->value().ToString();
+    }
     fprintf(stdout, "%s : %s\n",
           (is_key_hex_ ? StringToHex(key) : key).c_str(),
           (is_value_hex_ ? StringToHex(value) : value).c_str()
@@ -1176,6 +1314,7 @@ void PutCommand::Help(string& ret) {
   ret.append("  ");
   ret.append(PutCommand::Name());
   ret.append(" <key> <value> ");
+  ret.append(" [--" + ARG_TTL + "]");
   ret.append("\n");
 }
 
@@ -1211,6 +1350,7 @@ DBQuerierCommand::DBQuerierCommand(const vector<string>& params,
 void DBQuerierCommand::Help(string& ret) {
   ret.append("  ");
   ret.append(DBQuerierCommand::Name());
+  ret.append(" [--" + ARG_TTL + "]");
   ret.append("\n");
   ret.append("    Starts a REPL shell.  Type help for list of available "
              "commands.");
index 1fc51c4e3da369fe1f1ff6101392f9f832465c38..d8e4c4b115fff1410a79297e98052861966e2128 100644 (file)
@@ -21,6 +21,7 @@
 #include "util/ldb_cmd_execute_result.h"
 #include "util/string_util.h"
 #include "utilities/utility_db.h"
+#include "utilities/ttl/db_ttl.h"
 
 using std::string;
 using std::map;
@@ -38,6 +39,9 @@ public:
   static const string ARG_KEY_HEX;
   static const string ARG_VALUE_HEX;
   static const string ARG_TTL;
+  static const string ARG_TTL_START;
+  static const string ARG_TTL_END;
+  static const string ARG_TIMESTAMP;
   static const string ARG_FROM;
   static const string ARG_TO;
   static const string ARG_MAX_KEYS;
@@ -162,6 +166,9 @@ protected:
   /** If true, the value is treated as timestamp suffixed */
   bool is_db_ttl_;
 
+  // If true, the kvs are output with their insert/modify timestamp in a ttl db
+  bool timestamp_;
+
   /**
    * Map of options passed on the command-line.
    */
@@ -185,6 +192,7 @@ protected:
       is_key_hex_(false),
       is_value_hex_(false),
       is_db_ttl_(false),
+      timestamp_(false),
       option_map_(options),
       flags_(flags),
       valid_cmd_line_options_(valid_cmd_line_options) {
@@ -197,6 +205,7 @@ protected:
     is_key_hex_ = IsKeyHex(options, flags);
     is_value_hex_ = IsValueHex(options, flags);
     is_db_ttl_ = IsFlagPresent(flags, ARG_TTL);
+    timestamp_ = IsFlagPresent(flags, ARG_TIMESTAMP);
   }
 
   void OpenDB() {
@@ -385,6 +394,7 @@ private:
 
   static const string ARG_COUNT_ONLY;
   static const string ARG_STATS;
+  static const string ARG_TTL_BUCKET;
 };
 
 class DBLoaderCommand: public LDBCommand {
index 3f4cfe95020424ad24d4d9bea3567bda2da5a95a..e46aee39d11f988a545acda428d514982573c6cb 100644 (file)
@@ -18,9 +18,6 @@ public:
     ret.append("\n");
     ret.append("The following optional parameters control if keys/values are "
         "input/output as hex or as plain strings:\n");
-    ret.append("  --" + LDBCommand::ARG_TTL +
-        " with 'put','get','scan','dump','query','batchput'"
-        " : DB supports ttl and value is internally timestamp-suffixed\n");
     ret.append("  --" + LDBCommand::ARG_KEY_HEX +
         " : Keys are input/output as hex\n");
     ret.append("  --" + LDBCommand::ARG_VALUE_HEX +
@@ -31,6 +28,9 @@ public:
 
     ret.append("The following optional parameters control the database "
         "internals:\n");
+    ret.append("  --" + LDBCommand::ARG_TTL +
+        " with 'put','get','scan','dump','query','batchput'"
+        " : DB supports ttl and value is internally timestamp-suffixed\n");
     ret.append("  --" + LDBCommand::ARG_BLOOM_BITS + "=<int,e.g.:14>\n");
     ret.append("  --" + LDBCommand::ARG_COMPRESSION_TYPE +
         "=<no|snappy|zlib|bzip2>\n");
index d4c8fc557433806123be656da39dbd64c1391211..af99f80e4b57a9bef50a054aa234eedee8aae784 100644 (file)
 
 namespace leveldb {
 
-class TtlIterator : public Iterator {
-
- public:
-  TtlIterator(Iterator* iter, int32_t ts_len)
-    : iter_(iter),
-      ts_len_(ts_len) {
-    assert(iter_);
-  }
-
-  ~TtlIterator() {
-    delete iter_;
-  }
-
-  bool Valid() const {
-    return iter_->Valid();
-  }
-
-  void SeekToFirst() {
-    iter_->SeekToFirst();
-  }
-
-  void SeekToLast() {
-    iter_->SeekToLast();
-  }
-
-  void Seek(const Slice& target) {
-    iter_->Seek(target);
-  }
-
-  void Next() {
-    iter_->Next();
-  }
-
-  void Prev() {
-    iter_->Prev();
-  }
-
-  Slice key() const {
-    return iter_->key();
-  }
-
-  Slice value() const {
-    assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok());
-    Slice trimmed_value = iter_->value();
-    trimmed_value.size_ -= ts_len_;
-    return trimmed_value;
-  }
-
-  Status status() const {
-    return iter_->status();
-  }
-
- private:
-  Iterator* iter_;
-  int32_t ts_len_;
-};
-
 // Open the db inside DBWithTTL because options needs pointer to its ttl
 DBWithTTL::DBWithTTL(const int32_t ttl,
                      const Options& options,
index 924bd81750fb8084c8aefceb0f314086da3e526e..b7533cd49ceb4af256b04796e4a72d98d8a64132 100644 (file)
@@ -97,12 +97,86 @@ class DBWithTTL : public DB, CompactionFilter {
 
   static const int32_t kTSLength = sizeof(int32_t); // size of timestamp
 
-  static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM
+  static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
+
+  static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
 
  private:
   DB* db_;
   int32_t ttl_;
 };
 
+struct ValueAndTimestamp {
+  Slice value;
+  int32_t timestamp;
+};
+
+class TtlIterator : public Iterator {
+
+ public:
+  TtlIterator(Iterator* iter, int32_t ts_len)
+    : iter_(iter),
+      ts_len_(ts_len) {
+    assert(iter_);
+  }
+
+  ~TtlIterator() {
+    delete iter_;
+  }
+
+  bool Valid() const {
+    return iter_->Valid();
+  }
+
+  void SeekToFirst() {
+    iter_->SeekToFirst();
+  }
+
+  void SeekToLast() {
+    iter_->SeekToLast();
+  }
+
+  void Seek(const Slice& target) {
+    iter_->Seek(target);
+  }
+
+  void Next() {
+    iter_->Next();
+  }
+
+  void Prev() {
+    iter_->Prev();
+  }
+
+  Slice key() const {
+    return iter_->key();
+  }
+
+  struct ValueAndTimestamp ValueWithTimestamp() const {
+    assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok());
+    struct ValueAndTimestamp val_ts;
+    val_ts.timestamp = DecodeFixed32(
+      iter_->value().data() + iter_->value().size() - DBWithTTL::kTSLength);
+    val_ts.value = iter_->value();
+    val_ts.value.size_ -= ts_len_;
+    return val_ts;
+  }
+
+  Slice value() const {
+    assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok());
+    Slice trimmed_value = iter_->value();
+    trimmed_value.size_ -= ts_len_;
+    return trimmed_value;
+  }
+
+  Status status() const {
+    return iter_->status();
+  }
+
+ private:
+  Iterator* iter_;
+  int32_t ts_len_;
+};
+
 }
 #endif  // LEVELDB_UTILITIES_TTL_DB_TTL_H_