]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: iterate over slo parts
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 22 Sep 2015 07:35:03 +0000 (00:35 -0700)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Tue, 8 Dec 2015 16:58:07 +0000 (17:58 +0100)
Read slo parts when a read request was sent.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
Conflicts:
src/rgw/rgw_op.cc

src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest_swift.cc

index 5b5776697bf6a91380c9d969a16b02812c3f1bd3..93e7a63c888d128914c5b6dd529fde149f3392de 100644 (file)
@@ -814,6 +814,78 @@ static int iterate_user_manifest_parts(CephContext * const cct,
   return 0;
 }
 
+struct rgw_slo_part {
+  RGWAccessControlPolicy *bucket_policy;
+  rgw_bucket bucket;
+  string obj_name;
+  uint64_t size;
+
+  rgw_slo_part() : bucket_policy(NULL), size(0) {}
+};
+
+static int iterate_slo_parts(CephContext *cct, RGWRados *store, off_t ofs, off_t end,
+                                       map<uint64_t, rgw_slo_part>& slo_parts,
+                                       int (*cb)(rgw_bucket& bucket, RGWObjEnt& ent, RGWAccessControlPolicy *bucket_policy,
+                                                 off_t start_ofs, off_t end_ofs, void *param), void *cb_param)
+{
+  uint64_t obj_ofs = 0, len_count = 0;
+  bool found_start = false, found_end = false;
+  string delim;
+  vector<RGWObjEnt> objs;
+
+  if (slo_parts.empty()) {
+    return 0;
+  }
+
+
+  utime_t start_time = ceph_clock_now(cct);
+
+  map<uint64_t, rgw_slo_part>::iterator iter = slo_parts.upper_bound(ofs);
+  if (iter != slo_parts.begin()) {
+    --iter;
+  }
+
+  for (; iter != slo_parts.end() && !found_end; ++iter) {
+    rgw_slo_part& part = iter->second;
+    RGWObjEnt ent;
+
+    ent.key.name = part.obj_name;
+    ent.size = part.size;
+
+    uint64_t cur_total_len = obj_ofs;
+    uint64_t start_ofs = 0, end_ofs = ent.size;
+
+    if (!found_start && cur_total_len + ent.size > (uint64_t)ofs) {
+      start_ofs = ofs - obj_ofs;
+      found_start = true;
+    }
+
+    obj_ofs += ent.size;
+
+    if (!found_end && obj_ofs > (uint64_t)end) {
+      end_ofs = end - cur_total_len + 1;
+      found_end = true;
+    }
+
+    perfcounter->tinc(l_rgw_get_lat,
+                      (ceph_clock_now(cct) - start_time));
+
+    if (found_start) {
+      len_count += end_ofs - start_ofs;
+
+      if (cb) {
+        int r = cb(part.bucket, ent, part.bucket_policy, start_ofs, end_ofs, cb_param);
+        if (r < 0)
+          return r;
+      }
+    }
+
+    start_time = ceph_clock_now(cct);
+  }
+
+  return 0;
+}
+
 static int get_obj_user_manifest_iterate_cb(rgw_bucket& bucket,
                                             const RGWObjEnt& ent,
                                             RGWAccessControlPolicy * const bucket_policy,
@@ -894,6 +966,93 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
   return 0;
 }
 
