]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: pass original object attrs through extra request data
authorYehuda Sadeh <yehuda@inktank.com>
Fri, 14 Jun 2013 04:59:37 +0000 (21:59 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 14 Jun 2013 04:59:37 +0000 (21:59 -0700)
introduce a new mechanism that sends extra data with object
info, and use it to encode source object metadata.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_common.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h
src/rgw/rgw_rest_s3.cc

index dfd128294323241d0542759b3a619c85dfdc0b25..151b7667556241971c6c7a86b985cfc4dba84a1b 100644 (file)
@@ -43,6 +43,9 @@ using ceph::crypto::MD5;
 
 #define RGW_ATTR_PREFIX  "user.rgw."
 
+#define RGW_HTTP_RGWX_ATTR_PREFIX "RGWX_ATTR_"
+#define RGW_HTTP_RGWX_ATTR_PREFIX_OUT "Rgwx-Attr-"
+
 #define RGW_AMZ_META_PREFIX "x-amz-meta-"
 
 #define RGW_SYS_PARAM_PREFIX "rgwx-"
index 809be0343c029c37a3739e2aab44f86ce1f8813b..88fbc891e438a1553f6683844f9b8b7e68a35039 100644 (file)
@@ -604,6 +604,20 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle)
 }
 
 int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) {
+  if (extra_data_len) {
+    size_t extra_len = bl.length();
+    if (extra_len > extra_data_len)
+      extra_len = extra_data_len;
+
+    /* is there a better way to split a bl into two bufferlists? */
+    bufferlist extra;
+    bl.splice(0, extra_len, &extra);
+    extra_data_bl.append(extra);
+
+    extra_data_len -= extra_len;
+    if (bl.length() == 0)
+      return 0;
+  }
   if (!ofs && !immutable_head()) {
     first_chunk.claim(bl);
     *phandle = NULL;
@@ -2125,11 +2139,32 @@ public:
     return 0;
   }
 
+  void set_extra_data_len(uint64_t len) {
+    RGWGetDataCB::set_extra_data_len(len);
+    processor->set_extra_data_len(len);
+  }
+
   int complete(string& etag, time_t *mtime, map<string, bufferlist>& attrs) {
     return processor->complete(etag, mtime, attrs);
   }
 };
 
+/*
+ * prepare attrset, either replace it with new attrs, or keep it (other than acls).
+ */
+static void set_copy_attrs(map<string, bufferlist>& src_attrs, map<string, bufferlist>& attrs, bool replace_attrs)
+{
+  if (replace_attrs) {
+    if (!attrs[RGW_ATTR_ETAG].length())
+      attrs[RGW_ATTR_ETAG] = src_attrs[RGW_ATTR_ETAG];
+
+    src_attrs = attrs;
+  } else {
+    /* copying attrs from source, however acls should not be copied */
+    src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
+  }
+}
+
 /**
  * Copy an object.
  * dest_obj: the object to copy into
@@ -2182,19 +2217,17 @@ int RGWRados::copy_obj(void *ctx,
 
   void *handle = NULL;
 
-  map<string, bufferlist> attrset;
+  map<string, bufferlist> src_attrs;
   off_t ofs = 0;
   off_t end = -1;
   if (!remote_src) {
-    ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &attrset,
+    ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &src_attrs,
                   mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err);
     if (ret < 0)
       return ret;
   } else {
     /* source is in a different region, copy it there */
 
-    map<string, bufferlist> src_attrs;
-
     RGWRESTStreamReadRequest *in_stream_req;
     string tag;
     append_rand_alpha(cct, tag, tag, 32);
