]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw:add a s3 API of make torrent for a object
authorzhouruisong <236131368@qq.com>
Wed, 8 Jun 2016 08:45:03 +0000 (16:45 +0800)
committerzhouruisong <236131368@qq.com>
Wed, 6 Jul 2016 08:58:47 +0000 (16:58 +0800)
When you execute the command gettorrent of a object, a torrent file will be produced and returned.
The torrent also will be save into a pool named default.rgw.torrent.
If the torrent of a object exists in default.rgw.torrent, it will be returned.

Signed-off-by: zhouruisong <236131368@qq.com>
src/common/config_opts.h
src/rgw/Makefile.am
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest.cc
src/rgw/rgw_torrent.cc [new file with mode: 0644]
src/rgw/rgw_torrent.h [new file with mode: 0644]

index 10d91b4c0107be1ad042ac71733e94de69b13789..c3d056d175f007f4061ac872a0c8d79b3f18452f 100644 (file)
@@ -1457,6 +1457,14 @@ OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entrie
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter
 
+/* The following are tunables for torrent data */
+OPTION(rgw_torrent_tracker, OPT_STR, "")    // torrent field annouce and annouce list
+OPTION(rgw_torrent_createby, OPT_STR, "")    // torrent field created by
+OPTION(rgw_torrent_comment, OPT_STR, "")    // torrent field comment
+OPTION(rgw_torrent_encoding, OPT_STR, "")    // torrent field encoding
+OPTION(rgw_torrent_origin, OPT_STR, "")    // torrent origin
+OPTION(rgw_torrent_sha_unit, OPT_INT, 512*1024)    //torrent field piece length 521K
+
 // This will be set to true when it is safe to start threads.
 // Once it is true, it will never change.
 OPTION(internal_safe_to_start_threads, OPT_BOOL, false)
index 96c9c1a7bc3fd6cf91533cf9249cc6b0813e4fd2..613fa6a21711b7d181676619fbaea126f820b655 100644 (file)
@@ -89,7 +89,8 @@ librgw_la_SOURCES = \
        rgw/librgw.cc \
        rgw/rgw_xml.cc \
        rgw/rgw_xml_enc.cc \
-       rgw/rgw_website.cc
+       rgw/rgw_website.cc \
+        rgw/rgw_torrent.cc
 
 if WITH_OPENLDAP
 librgw_la_SOURCES += rgw/rgw_ldap.cc
@@ -269,6 +270,7 @@ noinst_HEADERS += \
        rgw/rgw_civetweb_log.h \
        rgw/rgw_website.h \
        rgw/rgw_rest_s3website.h \
+        rgw/rgw_torrent.h \
        civetweb/civetweb.h \
        civetweb/include/civetweb.h \
        civetweb/include/civetweb_conf.h \
index b3833e1e2e8292c852e2fd99bb8ffd882c5cd4b2..f4f0760909209f0644d97b845f3afe3c8026521e 100644 (file)
@@ -1234,6 +1234,7 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
     gc_invalidate_time = start_time;
     gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
   }
+
   return send_response_data(bl, bl_ofs, bl_len);
 }
 
@@ -1333,6 +1334,28 @@ void RGWGetObj::execute()
   if (op_ret < 0)
     goto done_err;
 
+  /* start gettorrent */
+  if (torrent.get_flag())
+  {
+    torrent.init(s, store);
+    torrent.get_torrent_file(op_ret, read_op, total_len, bl, obj);
+    if (op_ret < 0)
+    {
+      ldout(s->cct, 0) << "ERROR: failed to get_torrent_file ret= " << op_ret
+                       << dendl;
+      goto done_err;
+    }
+    op_ret = send_response_data(bl, 0, total_len);
+    if (op_ret < 0)
+    {
+      ldout(s->cct, 0) << "ERROR: failed to send_response_data ret= " << op_ret 
+                       << dendl;
+      goto done_err;
+    }
+    return;
+  }
+  /* end gettorrent */
+
   attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
   if (attr_iter != attrs.end() && !skip_manifest) {
     op_ret = handle_user_manifest(attr_iter->second.c_str());
@@ -2676,6 +2699,9 @@ void RGWPutObj::execute()
       len = data.length();
     }
 
+    /* save data for producing torrent data */
+    torrent.save_data(data_in);
+
     /* do we need this operation to be synchronous? if we're dealing with an object with immutable
      * head, e.g., multipart object we need to make sure we're the first one writing to this object
      */