+int RGWGetObj::handle_slo_manifest(bufferlist& bl)
+{
+  RGWSLOInfo slo_info;
+  bufferlist::iterator bliter = bl.begin();
+  try {
+    ::decode(slo_info, bliter);
+  } catch (buffer::error& err) {
+    ldout(s->cct, 0) << "ERROR: failed to decode slo manifest" << dendl;
+    return -EIO;
+  }
+  ldout(s->cct, 2) << "RGWGetObj::handle_slo_manifest()" << dendl;
+
+  list<RGWAccessControlPolicy> allocated_policies;
+  map<string, RGWAccessControlPolicy *> policies;
+  map<string, rgw_bucket> buckets;
+
+  uint64_t ofs = 0, start_ofs = 0;
+  map<uint64_t, rgw_slo_part> slo_parts;
+
+  total_len = 0;
+
+  for (vector<rgw_slo_entry>::iterator iter = slo_info.entries.begin(); iter != slo_info.entries.end(); ++iter) {
+    string& path = iter->path;
+    int pos = path.find('/', 1); /* skip first / */
+    if (pos < 0)
+      return -EINVAL;
+
+    string bucket_name = path.substr(1, pos - 1);
+    string obj_name = path.substr(pos + 1);
+
+    rgw_bucket bucket;
+    RGWAccessControlPolicy *bucket_policy;
+
+    if (bucket_name.compare(s->bucket.name) != 0) {
+      map<string, RGWAccessControlPolicy *>::iterator piter = policies.find(bucket_name);
+      if (piter != policies.end()) {
+        bucket_policy = piter->second;
+        bucket = buckets[bucket_name];
+      } else {
+        allocated_policies.push_back(RGWAccessControlPolicy(s->cct));
+        RGWAccessControlPolicy& _bucket_policy = allocated_policies.back();
+
+        RGWBucketInfo bucket_info;
+        map<string, bufferlist> bucket_attrs;
+        RGWObjectCtx obj_ctx(store);
+        int r = store->get_bucket_info(obj_ctx, bucket_name, bucket_info, NULL, &bucket_attrs);
+        if (r < 0) {
+          ldout(s->cct, 0) << "could not get bucket info for bucket=" << bucket_name << dendl;
+          return r;
+        }
+        bucket = bucket_info.bucket;
+        rgw_obj_key no_obj;
+        bucket_policy = &_bucket_policy;
+        r = read_policy(store, s, bucket_info, bucket_attrs, bucket_policy, bucket, no_obj);
+        if (r < 0) {
+          ldout(s->cct, 0) << "failed to read bucket policy for bucket " << bucket << dendl;
+          return r;
+        }
+        buckets[bucket_name] = bucket;
+        policies[bucket_name] = bucket_policy;
+      }
+    } else {
+      bucket = s->bucket;
+      bucket_policy = s->bucket_acl;
+    }
+
+    rgw_slo_part part;
+    part.bucket_policy = bucket_policy;
+    part.bucket = bucket;
+    part.obj_name = obj_name;
+    part.size = iter->size_bytes;
+    ldout(s->cct, 20) << "slo_part: ofs=" << ofs << " bucket=" << part.bucket << " obj=" << part.obj_name << " size=" << iter->size_bytes << dendl;
+
+    slo_parts[total_len] = part;
+    total_len += part.size;
+  }
+
+  s->obj_size = slo_info.total_size;
+  ldout(s->cct, 20) << "s->obj_size=" << s->obj_size << dendl;
+
+  int r = iterate_slo_parts(s->cct, store, start_ofs, end, slo_parts, get_obj_user_manifest_iterate_cb, (void *)this);
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
 class RGWGetObj_CB : public RGWGetDataCB
 {
   RGWGetObj *op;
@@ -1022,6 +1181,15 @@ void RGWGetObj::execute()
     }
     return;
   }
+  attr_iter = attrs.find(RGW_ATTR_SLO_MANIFEST);
+  if (attr_iter != attrs.end()) {
+    is_slo = true;
+    ret = handle_slo_manifest(attr_iter->second);
+    if (ret < 0) {
+      ldout(s->cct, 0) << "ERROR: failed to handle slo manifest ret=" << ret << dendl;
+    }
+    return;
+  }
 
   /* Check whether the object has expired. Swift API documentation
    * stands that we should return 404 Not Found in such case. */
index 0a7d7cdb01fb4481dd5e747eda17154f1952cdcb..3ecfbed9859da8e5459230c027737789d1049afc 100644 (file)
@@ -142,6 +142,7 @@ protected:
   bool skip_manifest;
   rgw_obj obj;
   utime_t gc_invalidate_time;
+  bool is_slo;
 
   int init_common();
 public:
@@ -165,6 +166,7 @@ public:
     range_parsed = false;
     skip_manifest = false;
     ret = 0;
+    is_slo = false;
  }
 
   bool prefetch_data();
@@ -181,6 +183,7 @@ public:
                               off_t start_ofs,
                               off_t end_ofs);
   int handle_user_manifest(const char *prefix);
+  int handle_slo_manifest(bufferlist& bl);
 
   int get_data_cb(bufferlist& bl, off_t ofs, off_t len);
 
@@ -454,7 +457,10 @@ WRITE_CLASS_ENCODER(rgw_slo_entry)
 
 struct RGWSLOInfo {
   vector<rgw_slo_entry> entries;
-  char *raw_data; /* in memory only */
+  uint64_t total_size;
+
+  /* in memory only */
+  char *raw_data;
   int raw_data_len;
 
   RGWSLOInfo() : raw_data(NULL), raw_data_len(0) {}
@@ -465,12 +471,14 @@ struct RGWSLOInfo {
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     ::encode(entries, bl);
+    ::encode(total_size, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::iterator& bl) {
      DECODE_START(1, bl);
      ::decode(entries, bl);
+     ::decode(total_size, bl);
      DECODE_FINISH(bl);
   }
 };
index f8c39f12dc0b1d3a2b3cd1a3f149074d1fde5eb3..0ca9ce9996cd875fda6f89d008f6161486cebd55 100644 (file)
@@ -589,6 +589,13 @@ int RGWPutObj_ObjStore_SWIFT::get_params()
       return -EINVAL;
     }
 
+    uint64_t total_size = 0;
+    for (vector<rgw_slo_entry>::iterator iter = slo_info->entries.begin(); iter != slo_info->entries.end(); ++iter) {
+      total_size += iter->size_bytes;
+      ldout(s->cct, 20) << "slo_part: " << iter->path << " size=" << iter->size_bytes << dendl;
+    }
+    slo_info->total_size = total_size;
+
     ofs = slo_info->raw_data_len;
   }
 
@@ -913,6 +920,9 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, o
   dump_content_length(s, total_len);
   dump_last_modified(s, lastmod);
   s->cio->print("X-Timestamp: %lld.00000\r\n", (long long)lastmod);
+  if (is_slo) {
+    s->cio->print("X-Static-Large-Object: True\r\n");
+  }
 
   if (!ret) {
     map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_ETAG);