@@ -2207,32 +2240,40 @@ int RGWRados::copy_obj(void *ctx,
 
     RGWRadosPutObj cb(&processor);
   
-    int ret = rest_conn->get_obj(user_id, src_obj, &cb, &in_stream_req);
+    int ret = rest_conn->get_obj(user_id, src_obj, true, &cb, &in_stream_req);
     if (ret < 0)
       return ret;
 
     string etag;
 
-    ret = rest_conn->complete_request(in_stream_req, etag, mtime);
+    map<string, string> req_headers;
+    ret = rest_conn->complete_request(in_stream_req, etag, mtime, req_headers);
     if (ret < 0)
       return ret;
 
-    ret = cb.complete(etag, mtime, attrs);
+    bufferlist& extra_data_bl = processor.get_extra_data();
+    if (extra_data_bl.length()) {
+      bufferlist::iterator iter = extra_data_bl.begin();
+      try {
+        ::decode(src_attrs, iter);
+      } catch (buffer::error& err) {
+        ldout(cct, 0) << "ERROR: failed to decode extra metadata info" << dendl;
+        return -EIO;
+      }
+
+      src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout
+    }
+
+    set_copy_attrs(src_attrs, attrs, replace_attrs);
+
+    ret = cb.complete(etag, mtime, src_attrs);
     if (ret < 0)
       return ret;
 
     return 0;
   }
 
-  if (replace_attrs) {
-    if (!attrs[RGW_ATTR_ETAG].length())
-      attrs[RGW_ATTR_ETAG] = attrset[RGW_ATTR_ETAG];
-
-    attrset = attrs;
-  } else {
-    /* copying attrs from source, however acls should not be copied */
-    attrset[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
-  }
+  set_copy_attrs(src_attrs, attrs, replace_attrs);
 
   RGWObjManifest manifest;
   RGWObjState *astate = NULL;
@@ -2268,7 +2309,7 @@ int RGWRados::copy_obj(void *ctx,
 
     RGWRESTStreamWriteRequest *out_stream_req;
   
-    int ret = rest_conn->put_obj_init(user_id, dest_obj, astate->size, attrset, &out_stream_req);
+    int ret = rest_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req);
     if (ret < 0)
       return ret;
 
@@ -2284,7 +2325,7 @@ int RGWRados::copy_obj(void *ctx,
 
     return 0;
   } else if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
-    return copy_obj_data(ctx, handle, end, dest_obj, src_obj, mtime, attrset, category, ptag, err);
+    return copy_obj_data(ctx, handle, end, dest_obj, src_obj, mtime, src_attrs, category, ptag, err);
   }
 
   map<uint64_t, RGWObjManifestPart>::iterator miter = astate->manifest.objs.begin();
@@ -2359,7 +2400,7 @@ int RGWRados::copy_obj(void *ctx,
   ep.manifest = pmanifest;
   ep.ptag = &tag;
 
-  ret = put_obj_meta(ctx, dest_obj, end + 1, attrset, category, PUT_OBJ_CREATE, ep);
+  ret = put_obj_meta(ctx, dest_obj, end + 1, src_attrs, category, PUT_OBJ_CREATE, ep);
 
   if (mtime)
     obj_stat(ctx, dest_obj, NULL, mtime, NULL, NULL, NULL, NULL);
index 9952feafa25503707e81e63e261b43888ab03951..7c9dcfd0f89f03d1858df76fd1e547511135732b 100644 (file)
@@ -64,9 +64,15 @@ struct RGWUsageIter {
 };
 
 class RGWGetDataCB {
+protected:
+  uint64_t extra_data_len;
 public:
   virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0;
+  RGWGetDataCB() : extra_data_len(0) {}
   virtual ~RGWGetDataCB() {}
+  virtual void set_extra_data_len(uint64_t len) {
+    extra_data_len = len;
+  }
 };
 
 class RGWAccessListFilter {
@@ -253,6 +259,9 @@ class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
   off_t cur_part_ofs;
   off_t next_part_ofs;
   int cur_part_id;
+
+  uint64_t extra_data_len;
+  bufferlist extra_data_bl;
 protected:
   rgw_bucket bucket;
   string obj_str;
@@ -277,11 +286,16 @@ public:
                                 cur_part_ofs(0),
                                 next_part_ofs(_p),
                                 cur_part_id(0),
+                                extra_data_len(0),
                                 bucket(_b),
                                 obj_str(_o),
                                 unique_tag(_t) {}
   int prepare(RGWRados *store, void *obj_ctx);
+  void set_extra_data_len(uint64_t len) {
+    extra_data_len = len;
+  }
   int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+  bufferlist& get_extra_data() { return extra_data_bl; }
 };
 
 
index 907ac0c423b43302b7ba2d3cfbad3ff1eb6274e0..b8ee5aed8d60ce84623b0d53f1a2a65e09de1466 100644 (file)
@@ -54,6 +54,9 @@ int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
           }
           buf[i] = '\0';
           out_headers[buf] = l;
