/*
* completion callback for async writepages
*/
+typedef void (*ceph_osdc_full_callback_t)(struct ceph_osd_client *, void *);
typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
struct ceph_msg *);
typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *, bool);
u64 event_count;
struct workqueue_struct *notify_wq;
+
+ ceph_osdc_full_callback_t map_cb;
+ void *map_p;
};
extern int ceph_osdc_setup(void);
extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
bool nofail);
+extern u32 ceph_osdc_cancel_writes(struct ceph_osd_client *osdc, int r);
extern void ceph_osdc_cancel_request(struct ceph_osd_request *req);
extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
void *data, struct ceph_osd_event **pevent);
extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
extern void ceph_osdc_put_event(struct ceph_osd_event *event);
+
+extern void ceph_osdc_register_map_cb(struct ceph_osd_client *osdc,
+ ceph_osdc_full_callback_t cb, void *data);
#endif
return NULL;
}
+/*
+ * Drop all pending write/modify requests and complete
+ * them with the `r` as return code.
+ *
+ * Returns the highest OSD map epoch of a request that was
+ * cancelled, or 0 if none were cancelled.
+ */
+u32 ceph_osdc_cancel_writes(
+ struct ceph_osd_client *osdc,
+ int r)
+{
+ struct ceph_osd_request *req;
+ struct rb_node *n = osdc->requests.rb_node;
+ u32 latest_epoch = 0;
+
+ dout("enter cancel_writes r=%d", r);
+
+ mutex_lock(&osdc->request_mutex);
+
+ while (n) {
+ req = rb_entry(n, struct ceph_osd_request, r_node);
+ n = rb_next(n);
+
+ ceph_osdc_get_request(req);
+ if (req->r_flags && CEPH_OSD_FLAG_WRITE) {
+ req->r_result = r;
+ complete_all(&req->r_completion);
+ complete_all(&req->r_safe_completion);
+
+ if (req->r_callback) {
+ // Requires callbacks used for write ops are
+ // amenable to being called with NULL msg
+ // (e.g. writepages_finish)
+ req->r_callback(req, NULL);
+ }
+
+ __unregister_request(osdc, req);
+
+ if (*req->r_request_osdmap_epoch > latest_epoch) {
+ latest_epoch = *req->r_request_osdmap_epoch;
+ }
+ }
+ ceph_osdc_put_request(req);
+ }
+
+ mutex_unlock(&osdc->request_mutex);
+
+ dout("complete cancel_writes latest_epoch=%ul", latest_epoch);
+
+ return latest_epoch;
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_writes);
+
static void __kick_linger_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
downgrade_write(&osdc->map_sem);
ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
+ if (osdc->map_cb) {
+ osdc->map_cb(osdc, osdc->map_p);
+ }
+
/*
* subscribe to subsequent osdmap updates if full to ensure
* we find out when we are no longer full and stop returning
up_write(&osdc->map_sem);
}
+void ceph_osdc_register_map_cb(struct ceph_osd_client *osdc,
+ ceph_osdc_full_callback_t cb, void *data)
+{
+ osdc->map_cb = cb;
+ osdc->map_p = data;
+}
+EXPORT_SYMBOL(ceph_osdc_register_map_cb);
+
/*
* watch/notify callback event infrastructure
*
spin_lock_init(&osdc->event_lock);
osdc->event_tree = RB_ROOT;
osdc->event_count = 0;
+ osdc->map_cb = NULL;
+ osdc->map_p = NULL;
schedule_delayed_work(&osdc->osds_timeout_work,
round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));