From: J. Eric Ivancich Date: Tue, 28 Jun 2022 19:35:18 +0000 (-0400) Subject: rgw: implement initial flight server functionality X-Git-Tag: v18.1.0~508^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=eac2905d9f64e174570ee25129112ed7ed53993c;p=ceph.git rgw: implement initial flight server functionality Implements the ability for a flight to be created when the object is retrieved by an S3 get. Adds FlightServer abilities ListFlights, GetFlightInfo, GetSchema, and DoGet. Adds an interface for a store for flight information and adds an in-memory implemtation of it. This code is functionality is early-stage and lacks some planned efficiencies. Signed-off-by: J. Eric Ivancich --- diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index 77e29d5595458..361f622b992a7 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -440,7 +440,7 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib) #ifdef WITH_ARROW_FLIGHT int port; config->get_val("port", 8077, &port); - fe = new rgw::flight::FlightFrontend(cct, config, store, port); + fe = new rgw::flight::FlightFrontend(env, config, port); #else derr << "WARNING: arrow_flight frontend requested, but not included in build; skipping" << dendl; continue; diff --git a/src/rgw/rgw_flight.cc b/src/rgw/rgw_flight.cc index be0a17c8f3200..2299b7412858f 100644 --- a/src/rgw/rgw_flight.cc +++ b/src/rgw/rgw_flight.cc @@ -2,12 +2,22 @@ // vim: ts=8 sw=2 smarttab ft=cpp #include +#include #include #include +#include #include "arrow/type.h" +#include "arrow/buffer.h" +#include "arrow/util/string_view.h" +#include "arrow/io/interfaces.h" +#include "arrow/ipc/reader.h" +#include "arrow/table.h" + #include "arrow/flight/server.h" +#include "parquet/arrow/reader.h" + #include "common/dout.h" #include "rgw_op.h" @@ -15,117 +25,692 @@ #include "rgw_flight_frontend.h" -#define dout_subsys ceph_subsys_rgw_flight +namespace rgw::flight { -#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": " -#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": " -#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": " -#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": " +// Ticket and FlightKey -#define INFO INFO_F(dp) -#define STATUS STATUS_F(dp) -#define WARN WARN_F(dp) -#define ERROR ERROR_F(dp) +std::atomic next_flight_key = null_flight_key; +flt::Ticket FlightKeyToTicket(const FlightKey& key) { + flt::Ticket result; + result.ticket = std::to_string(key); + return result; +} -namespace rgw::flight { +arw::Result TicketToFlightKey(const flt::Ticket& t) { + try { + return (FlightKey) std::stoul(t.ticket); + } catch (std::invalid_argument const& ex) { + return arw::Status::Invalid( + "could not convert Ticket containing \"%s\" into a Flight Key", + t.ticket); + } catch (const std::out_of_range& ex) { + return arw::Status::Invalid( + "could not convert Ticket containing \"%s\" into a Flight Key due to range", + t.ticket); + } +} + +// FlightData + +FlightData::FlightData(const std::string& _uri, + const std::string& _tenant_name, + const std::string& _bucket_name, + const rgw_obj_key& _object_key, + uint64_t _num_records, + uint64_t _obj_size, + std::shared_ptr& _schema, + std::shared_ptr& _kv_metadata, + rgw_user _user_id) : + key(++next_flight_key), + /* expires(coarse_real_clock::now() + lifespan), */ + uri(_uri), + tenant_name(_tenant_name), + bucket_name(_bucket_name), + object_key(_object_key), + num_records(_num_records), + obj_size(_obj_size), + schema(_schema), + kv_metadata(_kv_metadata), + user_id(_user_id) +{ } + +/**** FlightStore ****/ + +FlightStore::FlightStore(const DoutPrefix& _dp) : + dp(_dp) +{ } + +FlightStore::~FlightStore() { } - std::atomic next_flight_key = 0; +/**** MemoryFlightStore ****/ - FlightData::FlightData(const req_state* state) : - key(next_flight_key++), - expires(coarse_real_clock::now() + lifespan) +MemoryFlightStore::MemoryFlightStore(const DoutPrefix& _dp) : + FlightStore(_dp) +{ } + +MemoryFlightStore::~MemoryFlightStore() { } + +FlightKey MemoryFlightStore::add_flight(FlightData&& flight) { + std::pair result; { -#if 0 - bucket = new rgw::sal::Bucket(*state->bucket); -#endif + const std::lock_guard lock(mtx); + result = map.insert( {flight.key, std::move(flight)} ); } + ceph_assertf(result.second, + "unable to add FlightData to MemoryFlightStore"); // temporary until error handling - FlightStore::~FlightStore() { - // empty + return result.first->second.key; +} + +arw::Result MemoryFlightStore::get_flight(const FlightKey& key) const { + const std::lock_guard lock(mtx); + auto i = map.find(key); + if (i == map.cend()) { + return arw::Status::KeyError("could not find Flight with Key %" PRIu32, + key); + } else { + return i->second; } +} - MemoryFlightStore::~MemoryFlightStore() { - // empty +// returns either the next FilghtData or, if at end, empty optional +std::optional MemoryFlightStore::after_key(const FlightKey& key) const { + std::optional result; + { + const std::lock_guard lock(mtx); + auto i = map.upper_bound(key); + if (i != map.end()) { + result = i->second; + } } + return result; +} + +int MemoryFlightStore::remove_flight(const FlightKey& key) { + return 0; +} + +int MemoryFlightStore::expire_flights() { + return 0; +} + +/**** FlightServer ****/ + +FlightServer::FlightServer(RGWProcessEnv& _env, + FlightStore* _flight_store, + const DoutPrefix& _dp) : + env(_env), + driver(env.driver), + dp(_dp), + flight_store(_flight_store) +{ } + +FlightServer::~FlightServer() +{ } - FlightKey MemoryFlightStore::add_flight(FlightData&& flight) { - FlightKey key = flight.key; - auto p = map.insert( {key, flight} ); - ceph_assertf(p.second, - "unable to add FlightData to MemoryFlightStore"); // temporary until error handling +arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr* listings) { - return key; + // function local class to implement FlightListing interface + class RGWFlightListing : public flt::FlightListing { + + FlightStore* flight_store; + FlightKey previous_key; + + public: + + RGWFlightListing(FlightStore* flight_store) : + flight_store(flight_store), + previous_key(null_flight_key) + { } + + arw::Status Next(std::unique_ptr* info) { + std::optional fd = flight_store->after_key(previous_key); + if (fd) { + previous_key = fd->key; + auto descriptor = + flt::FlightDescriptor::Path( + { fd->tenant_name, fd->bucket_name, fd->object_key.name, fd->object_key.instance, fd->object_key.ns }); + flt::FlightEndpoint endpoint; + endpoint.ticket = FlightKeyToTicket(fd->key); + std::vector endpoints { endpoint }; + + ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj, + flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size)); + *info = std::make_unique(std::move(info_obj)); + return arw::Status::OK(); + } else { + *info = nullptr; + return arw::Status::OK(); + } + } + }; // class RGWFlightListing + + *listings = std::make_unique(flight_store); + return arw::Status::OK(); +} // FlightServer::ListFlights + + +arw::Status FlightServer::GetFlightInfo(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr *info) { + return arw::Status::OK(); +} // FlightServer::GetFlightInfo + + +arw::Status FlightServer::GetSchema(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr *schema) { + return arw::Status::OK(); +} // FlightServer::GetSchema + + // A Buffer that owns its memory and frees it when the Buffer is + // destructed +class OwnedBuffer : public arw::Buffer { + + uint8_t* buffer; + +protected: + + OwnedBuffer(uint8_t* _buffer, int64_t _size) : + Buffer(_buffer, _size), + buffer(_buffer) + { } + +public: + + ~OwnedBuffer() override { + delete[] buffer; } - // int MemoryFlightStore::add_flight(const FlightKey& key) { return 0; } - int MemoryFlightStore::get_flight(const FlightKey& key) { return 0; } - int MemoryFlightStore::remove_flight(const FlightKey& key) { return 0; } - int MemoryFlightStore::expire_flights() { return 0; } - FlightServer::FlightServer(boost::intrusive_ptr& _cct, - rgw::sal::Store* _store, - FlightStore* _flight_store) : - cct(_cct), - dp(cct.get(), dout_subsys, "rgw arrow_flight: "), - store(_store), - flight_store(_flight_store) - { - INFO << "FlightServer constructed" << dendl; + static arw::Result> make(int64_t size) { + uint8_t* buffer = new (std::nothrow) uint8_t[size]; + if (!buffer) { + return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size); + } + + OwnedBuffer* ptr = new OwnedBuffer(buffer, size); + std::shared_ptr result; + result.reset(ptr); + return result; } - FlightServer::~FlightServer() - { - INFO << "FlightServer destructed" << dendl; + // if what's read in is less than capacity + void set_size(int64_t size) { + size_ = size; } + // pointer that can be used to write into buffer + uint8_t* writeable_data() { + return buffer; + } +}; // class OwnedBuffer + +#if 0 // remove classes used for testing and incrementally building + +// make local to DoGet eventually +class LocalInputStream : public arw::io::InputStream { + + std::iostream::pos_type position; + std::fstream file; + std::shared_ptr kv_metadata; + const DoutPrefix dp; -class RGWFlightListing : public flt::FlightListing { public: - RGWFlightListing() { -#if 0 - const int64_t total_records = 2; - const int64_t total_bytes = 2048; - const std::vector endpoints; - const auto descriptor = flt::FlightDescriptor::Command("fake-cmd"); - arw::FieldVector fields; - const arw::Schema schema(fields, nullptr); - auto info1 = flt::FlightInfo::Make(schema, descriptor, endpoints, total_records, total_bytes); -#endif + LocalInputStream(std::shared_ptr _kv_metadata, + const DoutPrefix _dp) : + kv_metadata(_kv_metadata), + dp(_dp) + {} + + arw::Status Open() { + file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in); + if (!file.good()) { + return arw::Status::IOError("unable to open file"); + } + + INFO << "file opened successfully" << dendl; + position = file.tellg(); + return arw::Status::OK(); } - arw::Status Next(std::unique_ptr* info) { - *info = nullptr; + arw::Status Close() override { + file.close(); + INFO << "file closed" << dendl; return arw::Status::OK(); } -}; + arw::Result Tell() const override { + if (position < 0) { + return arw::Status::IOError( + "could not query file implementaiton with tellg"); + } else { + return int64_t(position); + } + } + + bool closed() const override { + return file.is_open(); + } + + arw::Result Read(int64_t nbytes, void* out) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + if (file.read(reinterpret_cast(out), + reinterpret_cast(nbytes))) { + const std::streamsize bytes_read = file.gcount(); + INFO << "Point A: read bytes " << bytes_read << dendl; + position = file.tellg(); + return bytes_read; + } else { + ERROR << "unable to read from file" << dendl; + return arw::Status::IOError("unable to read from offset %" PRId64, + int64_t(position)); + } + } + + arw::Result> Read(int64_t nbytes) override { + INFO << "entered: " << ": asking for " << nbytes << " bytes" << dendl; + + std::shared_ptr buffer; + ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes)); - arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context, - const flt::Criteria* criteria, - std::unique_ptr* listings) { - *listings = std::make_unique(); + if (file.read(reinterpret_cast(buffer->writeable_data()), + reinterpret_cast(nbytes))) { + const auto bytes_read = file.gcount(); + INFO << "Point B: read bytes " << bytes_read << dendl; + // buffer->set_size(bytes_read); + position = file.tellg(); + return buffer; + } else if (file.rdstate() & std::ifstream::failbit && + file.rdstate() & std::ifstream::eofbit) { + const auto bytes_read = file.gcount(); + INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl; + // buffer->set_size(bytes_read); + position = file.tellg(); + return buffer; + } else { + ERROR << "unable to read from file" << dendl; + return arw::Status::IOError("unable to read from offset %ld", position); + } + } + + arw::Result Peek(int64_t nbytes) override { + INFO << "called, not implemented" << dendl; + return arw::Status::NotImplemented("peek not currently allowed"); + } + + bool supports_zero_copy() const override { + return false; + } + + arw::Result> ReadMetadata() override { + INFO << "called" << dendl; + return kv_metadata; + } +}; // class LocalInputStream + +class LocalRandomAccessFile : public arw::io::RandomAccessFile { + + FlightData flight_data; + const DoutPrefix dp; + + std::iostream::pos_type position; + std::fstream file; + +public: + LocalRandomAccessFile(const FlightData& _flight_data, const DoutPrefix _dp) : + flight_data(_flight_data), + dp(_dp) + { } + + // implement InputStream + + arw::Status Open() { + file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in); + if (!file.good()) { + return arw::Status::IOError("unable to open file"); + } + + INFO << "file opened successfully" << dendl; + position = file.tellg(); return arw::Status::OK(); } - static FlightServer* fs; + arw::Status Close() override { + file.close(); + INFO << "file closed" << dendl; + return arw::Status::OK(); + } - void set_flight_server(FlightServer* _server) { - fs = _server; + arw::Result Tell() const override { + if (position < 0) { + return arw::Status::IOError( + "could not query file implementaiton with tellg"); + } else { + return int64_t(position); + } } - FlightServer* get_flight_server() { - return fs; + bool closed() const override { + return file.is_open(); } - FlightStore* get_flight_store() { - return fs ? fs->get_flight_store() : nullptr; + arw::Result Read(int64_t nbytes, void* out) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + if (file.read(reinterpret_cast(out), + reinterpret_cast(nbytes))) { + const std::streamsize bytes_read = file.gcount(); + INFO << "Point A: read bytes " << bytes_read << dendl; + position = file.tellg(); + return bytes_read; + } else { + ERROR << "unable to read from file" << dendl; + return arw::Status::IOError("unable to read from offset %" PRId64, + int64_t(position)); + } } - FlightKey propose_flight(const req_state* request) { - FlightKey key = get_flight_store()->add_flight(FlightData(request)); - return key; + arw::Result> Read(int64_t nbytes) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + + std::shared_ptr buffer; + ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes)); + + if (file.read(reinterpret_cast(buffer->writeable_data()), + reinterpret_cast(nbytes))) { + const auto bytes_read = file.gcount(); + INFO << "Point B: read bytes " << bytes_read << dendl; + // buffer->set_size(bytes_read); + position = file.tellg(); + return buffer; + } else if (file.rdstate() & std::ifstream::failbit && + file.rdstate() & std::ifstream::eofbit) { + const auto bytes_read = file.gcount(); + INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl; + // buffer->set_size(bytes_read); + position = file.tellg(); + return buffer; + } else { + ERROR << "unable to read from file" << dendl; + return arw::Status::IOError("unable to read from offset %ld", position); + } + } + + bool supports_zero_copy() const override { + return false; + } + + // implement Seekable + + arw::Result GetSize() override { + return flight_data.obj_size; } + arw::Result Peek(int64_t nbytes) override { + std::iostream::pos_type here = file.tellg(); + if (here == -1) { + return arw::Status::IOError( + "unable to determine current position ahead of peek"); + } + + ARROW_ASSIGN_OR_RAISE(OwningStringView result, + OwningStringView::make(nbytes)); + + // read + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + Read(nbytes, (void*) result.writeable_data())); + (void) bytes_read; // silence unused variable warnings + + // return offset to original + ARROW_RETURN_NOT_OK(Seek(here)); + + return result; + } + + arw::Result> ReadMetadata() { + return flight_data.kv_metadata; + } + + arw::Future> ReadMetadataAsync( + const arw::io::IOContext& io_context) override { + return arw::Future>::MakeFinished(ReadMetadata()); + } + + // implement Seekable interface + + arw::Status Seek(int64_t position) { + file.seekg(position); + if (file.fail()) { + return arw::Status::IOError( + "error encountered during seek to %" PRId64, position); + } else { + return arw::Status::OK(); + } + } +}; // class LocalRandomAccessFile +#endif + +class RandomAccessObject : public arw::io::RandomAccessFile { + + FlightData flight_data; + const DoutPrefix dp; + + int64_t position; + bool is_closed; + std::unique_ptr op; + +public: + + RandomAccessObject(const FlightData& _flight_data, + std::unique_ptr& obj, + const DoutPrefix _dp) : + flight_data(_flight_data), + dp(_dp), + position(-1), + is_closed(false) + { + op = obj->get_read_op(); + } + + arw::Status Open() { + int ret = op->prepare(null_yield, &dp); + if (ret < 0) { + return arw::Status::IOError( + "unable to prepare object with error %d", ret); + } + INFO << "file opened successfully" << dendl; + position = 0; + return arw::Status::OK(); + } + + // implement InputStream + + arw::Status Close() override { + position = -1; + is_closed = true; + (void) op.reset(); + INFO << "object closed" << dendl; + return arw::Status::OK(); + } + + arw::Result Tell() const override { + if (position < 0) { + return arw::Status::IOError("could not determine position"); + } else { + return position; + } + } + + bool closed() const override { + return is_closed; + } + + arw::Result Read(int64_t nbytes, void* out) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + + if (position < 0) { + ERROR << "error, position indicated error" << dendl; + return arw::Status::IOError("object read op is in bad state"); + } + + // note: read function reads through end_position inclusive + int64_t end_position = position + nbytes - 1; + + bufferlist bl; + + const int64_t bytes_read = + op->read(position, end_position, bl, null_yield, &dp); + if (bytes_read < 0) { + const int64_t former_position = position; + position = -1; + ERROR << "read operation returned " << bytes_read << dendl; + return arw::Status::IOError( + "unable to read object at position %" PRId64 ", error code: %" PRId64, + former_position, + bytes_read); + } + + // TODO: see if there's a way to get rid of this copy, perhaps + // updating rgw::sal::read_op + bl.cbegin().copy(bytes_read, reinterpret_cast(out)); + + position += bytes_read; + + if (nbytes != bytes_read) { + INFO << "partial read: nbytes=" << nbytes << + ", bytes_read=" << bytes_read << dendl; + } + INFO << bytes_read << " bytes read" << dendl; + return bytes_read; + } + + arw::Result> Read(int64_t nbytes) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + + std::shared_ptr buffer; + ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes)); + + ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read, + Read(nbytes, buffer->writeable_data())); + buffer->set_size(bytes_read); + + return buffer; + } + + bool supports_zero_copy() const override { + return false; + } + + // implement Seekable + + arw::Result GetSize() override { + INFO << "entered: " << flight_data.obj_size << " returned" << dendl; + return flight_data.obj_size; + } + + arw::Result Peek(int64_t nbytes) override { + INFO << "entered: " << nbytes << " bytes" << dendl; + + int64_t saved_position = position; + + ARROW_ASSIGN_OR_RAISE(OwningStringView buffer, + OwningStringView::make(nbytes)); + + ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read, + Read(nbytes, (void*) buffer.writeable_data())); + + // restore position for a peek + position = saved_position; + + if (bytes_read < nbytes) { + // create new OwningStringView with moved buffer + return OwningStringView::shrink(std::move(buffer), bytes_read); + } else { + return buffer; + } + } + + arw::Result> ReadMetadata() { + return flight_data.kv_metadata; + } + + arw::Future> ReadMetadataAsync( + const arw::io::IOContext& io_context) override { + return arw::Future>::MakeFinished(ReadMetadata()); + } + + // implement Seekable interface + + arw::Status Seek(int64_t new_position) { + INFO << "entered: position: " << new_position << dendl; + if (position < 0) { + ERROR << "error, position indicated error" << dendl; + return arw::Status::IOError("object read op is in bad state"); + } else { + position = new_position; + return arw::Status::OK(); + } + } +}; // class RandomAccessObject + +arw::Status FlightServer::DoGet(const flt::ServerCallContext &context, + const flt::Ticket &request, + std::unique_ptr *stream) { + int ret; + + ARROW_ASSIGN_OR_RAISE(FlightKey key, TicketToFlightKey(request)); + ARROW_ASSIGN_OR_RAISE(FlightData fd, get_flight_store()->get_flight(key)); + + std::unique_ptr user = driver->get_user(fd.user_id); + if (user->empty()) { + INFO << "user is empty" << dendl; + } else { + // TODO: test what happens if user is not loaded + ret = user->load_user(&dp, null_yield); + if (ret < 0) { + ERROR << "load_user returned " << ret << dendl; + // TODO return something + } + INFO << "user is " << user->get_display_name() << dendl; + } + + std::unique_ptr bucket; + + ret = driver->get_bucket(&dp, &(*user), fd.tenant_name, fd.bucket_name, + &bucket, null_yield); + if (ret < 0) { + ERROR << "get_bucket returned " << ret << dendl; + // TODO return something + } + + std::unique_ptr object = bucket->get_object(fd.object_key); + + auto input = std::make_shared(fd, object, dp); + ARROW_RETURN_NOT_OK(input->Open()); + + std::unique_ptr reader; + ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, + arw::default_memory_pool(), + &reader)); + + std::shared_ptr table; + ARROW_RETURN_NOT_OK(reader->ReadTable(&table)); + + std::vector> batches; + arw::TableBatchReader batch_reader(*table); + ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches)); + + ARROW_ASSIGN_OR_RAISE(auto owning_reader, + arw::RecordBatchReader::Make( + std::move(batches), table->schema())); + *stream = std::unique_ptr( + new flt::RecordBatchStream(owning_reader)); + + return arw::Status::OK(); +} // flightServer::DoGet + } // namespace rgw::flight diff --git a/src/rgw/rgw_flight.h b/src/rgw/rgw_flight.h index 1f4d37213762b..8f6c4ade7b783 100644 --- a/src/rgw/rgw_flight.h +++ b/src/rgw/rgw_flight.h @@ -14,88 +14,200 @@ #include "rgw_frontend.h" #include "arrow/type.h" #include "arrow/flight/server.h" +#include "arrow/util/string_view.h" #include "rgw_flight_frontend.h" -namespace arw = arrow; -namespace flt = arrow::flight; - -struct req_state; -namespace rgw::flight { +#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": " +#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": " +#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": " +#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": " - static const coarse_real_clock::duration lifespan = std::chrono::hours(1); +#define INFO INFO_F(dp) +#define STATUS STATUS_F(dp) +#define WARN WARN_F(dp) +#define ERROR ERROR_F(dp) - struct FlightData { - FlightKey key; - coarse_real_clock::time_point expires; - rgw::sal::Bucket* bucket; -#if 0 - rgw::sal::Object object; - rgw::sal::User user; -#endif +namespace arw = arrow; +namespace flt = arrow::flight; - FlightData(const req_state* state); - }; +struct req_state; - // stores flights that have been created and helps expire them - class FlightStore { - public: - virtual ~FlightStore(); - virtual FlightKey add_flight(FlightData&& flight) = 0; - virtual int get_flight(const FlightKey& key) = 0; - virtual int remove_flight(const FlightKey& key) = 0; - virtual int expire_flights() = 0; - }; +namespace rgw::flight { - class MemoryFlightStore : public FlightStore { - std::map map; +static const coarse_real_clock::duration lifespan = std::chrono::hours(1); - public: +struct FlightData { + FlightKey key; + // coarse_real_clock::time_point expires; + std::string uri; + std::string tenant_name; + std::string bucket_name; + rgw_obj_key object_key; + // NB: what about object's namespace and instance? + uint64_t num_records; + uint64_t obj_size; + std::shared_ptr schema; + std::shared_ptr kv_metadata; - virtual ~MemoryFlightStore(); - FlightKey add_flight(FlightData&& flight) override; - int get_flight(const FlightKey& key) override; - int remove_flight(const FlightKey& key) override; - int expire_flights() override; - }; + rgw_user user_id; // TODO: this should be removed when we do + // proper flight authentication - class FlightServer : public flt::FlightServerBase { + FlightData(const std::string& _uri, + const std::string& _tenant_name, + const std::string& _bucket_name, + const rgw_obj_key& _object_key, + uint64_t _num_records, + uint64_t _obj_size, + std::shared_ptr& _schema, + std::shared_ptr& _kv_metadata, + rgw_user _user_id); +}; + +// stores flights that have been created and helps expire them +class FlightStore { + +protected: + + const DoutPrefix& dp; + +public: + + FlightStore(const DoutPrefix& dp); + virtual ~FlightStore(); + virtual FlightKey add_flight(FlightData&& flight) = 0; + + // TODO consider returning const shared pointers to FlightData in + // the following two functions + virtual arw::Result get_flight(const FlightKey& key) const = 0; + virtual std::optional after_key(const FlightKey& key) const = 0; + + virtual int remove_flight(const FlightKey& key) = 0; + virtual int expire_flights() = 0; +}; + +class MemoryFlightStore : public FlightStore { + std::map map; + mutable std::mutex mtx; // for map + +public: + + MemoryFlightStore(const DoutPrefix& dp); + virtual ~MemoryFlightStore(); + FlightKey add_flight(FlightData&& flight) override; + arw::Result get_flight(const FlightKey& key) const override; + std::optional after_key(const FlightKey& key) const override; + int remove_flight(const FlightKey& key) override; + int expire_flights() override; +}; + +class FlightServer : public flt::FlightServerBase { + + using Data1 = std::vector>; + + RGWProcessEnv& env; + rgw::sal::Driver* driver; + const DoutPrefix& dp; + FlightStore* flight_store; + + std::map data; + +public: + + static constexpr int default_port = 8077; + + FlightServer(RGWProcessEnv& env, + FlightStore* flight_store, + const DoutPrefix& dp); + ~FlightServer() override; + + FlightStore* get_flight_store() { + return flight_store; + } + + arw::Status ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr* listings) override; + + arw::Status GetFlightInfo(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr *info) override; + + arw::Status GetSchema(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr *schema) override; + + arw::Status DoGet(const flt::ServerCallContext &context, + const flt::Ticket &request, + std::unique_ptr *stream) override; +}; // class FlightServer + +class OwningStringView : public arw::util::string_view { + + uint8_t* buffer; + int64_t capacity; + int64_t consumed; - using Data1 = std::vector>; + OwningStringView(uint8_t* _buffer, int64_t _size) : + arw::util::string_view((const char*) _buffer, _size), + buffer(_buffer), + capacity(_size), + consumed(_size) + { } + + OwningStringView(OwningStringView&& from, int64_t new_size) : + buffer(nullptr), + capacity(from.capacity), + consumed(new_size) + { + // should be impossible due to static function check + ceph_assertf(consumed <= capacity, "new size cannot exceed capacity"); - boost::intrusive_ptr cct; - const DoutPrefix dp; - rgw::sal::Store* store; - FlightStore* flight_store; + std::swap(buffer, from.buffer); + from.capacity = 0; + from.consumed = 0; + } - std::map data; +public: - public: + OwningStringView(OwningStringView&&) = default; + OwningStringView& operator=(OwningStringView&&) = default; - static constexpr int default_port = 8077; + uint8_t* writeable_data() { + return buffer; + } - FlightServer(boost::intrusive_ptr& _cct, - rgw::sal::Store* _store, - FlightStore* _flight_store); - ~FlightServer() override; + ~OwningStringView() { + if (buffer) { + delete[] buffer; + } + } - FlightStore* get_flight_store() { - return flight_store; + static arw::Result make(int64_t size) { + uint8_t* buffer = new uint8_t[size]; + if (!buffer) { + return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size); + } + return OwningStringView(buffer, size); + } + + static arw::Result shrink(OwningStringView&& from, + int64_t new_size) { + if (new_size > from.capacity) { + return arw::Status::Invalid("new size cannot exceed capacity"); + } else { + return OwningStringView(std::move(from), new_size); } + } - arw::Status ListFlights(const flt::ServerCallContext& context, - const flt::Criteria* criteria, - std::unique_ptr* listings) override; - }; // class FlightServer +}; - // GLOBAL +// GLOBAL - void set_flight_server(FlightServer* _server); - FlightServer* get_flight_server(); - FlightStore* get_flight_store(); - FlightKey propose_flight(const req_state* request); +flt::Ticket FlightKeyToTicket(const FlightKey& key); +arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key); } // namespace rgw::flight diff --git a/src/rgw/rgw_flight_frontend.cc b/src/rgw/rgw_flight_frontend.cc index 1751e2515d33f..3da0f5d15e72a 100644 --- a/src/rgw/rgw_flight_frontend.cc +++ b/src/rgw/rgw_flight_frontend.cc @@ -2,102 +2,234 @@ // vim: ts=8 sw=2 smarttab ft=cpp +#include +#include +#include + #include "arrow/type.h" #include "arrow/flight/server.h" +#include "arrow/io/file.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/schema.h" +#include "parquet/stream_reader.h" #include "rgw_flight_frontend.h" #include "rgw_flight.h" -#define dout_subsys ceph_subsys_rgw_flight + +// logging +constexpr unsigned dout_subsys = ceph_subsys_rgw_flight; +constexpr const char* dout_prefix_str = "rgw arrow_flight: "; + namespace rgw::flight { - FlightFrontend::FlightFrontend(boost::intrusive_ptr& _cct, - RGWFrontendConfig* _config, - rgw::sal::Store* _store, - int _port) : - cct(_cct), - dp(cct.get(), dout_subsys, "rgw arrow_flight: "), - config(_config), - port(_port) - { - FlightStore* flight_store = new MemoryFlightStore(); - flight_server = new FlightServer(_cct, _store, flight_store); +const FlightKey null_flight_key = 0; + +FlightFrontend::FlightFrontend(RGWProcessEnv& _env, + RGWFrontendConfig* _config, + int _port) : + env(_env), + config(_config), + port(_port), + dp(env.driver->ctx(), dout_subsys, dout_prefix_str) +{ + env.flight_store = new MemoryFlightStore(dp); + env.flight_server = new FlightServer(env, env.flight_store, dp); + INFO << "flight server started" << dendl; +} + +FlightFrontend::~FlightFrontend() { + delete env.flight_server; + delete env.flight_store; + INFO << "flight server shut down" << dendl; +} + +int FlightFrontend::init() { + if (port <= 0) { + port = FlightServer::default_port; + } + const std::string url = + std::string("grpc+tcp://localhost:") + std::to_string(port); + flt::Location location; + arw::Status s = flt::Location::Parse(url, &location); + if (!s.ok()) { + ERROR << "couldn't parse url=" << url << ", status=" << s << dendl; + return -EINVAL; } - FlightFrontend::~FlightFrontend() { - delete flight_server; + flt::FlightServerOptions options(location); + options.verify_client = false; + s = env.flight_server->Init(options); + if (!s.ok()) { + ERROR << "couldn't init flight server; status=" << s << dendl; + return -EINVAL; } - int FlightFrontend::init() { - if (port <= 0) { - port = FlightServer::default_port; - } - const std::string url = - std::string("grpc+tcp://localhost:") + std::to_string(port); - flt::Location location; - arw::Status s = flt::Location::Parse(url, &location); - if (!s.ok()) { - return -EINVAL; - } - - flt::FlightServerOptions options(location); - options.verify_client = false; - s = flight_server->Init(options); - if (!s.ok()) { - return -EINVAL; - } - - dout(20) << "STATUS: " << __func__ << - ": FlightServer inited; will use port " << port << dendl; + INFO << "FlightServer inited; will use port " << port << dendl; + return 0; +} + +int FlightFrontend::run() { + try { + flight_thread = make_named_thread(server_thread_name, + &FlightServer::Serve, + env.flight_server); + + INFO << "FlightServer thread started, id=" << + flight_thread.get_id() << + ", joinable=" << flight_thread.joinable() << dendl; return 0; + } catch (std::system_error& e) { + ERROR << "FlightServer thread failed to start" << dendl; + return -e.code().value(); } - - int FlightFrontend::run() { - try { - flight_thread = make_named_thread(server_thread_name, - &FlightServer::Serve, - flight_server); - set_flight_server(flight_server); - - dout(20) << "INFO: " << __func__ << - ": FlightServer thread started, id=" << flight_thread.get_id() << - ", joinable=" << flight_thread.joinable() << dendl; - return 0; - } catch (std::system_error& e) { - derr << "ERROR: " << __func__ << - ": FlightServer thread failed to start" << dendl; - return -ENOSPC; - } +} + +void FlightFrontend::stop() { + env.flight_server->Shutdown(); + env.flight_server->Wait(); + INFO << "FlightServer shut down" << dendl; +} + +void FlightFrontend::join() { + flight_thread.join(); + INFO << "FlightServer thread joined" << dendl; +} + +void FlightFrontend::pause_for_new_config() { + // ignore since config changes won't alter flight_server +} + +void FlightFrontend::unpause_with_new_config() { + // ignore since config changes won't alter flight_server +} + +/* ************************************************************ */ + +FlightGetObj_Filter::FlightGetObj_Filter(const req_state* request, + RGWGetObj_Filter* next) : + RGWGetObj_Filter(next), + penv(request->penv), + dp(request->cct->get(), dout_subsys, dout_prefix_str), + current_offset(0), + expected_size(request->obj_size), + uri(request->decoded_uri), + tenant_name(request->bucket->get_tenant()), + bucket_name(request->bucket->get_name()), + object_key(request->object->get_key()), + // note: what about object namespace and instance? + schema_status(arrow::StatusCode::Cancelled, + "schema determination incomplete"), + user_id(request->user->get_id()) +{ +#warning "TODO: fix use of tmpnam" + char name[L_tmpnam]; + const char* namep = std::tmpnam(name); + if (!namep) { + // } + temp_file_name = namep; - void FlightFrontend::stop() { - set_flight_server(nullptr); - flight_server->Shutdown(); - flight_server->Wait(); - dout(20) << "INFO: " << __func__ << ": FlightServer shut down" << dendl; - } + temp_file.open(temp_file_name); +} - void FlightFrontend::join() { - flight_thread.join(); - dout(20) << "INFO: " << __func__ << ": FlightServer thread joined" << dendl; +FlightGetObj_Filter::~FlightGetObj_Filter() { + if (temp_file.is_open()) { + temp_file.close(); } - - void FlightFrontend::pause_for_new_config() { - // ignore since config changes won't alter flight_server + std::error_code error; + std::filesystem::remove(temp_file_name, error); + if (error) { + ERROR << "FlightGetObj_Filter got error when removing temp file; " + "error=" << error.value() << + ", temp_file_name=" << temp_file_name << dendl; + } else { + INFO << "parquet/arrow schema determination status: " << + schema_status << dendl; } - - void FlightFrontend::unpause_with_new_config(rgw::sal::Store* store, - rgw_auth_registry_ptr_t auth_registry) { - // ignore since config changes won't alter flight_server +} + +int FlightGetObj_Filter::handle_data(bufferlist& bl, + off_t bl_ofs, off_t bl_len) { + INFO << "flight handling data from offset " << + current_offset << " (" << bl_ofs << ") of size " << bl_len << dendl; + + current_offset += bl_len; + + if (temp_file.is_open()) { + bl.write_stream(temp_file); + + if (current_offset >= expected_size) { + INFO << "data read is completed, current_offset=" << + current_offset << ", expected_size=" << expected_size << dendl; + temp_file.close(); + + std::shared_ptr kv_metadata; + std::shared_ptr aw_schema; + int64_t num_rows = 0; + + auto process_metadata = [&aw_schema, &num_rows, &kv_metadata, this]() -> arrow::Status { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr file, + arrow::io::ReadableFile::Open(temp_file_name)); + const std::shared_ptr metadata = parquet::ReadMetaData(file); + + file->Close(); + + num_rows = metadata->num_rows(); + kv_metadata = metadata->key_value_metadata(); + const parquet::SchemaDescriptor* pq_schema = metadata->schema(); + ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema, &aw_schema)); + + return arrow::Status::OK(); + }; + + schema_status = process_metadata(); + if (!schema_status.ok()) { + ERROR << "reading metadata to access schema, error=" << schema_status << dendl; + } else { + // INFO << "arrow_schema=" << *aw_schema << dendl; + FlightStore* store = penv.flight_store; + auto key = + store->add_flight(FlightData(uri, tenant_name, bucket_name, + object_key, num_rows, + expected_size, aw_schema, + kv_metadata, user_id)); + (void) key; // suppress unused variable warning + } + } // if last block + } // if file opened + + // chain to next filter in stream + int ret = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len); + + return ret; +} + +#if 0 +void code_snippets() { + INFO << "num_columns:" << md->num_columns() << + " num_schema_elements:" << md->num_schema_elements() << + " num_rows:" << md->num_rows() << + " num_row_groups:" << md->num_row_groups() << dendl; + + + INFO << "file schema: name=" << schema1->name() << ", ToString:" << schema1->ToString() << ", num_columns=" << schema1->num_columns() << dendl; + for (int c = 0; c < schema1->num_columns(); ++c) { + const parquet::ColumnDescriptor* cd = schema1->Column(c); + // const parquet::ConvertedType::type t = cd->converted_type; + const std::shared_ptr lt = cd->logical_type(); + INFO << "column " << c << ": name=" << cd->name() << ", ToString=" << cd->ToString() << ", logical_type=" << lt->ToString() << dendl; } - int FlightGetObj_Filter::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { - // do work here - dout(0) << "ERIC: " << __func__ << ": flight handling data from offset " << bl_ofs << " of size " << bl_len << dendl; - - // chain upwards - return RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len); + INFO << "There are " << md->num_rows() << " rows and " << md->num_row_groups() << " row groups" << dendl; + for (int rg = 0; rg < md->num_row_groups(); ++rg) { + INFO << "Row Group " << rg << dendl; + auto rg_md = md->RowGroup(rg); + auto schema2 = rg_md->schema(); } +} +#endif } // namespace rgw::flight diff --git a/src/rgw/rgw_flight_frontend.h b/src/rgw/rgw_flight_frontend.h index c74878a884b4f..b820ca22b0699 100644 --- a/src/rgw/rgw_flight_frontend.h +++ b/src/rgw/rgw_flight_frontend.h @@ -8,62 +8,71 @@ #include "rgw_frontend.h" #include "rgw_op.h" +#include "arrow/status.h" + namespace rgw::flight { - using FlightKey = uint32_t; +using FlightKey = uint32_t; +extern const FlightKey null_flight_key; - class FlightServer; +class FlightServer; - class FlightFrontend : public RGWFrontend { +class FlightFrontend : public RGWFrontend { - static constexpr std::string_view server_thread_name = - "Arrow Flight Server thread"; + static constexpr std::string_view server_thread_name = + "Arrow Flight Server thread"; - boost::intrusive_ptr& cct; - const DoutPrefix dp; - FlightServer* flight_server; // pointer so header file doesn't need to pull in too much - std::thread flight_thread; - RGWFrontendConfig* config; - int port; + RGWProcessEnv& env; + std::thread flight_thread; + RGWFrontendConfig* config; + int port; - public: + const DoutPrefix dp; - // port <= 0 -> let server decide; typically 8077 - FlightFrontend(boost::intrusive_ptr& cct, - RGWFrontendConfig* config, - rgw::sal::Store* store, - int port = -1); - ~FlightFrontend() override; - int init() override; - int run() override; - void stop() override; - void join() override; +public: - void pause_for_new_config() override; - void unpause_with_new_config(rgw::sal::Store* store, - rgw_auth_registry_ptr_t auth_registry) override; + // port <= 0 means let server decide; typically 8077 + FlightFrontend(RGWProcessEnv& env, + RGWFrontendConfig* config, + int port = -1); + ~FlightFrontend() override; + int init() override; + int run() override; + void stop() override; + void join() override; - }; // class FlightFrontend + void pause_for_new_config() override; + void unpause_with_new_config() override; +}; // class FlightFrontend - class FlightGetObj_Filter : public RGWGetObj_Filter { +class FlightGetObj_Filter : public RGWGetObj_Filter { - FlightKey key; + const RGWProcessEnv& penv; + const DoutPrefix dp; + FlightKey key; + uint64_t current_offset; + uint64_t expected_size; + std::string uri; + std::string tenant_name; + std::string bucket_name; + rgw_obj_key object_key; + std::string temp_file_name; + std::ofstream temp_file; + arrow::Status schema_status; + rgw_user user_id; // TODO: this should be removed when we do + // proper flight authentication - public: +public: - FlightGetObj_Filter(const FlightKey& _key, RGWGetObj_Filter* next) : - RGWGetObj_Filter(next), - key(_key) - { - // empty - } + FlightGetObj_Filter(const req_state* request, RGWGetObj_Filter* next); + ~FlightGetObj_Filter(); - int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; #if 0 - // this would allow the range to be modified if necessary; - int fixup_range(off_t& ofs, off_t& end) override; + // this would allow the range to be modified if necessary; + int fixup_range(off_t& ofs, off_t& end) override; #endif - }; +}; } // namespace rgw::flight diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 9c23882744546..e5589ae15089b 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -2238,10 +2238,8 @@ void RGWGetObj::execute(optional_yield y) #ifdef WITH_ARROW_FLIGHT if (ofs == 0) { - rgw::flight::FlightKey key = rgw::flight::propose_flight(s); - ldpp_dout(this, 0) << "ERIC: added arrow flight with key=" << key << dendl; - - flight_filter.emplace(key, filter); + // insert a GetObj_Filter to monitor and create flight + flight_filter.emplace(s, filter); filter = &*flight_filter; } #endif diff --git a/src/rgw/rgw_process_env.h b/src/rgw/rgw_process_env.h index 7e62b6afcd491..3193ff15121f5 100644 --- a/src/rgw/rgw_process_env.h +++ b/src/rgw/rgw_process_env.h @@ -20,6 +20,13 @@ namespace rgw::sal { class LuaManager; } +#ifdef WITH_ARROW_FLIGHT +namespace rgw::flight { + class FlightServer; + class FlightStore; +} +#endif + struct RGWLuaProcessEnv { std::string luarocks_path; rgw::lua::Background* background = nullptr; @@ -33,4 +40,11 @@ struct RGWProcessEnv { OpsLogSink *olog = nullptr; std::unique_ptr auth_registry; ActiveRateLimiter* ratelimiting = nullptr; + +#ifdef WITH_ARROW_FLIGHT + // managed by rgw:flight::FlightFrontend in rgw_flight_frontend.cc + rgw::flight::FlightServer* flight_server; + rgw::flight::FlightStore* flight_store; +#endif }; +