+          int r = handle_header(buf, l);
+          if (r < 0)
+            return r;
         }
       }
     }
@@ -506,7 +509,6 @@ void set_str_from_headers(map<string, string>& out_headers, const string& header
   }
 }
 
-
 int RGWRESTStreamWriteRequest::complete(string& etag, time_t *mtime)
 {
   int ret = complete_request(handle);
@@ -568,9 +570,6 @@ int RGWRESTStreamReadRequest::get_obj(RGWAccessKey& key, rgw_obj& obj)
     headers.push_back(make_pair<string, string>(iter->first, iter->second));
   }
 
-  // cb = new RGWRESTStreamInCB(this);
-
-
   int r = process(new_info.method, new_url.c_str());
   if (r < 0)
     return r;
@@ -578,7 +577,7 @@ int RGWRESTStreamReadRequest::get_obj(RGWAccessKey& key, rgw_obj& obj)
   return 0;
 }
 
-int RGWRESTStreamReadRequest::complete(string& etag, time_t *mtime)
+int RGWRESTStreamReadRequest::complete(string& etag, time_t *mtime, map<string, string>& attrs)
 {
   set_str_from_headers(out_headers, "ETAG", etag);
   if (mtime) {
@@ -595,9 +594,45 @@ int RGWRESTStreamReadRequest::complete(string& etag, time_t *mtime)
     }
   }
 
+  map<string, string>::iterator iter;
+  for (iter = out_headers.begin(); iter != out_headers.end(); ++iter) {
+    const string& attr_name = iter->first;
+    if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
+      string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
+      const char *src = name.c_str();
+      char buf[name.size() + 1];
+      char *dest = buf;
+      for (; *src; ++src, ++dest) {
+        switch(*src) {
+          case '_':
+            *dest = '-';
+            break;
+          default:
+            *dest = tolower(*src);
+        }
+      }
+      *dest = '\0';
+      attrs[buf] = iter->second;
+    }
+  }
   return status;
 }
 
+int RGWRESTStreamReadRequest::handle_header(const string& name, const string& val)
+{
+  if (name == "RGWX_EMBEDDED_METADATA_LEN") {
+    string err;
+    long len = strict_strtol(val.c_str(), 10, &err);
+    if (!err.empty()) {
+      ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
+      return -EINVAL;
+    }
+
+    cb->set_extra_data_len(len);
+  }
+  return 0;
+}
+
 int RGWRESTStreamReadRequest::receive_data(void *ptr, size_t len)
 {
   bufferptr bp((const char *)ptr, len);
@@ -608,36 +643,6 @@ int RGWRESTStreamReadRequest::receive_data(void *ptr, size_t len)
     return ret;
   ofs += len;
   return len;
-#if 0
-  return cb->handle_data(bl
-  const char *p = (const char *)ptr;
-  size_t orig_len = len;
-  while (len > 0) {
-    size_t read_len = RGW_MAX_CHUNK_SIZE - chunk_ofs;
-    if (read_len > len)
-      read_len = len;
-
-    bufferptr bp((const char *)p, read_len);
-    in_data.append(bp);
-
-    p += read_len;
-    len -= read_len;
-    chunk_ofs += read_len;
-    if (chunk_ofs == RGW_MAX_CHUNK_SIZE) {
-      chunk_ofs = 0;
-      size_t data_len = in_data.length();
-      int r = cb->handle_data(in_data, ofs, data_len);
-      if (r < 0)
-        return r;
-
-      ofs += data_len;
-
-      in_data.clear();
-    }
-  }
-
-  return orig_len;
-#endif
 }
 
 int RGWRESTStreamReadRequest::send_data(void *ptr, size_t len)
index 973a88e7972b1c59dd6e496bb1e7cd678193dc9f..2d1d0d1db537e60e22583cb727d743ffe6290175 100644 (file)
@@ -24,6 +24,7 @@ protected:
   size_t max_response; /* we need this as we don't stream out response */
   bufferlist response;
 
+  virtual int handle_header(const string& name, const string& val) { return 0; }
   void append_param(string& dest, const string& name, const string& val);
   void get_params_str(map<string, string>& extra_args, string& dest);
 
@@ -78,6 +79,8 @@ class RGWRESTStreamReadRequest : public RGWRESTSimpleRequest {
   bufferlist in_data;
   size_t chunk_ofs;
   size_t ofs;
+protected:
+  int handle_header(const string& name, const string& val);
 public:
   int send_data(void *ptr, size_t len);
   int receive_data(void *ptr, size_t len);
@@ -88,7 +91,7 @@ public:
                 chunk_ofs(0), ofs(0) {}
   ~RGWRESTStreamReadRequest() {}
   int get_obj(RGWAccessKey& key, rgw_obj& obj);
-  int complete(string& etag, time_t *mtime);
+  int complete(string& etag, time_t *mtime, map<string, string>& attrs);
 
   void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
 };
index fb3de5b423d4da36b5ba3e082977ce591d713e25..1cd57b40acf7c04def9b1c0a5a9659b167922817 100644 (file)
@@ -69,7 +69,7 @@ int RGWRegionConnection::complete_request(RGWRESTStreamWriteRequest *req, string
   return ret;
 }
 
-int RGWRegionConnection::get_obj(const string& uid, rgw_obj& obj, RGWGetDataCB *cb, RGWRESTStreamReadRequest **req)
+int RGWRegionConnection::get_obj(const string& uid, rgw_obj& obj, bool prepend_metadata, RGWGetDataCB *cb, RGWRESTStreamReadRequest **req)
 {
   string url;
   int ret = get_url(url);
@@ -79,13 +79,17 @@ int RGWRegionConnection::get_obj(const string& uid, rgw_obj& obj, RGWGetDataCB *
   list<pair<string, string> > params;
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
+  if (prepend_metadata) {
+    params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "prepend-metadata", region));
+  }
   *req = new RGWRESTStreamReadRequest(cct, url, cb, NULL, &params);
   return (*req)->get_obj(key, obj);
 }
 