@@ -2828,6 +2854,18 @@ void RGWPutObj::execute()
 
   op_ret = processor->complete(etag, &mtime, real_time(), attrs, delete_at,
                              if_match, if_nomatch);
+  /* produce torrent */
+  if (ofs == torrent.get_data_len())
+  {
+    torrent.init(s, store);
+    torrent.set_create_date(mtime);
+    op_ret =  torrent.handle_data();
+    if (0 != op_ret)
+    {
+      ldout(s->cct, 0) << "ERROR: torrent.handle_data() returned " << op_ret << dendl;
+      goto done;
+    }
+  }
 
 done:
   dispose_processor(processor);
index 49018f457c78f2048a5148b41b957c6d49f4ae41..8d201a27963e4b55335270ae674e18914ca7993f 100644 (file)
 #include "rgw_acl.h"
 #include "rgw_cors.h"
 #include "rgw_quota.h"
+#include "rgw_torrent.h"
 
 #include "include/assert.h"
 
 using namespace std;
+using ceph::crypto::SHA1;
 
 struct req_state;
 class RGWHandler;
@@ -103,6 +105,7 @@ RGWOp() : s(nullptr), dialect_handler(nullptr), store(nullptr),
 
 class RGWGetObj : public RGWOp {
 protected:
+  seed torrent; // get torrent
   const char *range_str;
   const char *if_mod;
   const char *if_unmod;
@@ -642,6 +645,7 @@ class RGWPutObj : public RGWOp {
   friend class RGWPutObjProcessor;
 
 protected:
+  seed torrent;
   off_t ofs;
   const char *supplied_md5_b64;
   const char *supplied_etag;
index 655bbd290e59e7ed9cdda698cd147810db4a51b1..59c96a77d143abdbd84539d268435292abcf3570 100644 (file)
@@ -790,6 +790,19 @@ int RGWGetObj_ObjStore::get_params()
     mod_pg_ver = s->info.env->get_int("HTTP_DEST_PG_VER", 0);
   }
 
+  /* start gettorrent */
+  const char *query_str = s->info.env->get("QUERY_STRING");
+  if (0 == strcmp(query_str, GET_TORRENT))
+  {
+    int ret = 0;
+    ret = torrent.get_params();
+    if (ret < 0)
+    {
+      return ret;
+    }
+  }
+  /* end gettorrent */
+
   return 0;
 }
 
@@ -1000,6 +1013,15 @@ int RGWPutObj_ObjStore::verify_params()
 
 int RGWPutObj_ObjStore::get_params()
 {
+  /* start gettorrent */
+  int ret = 0;
+  ret = torrent.get_params();
+  if (ret < 0)
+  {
+    return ret;
+  }
+  torrent.set_info_name((s->object).name);
+  /* end gettorrent */
   supplied_md5_b64 = s->info.env->get("HTTP_CONTENT_MD5");
 
   return 0;
diff --git a/src/rgw/rgw_torrent.cc b/src/rgw/rgw_torrent.cc
new file mode 100644 (file)
index 0000000..77fd971
--- /dev/null
@@ -0,0 +1,423 @@
+#include <errno.h>
+#include <stdlib.h>
+
+#include <sstream>
+
+#include "rgw_torrent.h"
+#include "include/rados/librados.hpp"
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+using ceph::crypto::MD5;
+using namespace librados;
+using namespace boost;
+using ceph::crypto::SHA1;
+
+seed::seed()
+{
+  seed::info.piece_length = 0;
+  seed::info.len = 0;
+  sha_len = 0;
+  is_torrent = false; 
+}
+
+seed::~seed()
+{
+  seed::info.sha1_bl.clear();
+  bl.clear();
+  torrent_bl.clear();
+  s = NULL;
+  store = NULL;
+}
+
+void seed::init(struct req_state *p_req, RGWRados *p_store)
+{
+  s = p_req;
+  store = p_store;
+}
+
+void seed::get_torrent_file(int &op_ret, RGWRados::Object::Read &read_op, uint64_t &total_len, 
+  bufferlist &bl_data, rgw_obj &obj)
+{
+  string oid, key;
+  rgw_bucket bucket;
+  map<string, bufferlist> m;
+  set<string> obj_key;
+  get_obj_bucket_and_oid_loc(obj, bucket, oid, key);
+  ldout(s->cct, 0) << "NOTICE: head obj oid= " << oid << dendl;
+
+  obj_key.insert(RGW_OBJ_TORRENT);
+  op_ret = read_op.state.io_ctx.omap_get_vals_by_keys(oid, obj_key, &m);
+  if (op_ret < 0)
+  {
+    ldout(s->cct, 0) << "ERROR: failed to omap_get_vals_by_keys op_ret = " << op_ret << dendl;
+    return;
+  }
+
+  map<string, bufferlist>::iterator iter;
+  for (iter = m.begin(); iter != m.end(); ++iter)
+  {
+    bufferlist bl_tmp = iter->second;
+    char *pbuff = bl_tmp.c_str();
+    bl.append(pbuff, bl_tmp.length());
+  }
+
+  bl_data = bl;
+  total_len = bl.length();
+  return;
+}
+
+bool seed::get_flag()
+{
+  return is_torrent;
+}
+
+void seed::save_data(bufferlist &bl)
+{
+  info.len += bl.length();
+  torrent_bl.push_back(bl);
+}
+
+off_t seed::get_data_len()
+{
+  return info.len;
+}
+
+void seed::set_create_date(ceph::real_time value)
+{
+  utime_t date = ceph::real_clock::to_timespec(value);
+  create_date = date.sec();
+}
+
+void seed::set_info_pieces(char *buff)
+{
+  info.sha1_bl.append(buff, CEPH_CRYPTO_SHA1_DIGESTSIZE);
+}
+
+void seed::set_info_name(string value)
+{
+  info.name = value;
+}
+
+void seed::sha1(SHA1 *h, bufferlist &bl, off_t bl_len)
+{
+  off_t num = bl_len/info.piece_length;
+  off_t remain = 0;
+  remain = bl_len%info.piece_length;
+
+  char *pstr = bl.c_str();
+  char sha[25];
+
+  /* get sha1 */
+  for (off_t i = 0; i < num; i++)
+  {
+    memset(sha, 0x00, sizeof(sha));
+    h->Update((byte *)pstr, info.piece_length);
+    h->Final((byte *)sha);
+    set_info_pieces(sha);
+    pstr += info.piece_length;
+  }
+
+  /* process remain */
+  if (0 != remain)
+  {
+    memset(sha, 0x00, sizeof(sha));
+    h->Update((byte *)pstr, remain);
+    h->Final((byte *)sha);
+    set_info_pieces(sha);
+  }
+}
+
+int seed::sha1_process()
+{
+  uint64_t remain = info.len%info.piece_length;
+  uint8_t  remain_len = ((remain > 0)? 1 : 0);
+  sha_len = (info.len/info.piece_length + remain_len)*CEPH_CRYPTO_SHA1_DIGESTSIZE;
+
+  SHA1 h;
+  list<bufferlist>::iterator iter = torrent_bl.begin();
+  for (; iter != torrent_bl.end(); iter++)
+  {
+    bufferlist &bl_info = *iter;
+    sha1(&h, bl_info, (*iter).length());
+  }
+
+  return 0;
+}
+
+int seed::handle_data()
+{
+  int ret = 0;
+
+  /* sha1 process */
+  ret = sha1_process();
+  if (0 != ret)
+  {
+    ldout(s->cct, 0) << "ERROR: failed to sha1_process() ret= "<< ret << dendl;
+    return ret;
+  }
+
+  /* produce torrent data */
+  do_encode();
+
+  /* save torrent data into OMAP */
+  ret = save_torrent_file();
+  if (0 != ret)
+  {
+    ldout(s->cct, 0) << "ERROR: failed to save_torrent_file() ret= "<< ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+int seed::get_params()
+{
+  is_torrent = true;
+  info.piece_length = g_conf->rgw_torrent_sha_unit;
+  create_by = g_conf->rgw_torrent_createby;
+  encoding = g_conf->rgw_torrent_encoding;
+  origin = g_conf->rgw_torrent_origin;
+  comment = g_conf->rgw_torrent_comment;
+  announce = g_conf->rgw_torrent_tracker;
+
+  /* tracker and tracker list is empty, set announce to origin */
+  if (announce.empty() && !origin.empty())
+  {
+    announce = origin;
+  }
+
+  return 0;
+}
+
+void seed::set_announce()
+{
+  const char *pstr = announce.c_str();
+  char *pstart = const_cast<char *>(pstr);
+  uint64_t len = announce.length();
+  char *pend = pstart + len;
+  char *location;
+  char flag = ',';
+
+  list<string> announce_list;  // used to get announce list from conf
+  while (pstart < pend)
+  {
+    location = NULL;
+    location = strchr(pstart, flag);
+    if (!location)
+    {
+      string ann(pstart);
+      announce_list.push_back(ann);
+      break;
+    }
+
+    char temp[100] = { 0 };
+    snprintf(temp, location - pstart + 1, "%s", pstart);
+    string anno(temp);
+    announce_list.push_back(anno);
+    pstart = location + 1;
+  }
+
+  list<string>::iterator iter = announce_list.begin();
+  do_flush_bufflist(BE_STR, &(*iter), ANNOUNCE);
+  iter++;
+
+  do_flush_bufflist(BE_STR, NULL, ANNOUNCE_LIST);
+  get_type(BE_LIST);
+  for (; iter != announce_list.end(); ++iter)
+  {
+    get_type(BE_LIST);
+    do_flush_bufflist(BE_STR, &(*iter), "");
+    add_e();
+  }
+  add_e();
+}
+
+void seed::do_encode()
+{
+  get_type(BE_DICT);
+  set_announce();
+
+  if (!comment.empty())
+  {
+    do_flush_bufflist(BE_STR, &comment, COMMENT);
+  }
+  if (!create_by.empty())
+  {
+    do_flush_bufflist(BE_STR, &create_by, CREATED_BY);
+  }
+  do_flush_bufflist(BE_TIME_T, &create_date, CREATION_DATE);
+  if (!encoding.empty())
+  {
+    do_flush_bufflist(BE_STR, &encoding, ENCODING);
+  }
+  
+  do_flush_bufflist(BE_STR, NULL, INFO_PIECES);
+  get_type(BE_DICT);
+  do_flush_bufflist(BE_INT64, &info.len, LENGTH);
+  do_flush_bufflist(BE_STR, &info.name, NAME);
+  do_flush_bufflist(BE_INT64, &info.piece_length, PIECE_LENGTH);
+  do_flush_bufflist(BE_STR, &info.sha1_bl, PIECES);
+  add_e();
+
+  add_e();
+}
+
+void seed::add_e()
+{
+  const char *p = "e";
+  bl.append(p, 1);
+}
+
+void seed::get_type(be_type type)
+{
+  switch(type)
+  {
+    case BE_STR:
+    {
+      return;
+    }
+    case BE_INT:
+    case BE_INT64:
+    case BE_TIME_T:
+    {
+      const char *p = "i";
+      bl.append(p, 1);
+      break;
+    }
+    case BE_LIST:
+    {
+      const char *p = "l";
+      bl.append(p, 1);
+      break;
+    }
+    case BE_DICT:
+    {
+      const char *p = "d";
+      bl.append(p, 1);
+      break;
+    }
+  }
+}
+
+void seed::do_flush_bufflist(be_type type, void* src_data, const char *field)
+{
+  uint64_t totalLen = 0;
+  uint64_t field_len_Len = 0;
+  char cfield_len[100] = { 0 };
+  int field_len = 0;
+  field_len = strlen(field);
+
+  if (0 != field_len)
+  {
+    totalLen += field_len;
+    sprintf(cfield_len, "%d", field_len); 
+    field_len_Len = strlen(cfield_len);
+    totalLen += field_len_Len;
+    char info[100] = { 0 };
+    sprintf(info, "%d:%s", field_len, field);
+    bl.append(info, totalLen + 1);
+    get_type(type);
+  }
+  else
+  {
+    get_type(type);
+  }
+
+  if (NULL == src_data)
+  {
+    return;
+  }
+
+  switch (type)
+  {
+    case BE_STR:
+    {
+      flush_bufflist(type, src_data, field);
+      break;
+    }
+
+    case BE_INT:
+    case BE_INT64:
+    case BE_TIME_T:
+    {
+      flush_bufflist(type, src_data);
+      break;
+    }
+
+    default:
+    {
+      break;
+    }
+  }
+}
+
+void seed::flush_bufflist(be_type type, void *src_data, const char *field)
+{
+  char infolen[100] = { 0 };
+  switch (type)
+  {
+    case BE_STR:
+    {
+      /* set pieces value */
+      if (0 == strcmp(PIECES, field))
+      {
+        sprintf(infolen, "%ld:", sha_len); 
+        bl.append(infolen, strlen(infolen));
+        bl.append(info.sha1_bl.c_str(), sha_len);
+        break;
+      }
+
+      const char *pstr = (static_cast<string *>(src_data))->c_str();
+      int len = strlen(pstr);
+      sprintf(infolen, "%d:", len); 
+      bl.append(infolen, strlen(infolen));
+      bl.append(pstr, len);
+      break;
+    }
+    case BE_INT:
+    case BE_INT64:
+    case BE_TIME_T:
+    {
+      if (type == BE_INT)
+      {
+        int *p = static_cast<int *>(src_data);
+        sprintf(infolen, "%d", *p); 
+      }
+      else if (type == BE_INT64)
+      {
+        uint64_t *p = static_cast<uint64_t *>(src_data);
+        sprintf(infolen, "%ld", *p); 
+      }
+      else
+      {
+        long *p = static_cast<long *>(src_data);
+        sprintf(infolen, "%ld", *p); 
+      }
+      bl.append(infolen, strlen(infolen));
+      add_e();
+      break;
+    }
+    default:
+    {
+      break;
+    }
+  }
+}
+
+int seed::save_torrent_file()
+{
+  int op_ret = 0;
+  string key = RGW_OBJ_TORRENT;
+  rgw_obj obj(s->bucket, s->object.name);    
+
+  op_ret = store->omap_set(obj, key, bl);
+  if (op_ret < 0)
+  {
+    ldout(s->cct, 0) << "ERROR: failed to omap_set() op_ret = " << op_ret << dendl;
+    return op_ret;
+  }
+
+  return op_ret;
+}
diff --git a/src/rgw/rgw_torrent.h b/src/rgw/rgw_torrent.h
new file mode 100644 (file)
index 0000000..7412cac
--- /dev/null
@@ -0,0 +1,100 @@
+#ifndef CEPH_RGW_TORRENT_H
+#define CEPH_RGW_TORRENT_H
+
+#include <string>
+#include <list>
+#include <map>
+#include <set>
+
+#include "common/ceph_time.h"
+
+#include "rgw_rados.h"
+#include "rgw_common.h"
+
+using namespace std;
+using ceph::crypto::SHA1;
+
+struct req_state;
+
+#define RGW_OBJ_TORRENT    "rgw.torrent"
+
+#define ANNOUNCE           "announce"
+#define ANNOUNCE_LIST      "announce-list"
+#define COMMENT            "comment"
+#define CREATED_BY         "created by"
+#define CREATION_DATE      "creation date"
+#define ENCODING           "encoding"
+#define LENGTH             "length"
+#define NAME               "name"
+#define PIECE_LENGTH       "piece length"
+#define PIECES             "pieces"
+#define INFO_PIECES        "info"
+#define GET_TORRENT        "get_torrent"
+
+typedef enum
+{
+  BE_STR,
+  BE_INT,
+  BE_INT64,
+  BE_TIME_T,
+  BE_LIST,
+  BE_DICT,
+} be_type;
+
+/* torrent file struct */
+class seed
+{
+private:
+  struct
+  {
+    long piece_length;    // each piece length
+    bufferlist sha1_bl;  // save sha1
+    string name;    // file name
+    off_t len;    // file total bytes
+  }info;
+
+  string  announce;    // tracker
+  string origin; // origin
+  time_t create_date;    // time of the file created
+  string comment;  // comment
+  string create_by;    // app name and version
+  string encoding;    // if encode use gbk rather than gtf-8 use this field
+  uint64_t sha_len;  // sha1 length
+  bool is_torrent;  // flag
+  bufferlist bl;  // bufflist ready to send
+  list<bufferlist> torrent_bl;   // meate data
+
+  struct req_state *s;
+  RGWRados *store;
+
+public:
+  seed();
+  ~seed();
+
+  int get_params();
+  void init(struct req_state *p_req, RGWRados *p_store);
+  void get_torrent_file(int &op_ret, RGWRados::Object::Read &read_op, 
+    uint64_t &total_len, bufferlist &bl_data, rgw_obj &obj);
+  
+  off_t get_data_len();
+  bool get_flag();
+
+  int handle_data();
+  void save_data(bufferlist &bl);
+  void set_create_date(ceph::real_time value);
+  void set_info_name(string value); 
+
+private:
+  void add_e();
+  void do_encode ();
+  void set_announce();
+  void set_exist(bool exist);
+  void get_type(be_type type);
+  void set_info_pieces(char *buff);
+  int sha1_process();
+  void sha1(SHA1 *h, bufferlist &bl, off_t bl_len);
+  void flush_bufflist(be_type type, void *src_data, const char *field = NULL);
+  void do_flush_bufflist(be_type type, void* src_data, const char *field);
+  int save_torrent_file();
+};
+#endif /* CEPH_RGW_TORRENT_H */