ceph.git/commitdiff — git-server-git.apps.pok.os.sepia.ceph.com (Ceph Git repository)
Compressor: add decompress failed codes
author: Haomai Wang <haomaiwang@gmail.com>
Fri, 20 Mar 2015 04:14:02 +0000 (12:14 +0800)
committer: Haomai Wang <haomaiwang@gmail.com>
Wed, 1 Jul 2015 13:12:08 +0000 (21:12 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/compressor/AsyncCompressor.cc
src/compressor/AsyncCompressor.h
src/compressor/SnappyCompressor.h
src/test/common/test_async_compressor.cc

index d9b52b06ac5a40878760d79cabe51895eddc8636..e2ca2fe48bbfecd4255a88a957e8b3b5b87c0f0d 100644 (file)
@@ -140,17 +140,27 @@ int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, b
     ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl;
     return -ENOENT;
   }
+  int status;
 
  retry:
-  if (it->second.status.read() == DONE) {
+  status = it->second.status.read();
+  if (status == DONE) {
     ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl;
     *finished = true;
     data.swap(it->second.data);
     jobs.erase(it);
+  } else if (status == ERROR) {
+    ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl;
+    jobs.erase(it);
+    return -EIO;
   } else if (blocking) {
     if (it->second.status.cas(WAIT, DONE)) {
       ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl;
-      compressor->compress(it->second.data, data);
+      if (compressor->compress(it->second.data, data)) {
+        ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl;
+        it->second.status.set(ERROR);
+        return -EIO;
+      }
       *finished = true;
     } else {
       job_lock.Unlock();
@@ -174,17 +184,27 @@ int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &dat
     ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl;
     return -ENOENT;
   }
+  int status;
 
  retry:
-  if (it->second.status.read() == DONE) {
+  status = it->second.status.read();
+  if (status == DONE) {
     ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl;
     *finished = true;
     data.swap(it->second.data);
     jobs.erase(it);
+  } else if (status == ERROR) {
+    ldout(cct, 20) << __func__ << " decompressed data failed, job id=" << decompress_id << dendl;
+    jobs.erase(it);
+    return -EIO;
   } else if (blocking) {
     if (it->second.status.cas(WAIT, DONE)) {
       ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl;
-      compressor->decompress(it->second.data, data);
+      if (compressor->decompress(it->second.data, data)) {
+        ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl;
+        it->second.status.set(ERROR);
+        return -EIO;
+      }
       *finished = true;
     } else {
       job_lock.Unlock();
index d34cbe6bf0907784e006525811b4b32c2796ba57..659f506be57331b9a800cbc20301bdab1ae84a3d 100644 (file)
@@ -35,7 +35,8 @@ class AsyncCompressor {
   enum {
     WAIT,
     WORKING,
-    DONE
+    DONE,
+    ERROR
   } status;
   struct Job {
     uint64_t id;
@@ -79,7 +80,6 @@ class AsyncCompressor {
           break;
         } else {
           Mutex::Locker (async_compressor->job_lock);
-          assert(item->status.read() == DONE);
           async_compressor->jobs.erase(item->id);
           item = NULL;
         }
@@ -89,16 +89,19 @@ class AsyncCompressor {
     void _process(Job *item, ThreadPool::TPHandle &handle) {
       assert(item->status.read() == WORKING);
       bufferlist out;
+      int r;
       if (item->is_compress)
-        async_compressor->compressor->compress(item->data, out);
+        r = async_compressor->compressor->compress(item->data, out);
       else
-        async_compressor->compressor->decompress(item->data, out);
-      item->data.swap(out);
-    }
-    void _process_finish(Job *item) {
-      assert(item->status.read() == WORKING);
-      item->status.set(DONE);
+        r = async_compressor->compressor->decompress(item->data, out);
+      if (!r) {
+        item->data.swap(out);
+        assert(item->status.cas(WORKING, DONE));
+      } else {
+        item->status.set(ERROR);
+      }
     }
+    void _process_finish(Job *item) {}
     void _clear() {}
   } compress_wq;
   friend class CompressWQ;
index 8dc3497c45f276d8a6c6ddf583fc717e36298e56..f5d86d5a2420db0c544251b029239077c8b1ff1f 100644 (file)
@@ -64,7 +64,6 @@ class SnappyCompressor : public Compressor {
     BufferlistSource source(src);
     size_t res_len = 0;
     // Trick, decompress only need first 32bits buffer
-    list<bufferptr>::const_iterator pb = src.buffers().begin();
     if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len))
       return -1;
     bufferptr ptr(res_len);
index 7988868419d4b28818568ad92e826d266090f6d2..aca448c0a00cde457966ee3447dc1fd065631377 100644 (file)
@@ -81,11 +81,25 @@ TEST_F(AsyncCompressorTest, GrubWaitTest) {
   async_compressor->init();
 }
 
+TEST_F(AsyncCompressorTest, DecompressInjectTest) {
+  bufferlist compress_data, decompress_data, rawdata;
+  generate_random_data(rawdata, 1<<22);
+  bool finished;
+  uint64_t id = async_compressor->async_compress(rawdata);
+  ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished));
+  ASSERT_TRUE(finished == true);
+  char error[] = "asjdfkwejrljqwaelrj";
+  memcpy(compress_data.c_str()+1024, error, sizeof(error)-1);
+  id = async_compressor->async_decompress(compress_data);
+  ASSERT_EQ(-EIO, async_compressor->get_decompress_data(id, decompress_data, true, &finished));
+}
+
 class SyntheticWorkload {
   set<pair<uint64_t, uint64_t> > compress_jobs, decompress_jobs;
   AsyncCompressor *async_compressor;
   vector<bufferlist> rand_data, compress_data;
   gen_type rng;
+  static const uint64_t MAX_INFLIGHT = 128;
 
  public:
   SyntheticWorkload(AsyncCompressor *ac): async_compressor(ac), rng(time(NULL)) {
@@ -124,32 +138,35 @@ class SyntheticWorkload {
     bool finished;
     set<pair<uint64_t, uint64_t> >::iterator prev;
     uint64_t c_reap = 0, d_reap = 0;
-    for (set<pair<uint64_t, uint64_t> >::iterator it = compress_jobs.begin();
-         it != compress_jobs.end();) {
-      prev = it;
-      it++;
-      async_compressor->get_compress_data(prev->first, data, blocking, &finished);
-      if (finished) {
-        c_reap++;
-        if (compress_data[prev->second].length())
-          ASSERT_TRUE(compress_data[prev->second].contents_equal(data));
-        else
-          compress_data[prev->second].swap(data);
-        compress_jobs.erase(prev);
+    do {
+      for (set<pair<uint64_t, uint64_t> >::iterator it = compress_jobs.begin();
+           it != compress_jobs.end();) {
+        prev = it;
+        it++;
+        ASSERT_EQ(0, async_compressor->get_compress_data(prev->first, data, blocking, &finished));
+        if (finished) {
+          c_reap++;
+          if (compress_data[prev->second].length())
+            ASSERT_TRUE(compress_data[prev->second].contents_equal(data));
+          else
+            compress_data[prev->second].swap(data);
+          compress_jobs.erase(prev);
+        }
       }
-    }
 
-    for (set<pair<uint64_t, uint64_t> >::iterator it = decompress_jobs.begin();
-         it != decompress_jobs.end();) {
-      prev = it;
-      it++;
-      async_compressor->get_decompress_data(prev->first, data, blocking, &finished);
-      if (finished) {
-        d_reap++;
-        ASSERT_TRUE(rand_data[prev->second].contents_equal(data));
-        decompress_jobs.erase(prev);
+      for (set<pair<uint64_t, uint64_t> >::iterator it = decompress_jobs.begin();
+           it != decompress_jobs.end();) {
+        prev = it;
+        it++;
+        ASSERT_EQ(0, async_compressor->get_decompress_data(prev->first, data, blocking, &finished));
+        if (finished) {
+          d_reap++;
+          ASSERT_TRUE(rand_data[prev->second].contents_equal(data));
+          decompress_jobs.erase(prev);
+        }
       }
-    }
+      usleep(1000 * 500);
+    } while (compress_jobs.size() + decompress_jobs.size() > MAX_INFLIGHT);
     cerr << " reap compress jobs " << c_reap << " decompress jobs " << d_reap << std::endl;
   }
   void print_internal_state() {
@@ -164,7 +181,7 @@ TEST_F(AsyncCompressorTest, SyntheticTest) {
   gen_type rng(time(NULL));
   boost::uniform_int<> true_false(0, 99);
   int val;
-  for (int i = 0; i < 10000; ++i) {
+  for (int i = 0; i < 3000; ++i) {
     if (!(i % 10)) {
       cerr << "Op " << i << ": ";
       test_ac.print_internal_state();