-int RGWRegionConnection::complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime)
+int RGWRegionConnection::complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime,
+                                          map<string, string>& attrs)
 {
-  int ret = req->complete(etag, mtime);
+  int ret = req->complete(etag, mtime, attrs);
   delete req;
 
   return ret;
index f0ef6ed024745835bd8942dcd490c9377023d15a..a1b2a07286e17236a81f0018ebb8fec203357ee3 100644 (file)
@@ -28,8 +28,9 @@ public:
                    map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req);
   int complete_request(RGWRESTStreamWriteRequest *req, string& etag, time_t *mtime);
 
-  int get_obj(const string& uid, rgw_obj& obj, RGWGetDataCB *cb, RGWRESTStreamReadRequest **req);
-  int complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime);
+  int get_obj(const string& uid, rgw_obj& obj, bool prepend_metadata, RGWGetDataCB *cb, RGWRESTStreamReadRequest **req);
+  int complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime,
+                       map<string, string>& attrs);
 };
 
 #endif
index 4aaf1cd3bae6066cf06d639a52eab7a26dd2d0b6..a8f440c13e717b982b02b5fb8fb7535c2386d13a 100644 (file)
@@ -73,6 +73,9 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs, off_
   string content_type_str;
   map<string, string> response_attrs;
   map<string, string>::iterator riter;
+  bufferlist metadata_bl;
+
+  bool prepend_metadata = false;
 
   if (ret)
     goto done;
@@ -83,6 +86,14 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs, off_
   if (range_str)
     dump_range(s, start, end, s->obj_size);
 
+  if (s->system_request &&
+      s->info.args.exists(RGW_SYS_PARAM_PREFIX "prepend-metadata")) {
+    prepend_metadata = true;
+    ::encode(attrs, metadata_bl);
+    s->cio->print("Rgwx-Embedded-Metadata-Len: %lld\r\n", (long long)metadata_bl.length());
+    total_len += metadata_bl.length();
+  }
+
   dump_content_length(s, total_len);
   dump_last_modified(s, lastmod);
 
@@ -143,6 +154,10 @@ done:
   if (!content_type)
     content_type = "binary/octet-stream";
   end_header(s, content_type);
+
+  if (metadata_bl.length()) {
+    s->cio->write(metadata_bl.c_str(), metadata_bl.length());
+  }
   sent_header = true;
 
 send_data: