summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Documentation/ABI/testing/sysfs-bus-rbd18
-rw-r--r--drivers/block/rbd.c1784
-rw-r--r--drivers/block/rbd_types.h27
-rw-r--r--fs/ceph/addr.c19
-rw-r--r--fs/ceph/caps.c2
-rw-r--r--fs/ceph/file.c4
-rw-r--r--fs/ceph/ioctl.c8
-rw-r--r--fs/ceph/mds_client.c3
-rw-r--r--fs/ceph/super.c37
-rw-r--r--include/linux/ceph/mon_client.h1
-rw-r--r--include/linux/ceph/osd_client.h2
-rw-r--r--include/linux/ceph/osdmap.h6
-rw-r--r--net/ceph/mon_client.c7
-rw-r--r--net/ceph/osd_client.c47
-rw-r--r--net/ceph/osdmap.c18
-rw-r--r--net/ceph/pagelist.c5
16 files changed, 1299 insertions, 689 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 3c17b62899f..1cf2adf46b1 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -25,6 +25,10 @@ client_id
The ceph unique client id that was assigned for this specific session.
+features
+
+ A hexadecimal encoding of the feature bits for this image.
+
major
The block device major number.
@@ -33,6 +37,11 @@ name
The name of the rbd image.
+image_id
+
+ The unique id for the rbd image. (For rbd image format 1
+ this is empty.)
+
pool
The name of the storage pool where this rbd image resides.
@@ -57,12 +66,6 @@ current_snap
The current snapshot for which the device is mapped.
-create_snap
-
- Create a snapshot:
-
- $ echo <snap-name> > /sys/bus/rbd/devices/<dev-id>/snap_create
-
snap_*
A directory per each snapshot
@@ -79,4 +82,7 @@ snap_size
The size of the image when this snapshot was taken.
+snap_features
+
+ A hexadecimal encoding of the feature bits for this snapshot.
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 54a55f03115..bb3d9be3b1b 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -41,6 +41,8 @@
#include "rbd_types.h"
+#define RBD_DEBUG /* Activate rbd_assert() calls */
+
/*
* The basic unit of block I/O is a sector. It is interpreted in a
* number of contexts in Linux (blk, bio, genhd), but the default is
@@ -50,16 +52,24 @@
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
+/* It might be useful to have this defined elsewhere too */
+
+#define U64_MAX ((u64) (~0ULL))
+
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"
#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
#define RBD_MAX_SNAP_NAME_LEN 32
+#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN 1024
#define RBD_SNAP_HEAD_NAME "-"
+#define RBD_IMAGE_ID_LEN_MAX 64
+#define RBD_OBJ_PREFIX_LEN_MAX 64
+
/*
* An RBD device name will be "rbd#", where the "rbd" comes from
* RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -69,21 +79,22 @@
#define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
-#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
+#define RBD_READ_ONLY_DEFAULT false
/*
* block device image metadata (in-memory version)
*/
struct rbd_image_header {
- u64 image_size;
+ /* These four fields never change for a given rbd image */
char *object_prefix;
+ u64 features;
__u8 obj_order;
__u8 crypt_type;
__u8 comp_type;
- struct ceph_snap_context *snapc;
- size_t snap_names_len;
- u32 total_snaps;
+ /* The remaining fields need to be updated occasionally */
+ u64 image_size;
+ struct ceph_snap_context *snapc;
char *snap_names;
u64 *snap_sizes;
@@ -91,7 +102,7 @@ struct rbd_image_header {
};
struct rbd_options {
- int notify_timeout;
+ bool read_only;
};
/*
@@ -99,7 +110,6 @@ struct rbd_options {
*/
struct rbd_client {
struct ceph_client *client;
- struct rbd_options *rbd_opts;
struct kref kref;
struct list_head node;
};
@@ -141,6 +151,16 @@ struct rbd_snap {
u64 size;
struct list_head node;
u64 id;
+ u64 features;
+};
+
+struct rbd_mapping {
+ char *snap_name;
+ u64 snap_id;
+ u64 size;
+ u64 features;
+ bool snap_exists;
+ bool read_only;
};
/*
@@ -151,8 +171,9 @@ struct rbd_device {
int major; /* blkdev assigned major */
struct gendisk *disk; /* blkdev's gendisk and rq */
- struct request_queue *q;
+ u32 image_format; /* Either 1 or 2 */
+ struct rbd_options rbd_opts;
struct rbd_client *rbd_client;
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -160,6 +181,8 @@ struct rbd_device {
spinlock_t lock; /* queue lock */
struct rbd_image_header header;
+ char *image_id;
+ size_t image_id_len;
char *image_name;
size_t image_name_len;
char *header_name;
@@ -171,13 +194,8 @@ struct rbd_device {
/* protects updating the header */
struct rw_semaphore header_rwsem;
- /* name of the snapshot this device reads from */
- char *snap_name;
- /* id of the snapshot this device reads from */
- u64 snap_id; /* current snapshot id */
- /* whether the snap_id this device reads from still exists */
- bool snap_exists;
- int read_only;
+
+ struct rbd_mapping mapping;
struct list_head node;
@@ -196,12 +214,10 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
static LIST_HEAD(rbd_client_list); /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
-static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
+static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
+static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
+
static void rbd_dev_release(struct device *dev);
-static ssize_t rbd_snap_add(struct device *dev,
- struct device_attribute *attr,
- const char *buf,
- size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
@@ -229,6 +245,18 @@ static struct device rbd_root_dev = {
.release = rbd_root_dev_release,
};
+#ifdef RBD_DEBUG
+#define rbd_assert(expr) \
+ if (unlikely(!(expr))) { \
+ printk(KERN_ERR "\nAssertion failure in %s() " \
+ "at line %d:\n\n" \
+ "\trbd_assert(%s);\n\n", \
+ __func__, __LINE__, #expr); \
+ BUG(); \
+ }
+#else /* !RBD_DEBUG */
+# define rbd_assert(expr) ((void) 0)
+#endif /* !RBD_DEBUG */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
@@ -246,11 +274,11 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
{
struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
- if ((mode & FMODE_WRITE) && rbd_dev->read_only)
+ if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
return -EROFS;
rbd_get_dev(rbd_dev);
- set_device_ro(bdev, rbd_dev->read_only);
+ set_device_ro(bdev, rbd_dev->mapping.read_only);
return 0;
}
@@ -274,8 +302,7 @@ static const struct block_device_operations rbd_bd_ops = {
* Initialize an rbd client instance.
* We own *ceph_opts.
*/
-static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
- struct rbd_options *rbd_opts)
+static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
struct rbd_client *rbdc;
int ret = -ENOMEM;
@@ -299,8 +326,6 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
if (ret < 0)
goto out_err;
- rbdc->rbd_opts = rbd_opts;
-
spin_lock(&rbd_client_list_lock);
list_add_tail(&rbdc->node, &rbd_client_list);
spin_unlock(&rbd_client_list_lock);
@@ -322,36 +347,52 @@ out_opt:
}
/*
- * Find a ceph client with specific addr and configuration.
+ * Find a ceph client with specific addr and configuration. If
+ * found, bump its reference count.
*/
-static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
+static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
struct rbd_client *client_node;
+ bool found = false;
if (ceph_opts->flags & CEPH_OPT_NOSHARE)
return NULL;
- list_for_each_entry(client_node, &rbd_client_list, node)
- if (!ceph_compare_options(ceph_opts, client_node->client))
- return client_node;
- return NULL;
+ spin_lock(&rbd_client_list_lock);
+ list_for_each_entry(client_node, &rbd_client_list, node) {
+ if (!ceph_compare_options(ceph_opts, client_node->client)) {
+ kref_get(&client_node->kref);
+ found = true;
+ break;
+ }
+ }
+ spin_unlock(&rbd_client_list_lock);
+
+ return found ? client_node : NULL;
}
/*
* mount options
*/
enum {
- Opt_notify_timeout,
Opt_last_int,
/* int args above */
Opt_last_string,
/* string args above */
+ Opt_read_only,
+ Opt_read_write,
+ /* Boolean args above */
+ Opt_last_bool,
};
static match_table_t rbd_opts_tokens = {
- {Opt_notify_timeout, "notify_timeout=%d"},
/* int args above */
/* string args above */
+ {Opt_read_only, "mapping.read_only"},
+ {Opt_read_only, "ro"}, /* Alternate spelling */
+ {Opt_read_write, "read_write"},
+ {Opt_read_write, "rw"}, /* Alternate spelling */
+ /* Boolean args above */
{-1, NULL}
};
@@ -376,16 +417,22 @@ static int parse_rbd_opts_token(char *c, void *private)
} else if (token > Opt_last_int && token < Opt_last_string) {
dout("got string token %d val %s\n", token,
argstr[0].from);
+ } else if (token > Opt_last_string && token < Opt_last_bool) {
+ dout("got Boolean token %d\n", token);
} else {
dout("got token %d\n", token);
}
switch (token) {
- case Opt_notify_timeout:
- rbd_opts->notify_timeout = intval;
+ case Opt_read_only:
+ rbd_opts->read_only = true;
+ break;
+ case Opt_read_write:
+ rbd_opts->read_only = false;
break;
default:
- BUG_ON(token);
+ rbd_assert(false);
+ break;
}
return 0;
}
@@ -394,48 +441,33 @@ static int parse_rbd_opts_token(char *c, void *private)
* Get a ceph client with specific addr and configuration, if one does
* not exist create it.
*/
-static struct rbd_client *rbd_get_client(const char *mon_addr,
- size_t mon_addr_len,
- char *options)
+static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
+ size_t mon_addr_len, char *options)
{
- struct rbd_client *rbdc;
+ struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
struct ceph_options *ceph_opts;
- struct rbd_options *rbd_opts;
-
- rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
- if (!rbd_opts)
- return ERR_PTR(-ENOMEM);
+ struct rbd_client *rbdc;
- rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
+ rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
ceph_opts = ceph_parse_options(options, mon_addr,
mon_addr + mon_addr_len,
parse_rbd_opts_token, rbd_opts);
- if (IS_ERR(ceph_opts)) {
- kfree(rbd_opts);
- return ERR_CAST(ceph_opts);
- }
+ if (IS_ERR(ceph_opts))
+ return PTR_ERR(ceph_opts);
- spin_lock(&rbd_client_list_lock);
- rbdc = __rbd_client_find(ceph_opts);
+ rbdc = rbd_client_find(ceph_opts);
if (rbdc) {
/* using an existing client */
- kref_get(&rbdc->kref);
- spin_unlock(&rbd_client_list_lock);
-
ceph_destroy_options(ceph_opts);
- kfree(rbd_opts);
-
- return rbdc;
+ } else {
+ rbdc = rbd_client_create(ceph_opts);
+ if (IS_ERR(rbdc))
+ return PTR_ERR(rbdc);
}
- spin_unlock(&rbd_client_list_lock);
-
- rbdc = rbd_client_create(ceph_opts, rbd_opts);
+ rbd_dev->rbd_client = rbdc;
- if (IS_ERR(rbdc))
- kfree(rbd_opts);
-
- return rbdc;
+ return 0;
}
/*
@@ -453,7 +485,6 @@ static void rbd_client_release(struct kref *kref)
spin_unlock(&rbd_client_list_lock);
ceph_destroy_client(rbdc->client);
- kfree(rbdc->rbd_opts);
kfree(rbdc);
}
@@ -479,10 +510,38 @@ static void rbd_coll_release(struct kref *kref)
kfree(coll);
}
+static bool rbd_image_format_valid(u32 image_format)
+{
+ return image_format == 1 || image_format == 2;
+}
+
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
- return !memcmp(&ondisk->text,
- RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
+ size_t size;
+ u32 snap_count;
+
+ /* The header has to start with the magic rbd header text */
+ if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
+ return false;
+
+ /*
+ * The size of a snapshot header has to fit in a size_t, and
+ * that limits the number of snapshots.
+ */
+ snap_count = le32_to_cpu(ondisk->snap_count);
+ size = SIZE_MAX - sizeof (struct ceph_snap_context);
+ if (snap_count > size / sizeof (__le64))
+ return false;
+
+ /*
+ * Not only that, but the size of the entire the snapshot
+ * header must also be representable in a size_t.
+ */
+ size -= snap_count * sizeof (__le64);
+ if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
+ return false;
+
+ return true;
}
/*
@@ -490,179 +549,203 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
* header.
*/
static int rbd_header_from_disk(struct rbd_image_header *header,
- struct rbd_image_header_ondisk *ondisk,
- u32 allocated_snaps)
+ struct rbd_image_header_ondisk *ondisk)
{
u32 snap_count;
+ size_t len;
+ size_t size;
+ u32 i;
- if (!rbd_dev_ondisk_valid(ondisk))
- return -ENXIO;
+ memset(header, 0, sizeof (*header));
snap_count = le32_to_cpu(ondisk->snap_count);
- if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
- / sizeof (u64))
- return -EINVAL;
- header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
- snap_count * sizeof(u64),
- GFP_KERNEL);
- if (!header->snapc)
+
+ len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
+ header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
+ if (!header->object_prefix)
return -ENOMEM;
+ memcpy(header->object_prefix, ondisk->object_prefix, len);
+ header->object_prefix[len] = '\0';
if (snap_count) {
- header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
- header->snap_names = kmalloc(header->snap_names_len,
- GFP_KERNEL);
+ u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+
+ /* Save a copy of the snapshot names */
+
+ if (snap_names_len > (u64) SIZE_MAX)
+ return -EIO;
+ header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
if (!header->snap_names)
- goto err_snapc;
- header->snap_sizes = kmalloc(snap_count * sizeof(u64),
- GFP_KERNEL);
+ goto out_err;
+ /*
+ * Note that rbd_dev_v1_header_read() guarantees
+ * the ondisk buffer we're working with has
+ * snap_names_len bytes beyond the end of the
+ * snapshot id array, this memcpy() is safe.
+ */
+ memcpy(header->snap_names, &ondisk->snaps[snap_count],
+ snap_names_len);
+
+ /* Record each snapshot's size */
+
+ size = snap_count * sizeof (*header->snap_sizes);
+ header->snap_sizes = kmalloc(size, GFP_KERNEL);
if (!header->snap_sizes)
- goto err_names;
+ goto out_err;
+ for (i = 0; i < snap_count; i++)
+ header->snap_sizes[i] =
+ le64_to_cpu(ondisk->snaps[i].image_size);
} else {
WARN_ON(ondisk->snap_names_len);
- header->snap_names_len = 0;
header->snap_names = NULL;
header->snap_sizes = NULL;
}
- header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
- GFP_KERNEL);
- if (!header->object_prefix)
- goto err_sizes;
-
- memcpy(header->object_prefix, ondisk->block_name,
- sizeof(ondisk->block_name));
- header->object_prefix[sizeof (ondisk->block_name)] = '\0';
-
- header->image_size = le64_to_cpu(ondisk->image_size);
+ header->features = 0; /* No features support in v1 images */
header->obj_order = ondisk->options.order;
header->crypt_type = ondisk->options.crypt_type;
header->comp_type = ondisk->options.comp_type;
+ /* Allocate and fill in the snapshot context */
+
+ header->image_size = le64_to_cpu(ondisk->image_size);
+ size = sizeof (struct ceph_snap_context);
+ size += snap_count * sizeof (header->snapc->snaps[0]);
+ header->snapc = kzalloc(size, GFP_KERNEL);
+ if (!header->snapc)
+ goto out_err;
+
atomic_set(&header->snapc->nref, 1);
header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
header->snapc->num_snaps = snap_count;
- header->total_snaps = snap_count;
-
- if (snap_count && allocated_snaps == snap_count) {
- int i;
-
- for (i = 0; i < snap_count; i++) {
- header->snapc->snaps[i] =
- le64_to_cpu(ondisk->snaps[i].id);
- header->snap_sizes[i] =
- le64_to_cpu(ondisk->snaps[i].image_size);
- }
-
- /* copy snapshot names */
- memcpy(header->snap_names, &ondisk->snaps[snap_count],
- header->snap_names_len);
- }
+ for (i = 0; i < snap_count; i++)
+ header->snapc->snaps[i] =
+ le64_to_cpu(ondisk->snaps[i].id);
return 0;
-err_sizes:
+out_err:
kfree(header->snap_sizes);
header->snap_sizes = NULL;
-err_names:
kfree(header->snap_names);
header->snap_names = NULL;
-err_snapc:
- kfree(header->snapc);
- header->snapc = NULL;
+ kfree(header->object_prefix);
+ header->object_prefix = NULL;
return -ENOMEM;
}
-static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
- u64 *seq, u64 *size)
+static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
- int i;
- char *p = header->snap_names;
- for (i = 0; i < header->total_snaps; i++) {
- if (!strcmp(snap_name, p)) {
+ struct rbd_snap *snap;
- /* Found it. Pass back its id and/or size */
+ list_for_each_entry(snap, &rbd_dev->snaps, node) {
+ if (!strcmp(snap_name, snap->name)) {
+ rbd_dev->mapping.snap_id = snap->id;
+ rbd_dev->mapping.size = snap->size;
+ rbd_dev->mapping.features = snap->features;
- if (seq)
- *seq = header->snapc->snaps[i];
- if (size)
- *size = header->snap_sizes[i];
- return i;
+ return 0;
}
- p += strlen(p) + 1; /* Skip ahead to the next name */
}
+
return -ENOENT;
}
-static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
{
int ret;
- down_write(&rbd_dev->header_rwsem);
-
- if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
+ if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
sizeof (RBD_SNAP_HEAD_NAME))) {
- rbd_dev->snap_id = CEPH_NOSNAP;
- rbd_dev->snap_exists = false;
- rbd_dev->read_only = 0;
- if (size)
- *size = rbd_dev->header.image_size;
+ rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+ rbd_dev->mapping.size = rbd_dev->header.image_size;
+ rbd_dev->mapping.features = rbd_dev->header.features;
+ rbd_dev->mapping.snap_exists = false;
+ rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
+ ret = 0;
} else {
- u64 snap_id = 0;
-
- ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
- &snap_id, size);
+ ret = snap_by_name(rbd_dev, snap_name);
if (ret < 0)
goto done;
- rbd_dev->snap_id = snap_id;
- rbd_dev->snap_exists = true;
- rbd_dev->read_only = 1;
+ rbd_dev->mapping.snap_exists = true;
+ rbd_dev->mapping.read_only = true;
}
-
- ret = 0;
+ rbd_dev->mapping.snap_name = snap_name;
done:
- up_write(&rbd_dev->header_rwsem);
return ret;
}
static void rbd_header_free(struct rbd_image_header *header)
{
kfree(header->object_prefix);
+ header->object_prefix = NULL;
kfree(header->snap_sizes);
+ header->snap_sizes = NULL;
kfree(header->snap_names);
+ header->snap_names = NULL;
ceph_put_snap_context(header->snapc);
+ header->snapc = NULL;
}
-/*
- * get the actual striped segment name, offset and length
- */
-static u64 rbd_get_segment(struct rbd_image_header *header,
- const char *object_prefix,
- u64 ofs, u64 len,
- char *seg_name, u64 *segofs)
+static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
+{
+ char *name;
+ u64 segment;
+ int ret;
+
+ name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+ if (!name)
+ return NULL;
+ segment = offset >> rbd_dev->header.obj_order;
+ ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
+ rbd_dev->header.object_prefix, segment);
+ if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
+ pr_err("error formatting segment name for #%llu (%d)\n",
+ segment, ret);
+ kfree(name);
+ name = NULL;
+ }
+
+ return name;
+}
+
+static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
- u64 seg = ofs >> header->obj_order;
+ u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
- if (seg_name)
- snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
- "%s.%012llx", object_prefix, seg);
+ return offset & (segment_size - 1);
+}
+
+static u64 rbd_segment_length(struct rbd_device *rbd_dev,
+ u64 offset, u64 length)
+{
+ u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
- ofs = ofs & ((1 << header->obj_order) - 1);
- len = min_t(u64, len, (1 << header->obj_order) - ofs);
+ offset &= segment_size - 1;
- if (segofs)
- *segofs = ofs;
+ rbd_assert(length <= U64_MAX - offset);
+ if (offset + length > segment_size)
+ length = segment_size - offset;
- return len;
+ return length;
}
static int rbd_get_num_segments(struct rbd_image_header *header,
u64 ofs, u64 len)
{
- u64 start_seg = ofs >> header->obj_order;
- u64 end_seg = (ofs + len - 1) >> header->obj_order;
+ u64 start_seg;
+ u64 end_seg;
+
+ if (!len)
+ return 0;
+ if (len - 1 > U64_MAX - ofs)
+ return -ERANGE;
+
+ start_seg = ofs >> header->obj_order;
+ end_seg = (ofs + len - 1) >> header->obj_order;
+
return end_seg - start_seg + 1;
}
@@ -724,7 +807,9 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
struct bio_pair **bp,
int len, gfp_t gfpmask)
{
- struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
+ struct bio *old_chain = *old;
+ struct bio *new_chain = NULL;
+ struct bio *tail;
int total = 0;
if (*bp) {
@@ -733,9 +818,12 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
}
while (old_chain && (total < len)) {
+ struct bio *tmp;
+
tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
if (!tmp)
goto err_out;
+ gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
if (total + old_chain->bi_size > len) {
struct bio_pair *bp;
@@ -763,24 +851,18 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
}
tmp->bi_bdev = NULL;
- gfpmask &= ~__GFP_WAIT;
tmp->bi_next = NULL;
-
- if (!new_chain) {
- new_chain = tail = tmp;
- } else {
+ if (new_chain)
tail->bi_next = tmp;
- tail = tmp;
- }
+ else
+ new_chain = tmp;
+ tail = tmp;
old_chain = old_chain->bi_next;
total += tmp->bi_size;
}
- BUG_ON(total < len);
-
- if (tail)
- tail->bi_next = NULL;
+ rbd_assert(total == len);
*old = old_chain;
@@ -938,8 +1020,9 @@ static int rbd_do_request(struct request *rq,
layout->fl_stripe_count = cpu_to_le32(1);
layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
- ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
- req, ops);
+ ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
+ req, ops);
+ rbd_assert(ret == 0);
ceph_osdc_build_request(req, ofs, &len,
ops,
@@ -1030,8 +1113,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
int flags,
struct ceph_osd_req_op *ops,
const char *object_name,
- u64 ofs, u64 len,
- char *buf,
+ u64 ofs, u64 inbound_size,
+ char *inbound,
struct ceph_osd_request **linger_req,
u64 *ver)
{
@@ -1039,15 +1122,15 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
struct page **pages;
int num_pages;
- BUG_ON(ops == NULL);
+ rbd_assert(ops != NULL);
- num_pages = calc_pages_for(ofs , len);
+ num_pages = calc_pages_for(ofs, inbound_size);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
- object_name, ofs, len, NULL,
+ object_name, ofs, inbound_size, NULL,
pages, num_pages,
flags,
ops,
@@ -1057,8 +1140,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
if (ret < 0)
goto done;
- if ((flags & CEPH_OSD_FLAG_READ) && buf)
- ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
+ if ((flags & CEPH_OSD_FLAG_READ) && inbound)
+ ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
done:
ceph_release_page_vector(pages, num_pages);
@@ -1085,14 +1168,11 @@ static int rbd_do_op(struct request *rq,
struct ceph_osd_req_op *ops;
u32 payload_len;
- seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+ seg_name = rbd_segment_name(rbd_dev, ofs);
if (!seg_name)
return -ENOMEM;
-
- seg_len = rbd_get_segment(&rbd_dev->header,
- rbd_dev->header.object_prefix,
- ofs, len,
- seg_name, &seg_ofs);
+ seg_len = rbd_segment_length(rbd_dev, ofs, len);
+ seg_ofs = rbd_segment_offset(rbd_dev, ofs);
payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
@@ -1104,7 +1184,7 @@ static int rbd_do_op(struct request *rq,
/* we've taken care of segment sizes earlier when we
cloned the bios. We should never have a segment
truncated at this point */
- BUG_ON(seg_len < len);
+ rbd_assert(seg_len == len);
ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
seg_name, seg_ofs, seg_len,
@@ -1306,89 +1386,36 @@ static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
return ret;
}
-struct rbd_notify_info {
- struct rbd_device *rbd_dev;
-};
-
-static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
-{
- struct rbd_device *rbd_dev = (struct rbd_device *)data;
- if (!rbd_dev)
- return;
-
- dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
- rbd_dev->header_name, (unsigned long long) notify_id,
- (unsigned int) opcode);
-}
-
/*
- * Request sync osd notify
- */
-static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
-{
- struct ceph_osd_req_op *ops;
- struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
- struct ceph_osd_event *event;
- struct rbd_notify_info info;
- int payload_len = sizeof(u32) + sizeof(u32);
- int ret;
-
- ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
- if (!ops)
- return -ENOMEM;
-
- info.rbd_dev = rbd_dev;
-
- ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
- (void *)&info, &event);
- if (ret < 0)
- goto fail;
-
- ops[0].watch.ver = 1;
- ops[0].watch.flag = 1;
- ops[0].watch.cookie = event->cookie;
- ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
- ops[0].watch.timeout = 12;
-
- ret = rbd_req_sync_op(rbd_dev, NULL,
- CEPH_NOSNAP,
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- ops,
- rbd_dev->header_name,
- 0, 0, NULL, NULL, NULL);
- if (ret < 0)
- goto fail_event;
-
- ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
- dout("ceph_osdc_wait_event returned %d\n", ret);
- rbd_destroy_ops(ops);
- return 0;
-
-fail_event:
- ceph_osdc_cancel_event(event);
-fail:
- rbd_destroy_ops(ops);
- return ret;
-}
-
-/*
- * Request sync osd read
+ * Synchronous osd object method call
*/
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
const char *object_name,
const char *class_name,
const char *method_name,
- const char *data,
- int len,
+ const char *outbound,
+ size_t outbound_size,
+ char *inbound,
+ size_t inbound_size,
+ int flags,
u64 *ver)
{
struct ceph_osd_req_op *ops;
int class_name_len = strlen(class_name);
int method_name_len = strlen(method_name);
+ int payload_size;
int ret;
- ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
- class_name_len + method_name_len + len);
+ /*
+ * Any input parameters required by the method we're calling
+ * will be sent along with the class and method names as
+ * part of the message payload. That data and its size are
+ * supplied via the indata and indata_len fields (named from
+ * the perspective of the server side) in the OSD request
+ * operation.
+ */
+ payload_size = class_name_len + method_name_len + outbound_size;
+ ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
if (!ops)
return -ENOMEM;
@@ -1397,14 +1424,14 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
ops[0].cls.method_name = method_name;
ops[0].cls.method_len = (__u8) method_name_len;
ops[0].cls.argc = 0;
- ops[0].cls.indata = data;
- ops[0].cls.indata_len = len;
+ ops[0].cls.indata = outbound;
+ ops[0].cls.indata_len = outbound_size;
ret = rbd_req_sync_op(rbd_dev, NULL,
CEPH_NOSNAP,
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- ops,
- object_name, 0, 0, NULL, NULL, ver);
+ flags, ops,
+ object_name, 0, inbound_size, inbound,
+ NULL, ver);
rbd_destroy_ops(ops);
@@ -1446,10 +1473,6 @@ static void rbd_rq_fn(struct request_queue *q)
struct rbd_req_coll *coll;
struct ceph_snap_context *snapc;
- /* peek at request from block layer */
- if (!rq)
- break;
-
dout("fetched request\n");
/* filter out block requests we don't understand */
@@ -1464,7 +1487,7 @@ static void rbd_rq_fn(struct request_queue *q)
size = blk_rq_bytes(rq);
ofs = blk_rq_pos(rq) * SECTOR_SIZE;
rq_bio = rq->bio;
- if (do_write && rbd_dev->read_only) {
+ if (do_write && rbd_dev->mapping.read_only) {
__blk_end_request_all(rq, -EROFS);
continue;
}
@@ -1473,7 +1496,8 @@ static void rbd_rq_fn(struct request_queue *q)
down_read(&rbd_dev->header_rwsem);
- if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
+ if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
+ !rbd_dev->mapping.snap_exists) {
up_read(&rbd_dev->header_rwsem);
dout("request for non-existent snapshot");
spin_lock_irq(q->queue_lock);
@@ -1490,6 +1514,12 @@ static void rbd_rq_fn(struct request_queue *q)
size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
+ if (num_segs <= 0) {
+ spin_lock_irq(q->queue_lock);
+ __blk_end_request_all(rq, num_segs);
+ ceph_put_snap_context(snapc);
+ continue;
+ }
coll = rbd_alloc_coll(num_segs);
if (!coll) {
spin_lock_irq(q->queue_lock);
@@ -1501,10 +1531,7 @@ static void rbd_rq_fn(struct request_queue *q)
do {
/* a bio clone to be passed down to OSD req */
dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
- op_size = rbd_get_segment(&rbd_dev->header,
- rbd_dev->header.object_prefix,
- ofs, size,
- NULL, NULL);
+ op_size = rbd_segment_length(rbd_dev, ofs, size);
kref_get(&coll->kref);
bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
op_size, GFP_ATOMIC);
@@ -1524,7 +1551,7 @@ static void rbd_rq_fn(struct request_queue *q)
coll, cur_seg);
else
rbd_req_read(rq, rbd_dev,
- rbd_dev->snap_id,
+ rbd_dev->mapping.snap_id,
ofs,
op_size, bio,
coll, cur_seg);
@@ -1580,8 +1607,6 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
if (!disk)
return;
- rbd_header_free(&rbd_dev->header);
-
if (disk->flags & GENHD_FL_UP)
del_gendisk(disk);
if (disk->queue)
@@ -1590,105 +1615,96 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
}
/*
- * reload the ondisk the header
+ * Read the complete header for the given rbd device.
+ *
+ * Returns a pointer to a dynamically-allocated buffer containing
+ * the complete and validated header. Caller can pass the address
+ * of a variable that will be filled in with the version of the
+ * header object at the time it was read.
+ *
+ * Returns a pointer-coded errno if a failure occurs.
*/
-static int rbd_read_header(struct rbd_device *rbd_dev,
- struct rbd_image_header *header)
+static struct rbd_image_header_ondisk *
+rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
- ssize_t rc;
- struct rbd_image_header_ondisk *dh;
+ struct rbd_image_header_ondisk *ondisk = NULL;
u32 snap_count = 0;
- u64 ver;
- size_t len;
+ u64 names_size = 0;
+ u32 want_count;
+ int ret;
/*
- * First reads the fixed-size header to determine the number
- * of snapshots, then re-reads it, along with all snapshot
- * records as well as their stored names.
+ * The complete header will include an array of its 64-bit
+ * snapshot ids, followed by the names of those snapshots as
+ * a contiguous block of NUL-terminated strings. Note that
+ * the number of snapshots could change by the time we read
+ * it in, in which case we re-read it.
*/
- len = sizeof (*dh);
- while (1) {
- dh = kmalloc(len, GFP_KERNEL);
- if (!dh)
- return -ENOMEM;
-
- rc = rbd_req_sync_read(rbd_dev,
- CEPH_NOSNAP,
+ do {
+ size_t size;
+
+ kfree(ondisk);
+
+ size = sizeof (*ondisk);
+ size += snap_count * sizeof (struct rbd_image_snap_ondisk);
+ size += names_size;
+ ondisk = kmalloc(size, GFP_KERNEL);
+ if (!ondisk)
+ return ERR_PTR(-ENOMEM);
+
+ ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
rbd_dev->header_name,
- 0, len,
- (char *)dh, &ver);
- if (rc < 0)
- goto out_dh;
-
- rc = rbd_header_from_disk(header, dh, snap_count);
- if (rc < 0) {
- if (rc == -ENXIO)
- pr_warning("unrecognized header format"
- " for image %s\n",
- rbd_dev->image_name);
- goto out_dh;
+ 0, size,
+ (char *) ondisk, version);
+
+ if (ret < 0)
+ goto out_err;
+ if (WARN_ON((size_t) ret < size)) {
+ ret = -ENXIO;
+ pr_warning("short header read for image %s"
+ " (want %zd got %d)\n",
+ rbd_dev->image_name, size, ret);
+ goto out_err;
+ }
+ if (!rbd_dev_ondisk_valid(ondisk)) {
+ ret = -ENXIO;
+ pr_warning("invalid header for image %s\n",
+ rbd_dev->image_name);
+ goto out_err;
}
- if (snap_count == header->total_snaps)
- break;
+ names_size = le64_to_cpu(ondisk->snap_names_len);
+ want_count = snap_count;
+ snap_count = le32_to_cpu(ondisk->snap_count);
+ } while (snap_count != want_count);
- snap_count = header->total_snaps;
- len = sizeof (*dh) +
- snap_count * sizeof(struct rbd_image_snap_ondisk) +
- header->snap_names_len;
+ return ondisk;
- rbd_header_free(header);
- kfree(dh);
- }
- header->obj_version = ver;
+out_err:
+ kfree(ondisk);
-out_dh:
- kfree(dh);
- return rc;
+ return ERR_PTR(ret);
}
/*
- * create a snapshot
+ * reload the ondisk the header
*/
-static int rbd_header_add_snap(struct rbd_device *rbd_dev,
- const char *snap_name,
- gfp_t gfp_flags)
+static int rbd_read_header(struct rbd_device *rbd_dev,
+ struct rbd_image_header *header)
{
- int name_len = strlen(snap_name);
- u64 new_snapid;
+ struct rbd_image_header_ondisk *ondisk;
+ u64 ver = 0;
int ret;
- void *data, *p, *e;
- struct ceph_mon_client *monc;
-
- /* we should create a snapshot only if we're pointing at the head */
- if (rbd_dev->snap_id != CEPH_NOSNAP)
- return -EINVAL;
- monc = &rbd_dev->rbd_client->client->monc;
- ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
- dout("created snapid=%llu\n", (unsigned long long) new_snapid);
- if (ret < 0)
- return ret;
-
- data = kmalloc(name_len + 16, gfp_flags);
- if (!data)
- return -ENOMEM;
+ ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
+ if (IS_ERR(ondisk))
+ return PTR_ERR(ondisk);
+ ret = rbd_header_from_disk(header, ondisk);
+ if (ret >= 0)
+ header->obj_version = ver;
+ kfree(ondisk);
- p = data;
- e = data + name_len + 16;
-
- ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
- ceph_encode_64_safe(&p, e, new_snapid, bad);
-
- ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
- "rbd", "snap_add",
- data, p - data, NULL);
-
- kfree(data);
-
- return ret < 0 ? ret : 0;
-bad:
- return -ERANGE;
+ return ret;
}
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
@@ -1715,11 +1731,15 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
down_write(&rbd_dev->header_rwsem);
/* resized? */
- if (rbd_dev->snap_id == CEPH_NOSNAP) {
+ if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
- dout("setting size to %llu sectors", (unsigned long long) size);
- set_capacity(rbd_dev->disk, size);
+ if (size != (sector_t) rbd_dev->mapping.size) {
+ dout("setting size to %llu sectors",
+ (unsigned long long) size);
+ rbd_dev->mapping.size = (u64) size;
+ set_capacity(rbd_dev->disk, size);
+ }
}
/* rbd_dev->header.object_prefix shouldn't change */
@@ -1732,16 +1752,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
*hver = h.obj_version;
rbd_dev->header.obj_version = h.obj_version;
rbd_dev->header.image_size = h.image_size;
- rbd_dev->header.total_snaps = h.total_snaps;
rbd_dev->header.snapc = h.snapc;
rbd_dev->header.snap_names = h.snap_names;
- rbd_dev->header.snap_names_len = h.snap_names_len;
rbd_dev->header.snap_sizes = h.snap_sizes;
/* Free the extra copy of the object prefix */
WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
kfree(h.object_prefix);
- ret = __rbd_init_snaps_header(rbd_dev);
+ ret = rbd_dev_snaps_update(rbd_dev);
+ if (!ret)
+ ret = rbd_dev_snaps_register(rbd_dev);
up_write(&rbd_dev->header_rwsem);
@@ -1763,29 +1783,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
{
struct gendisk *disk;
struct request_queue *q;
- int rc;
u64 segment_size;
- u64 total_size = 0;
-
- /* contact OSD, request size info about the object being mapped */
- rc = rbd_read_header(rbd_dev, &rbd_dev->header);
- if (rc)
- return rc;
-
- /* no need to lock here, as rbd_dev is not registered yet */
- rc = __rbd_init_snaps_header(rbd_dev);
- if (rc)
- return rc;
-
- rc = rbd_header_set_snap(rbd_dev, &total_size);
- if (rc)
- return rc;
/* create gendisk info */
- rc = -ENOMEM;
disk = alloc_disk(RBD_MINORS_PER_MAJOR);
if (!disk)
- goto out;
+ return -ENOMEM;
snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
rbd_dev->dev_id);
@@ -1795,7 +1798,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
disk->private_data = rbd_dev;
/* init rq */
- rc = -ENOMEM;
q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
if (!q)
goto out_disk;
@@ -1816,20 +1818,14 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
q->queuedata = rbd_dev;
rbd_dev->disk = disk;
- rbd_dev->q = q;
- /* finally, announce the disk to the world */
- set_capacity(disk, total_size / SECTOR_SIZE);
- add_disk(disk);
+ set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
- pr_info("%s: added with size 0x%llx\n",
- disk->disk_name, (unsigned long long)total_size);
return 0;
-
out_disk:
put_disk(disk);
-out:
- return rc;
+
+ return -ENOMEM;
}
/*
@@ -1854,6 +1850,19 @@ static ssize_t rbd_size_show(struct device *dev,
return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
+/*
+ * Note this shows the features for whatever's mapped, which is not
+ * necessarily the base image.
+ */
+static ssize_t rbd_features_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+ return sprintf(buf, "0x%016llx\n",
+ (unsigned long long) rbd_dev->mapping.features);
+}
+
static ssize_t rbd_major_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
@@ -1895,13 +1904,25 @@ static ssize_t rbd_name_show(struct device *dev,
return sprintf(buf, "%s\n", rbd_dev->image_name);
}
+static ssize_t rbd_image_id_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+ return sprintf(buf, "%s\n", rbd_dev->image_id);
+}
+
+/*
+ * Shows the name of the currently-mapped snapshot (or
+ * RBD_SNAP_HEAD_NAME for the base image).
+ */
static ssize_t rbd_snap_show(struct device *dev,
struct device_attribute *attr,
char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%s\n", rbd_dev->snap_name);
+ return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
}
static ssize_t rbd_image_refresh(struct device *dev,
@@ -1918,25 +1939,27 @@ static ssize_t rbd_image_refresh(struct device *dev,
}
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
+static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
+static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
-static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static struct attribute *rbd_attrs[] = {
&dev_attr_size.attr,
+ &dev_attr_features.attr,
&dev_attr_major.attr,
&dev_attr_client_id.attr,
&dev_attr_pool.attr,
&dev_attr_pool_id.attr,
&dev_attr_name.attr,
+ &dev_attr_image_id.attr,
&dev_attr_current_snap.attr,
&dev_attr_refresh.attr,
- &dev_attr_create_snap.attr,
NULL
};
@@ -1982,12 +2005,24 @@ static ssize_t rbd_snap_id_show(struct device *dev,
return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
+static ssize_t rbd_snap_features_show(struct device *dev,
+ struct device_attribute *attr,
+ char *buf)
+{
+ struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
+
+ return sprintf(buf, "0x%016llx\n",
+ (unsigned long long) snap->features);
+}
+
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
+static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
static struct attribute *rbd_snap_attrs[] = {
&dev_attr_snap_size.attr,
&dev_attr_snap_id.attr,
+ &dev_attr_snap_features.attr,
NULL,
};
@@ -2012,10 +2047,21 @@ static struct device_type rbd_snap_device_type = {
.release = rbd_snap_dev_release,
};
+static bool rbd_snap_registered(struct rbd_snap *snap)
+{
+ bool ret = snap->dev.type == &rbd_snap_device_type;
+ bool reg = device_is_registered(&snap->dev);
+
+ rbd_assert(!ret ^ reg);
+
+ return ret;
+}
+
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
list_del(&snap->node);
- device_unregister(&snap->dev);
+ if (device_is_registered(&snap->dev))
+ device_unregister(&snap->dev);
}
static int rbd_register_snap_dev(struct rbd_snap *snap,
@@ -2028,13 +2074,17 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
dev->parent = parent;
dev->release = rbd_snap_dev_release;
dev_set_name(dev, "snap_%s", snap->name);
+ dout("%s: registering device for snapshot %s\n", __func__, snap->name);
+
ret = device_register(dev);
return ret;
}
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
- int i, const char *name)
+ const char *snap_name,
+ u64 snap_id, u64 snap_size,
+ u64 snap_features)
{
struct rbd_snap *snap;
int ret;
@@ -2044,17 +2094,13 @@ static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
return ERR_PTR(-ENOMEM);
ret = -ENOMEM;
- snap->name = kstrdup(name, GFP_KERNEL);
+ snap->name = kstrdup(snap_name, GFP_KERNEL);
if (!snap->name)
goto err;
- snap->size = rbd_dev->header.snap_sizes[i];
- snap->id = rbd_dev->header.snapc->snaps[i];
- if (device_is_registered(&rbd_dev->dev)) {
- ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
- if (ret < 0)
- goto err;
- }
+ snap->id = snap_id;
+ snap->size = snap_size;
+ snap->features = snap_features;
return snap;
@@ -2065,128 +2111,439 @@ err:
return ERR_PTR(ret);
}
+static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
+ u64 *snap_size, u64 *snap_features)
+{
+ char *snap_name;
+
+ rbd_assert(which < rbd_dev->header.snapc->num_snaps);
+
+ *snap_size = rbd_dev->header.snap_sizes[which];
+ *snap_features = 0; /* No features for v1 */
+
+ /* Skip over names until we find the one we are looking for */
+
+ snap_name = rbd_dev->header.snap_names;
+ while (which--)
+ snap_name += strlen(snap_name) + 1;
+
+ return snap_name;
+}
+
/*
- * search for the previous snap in a null delimited string list
+ * Get the size and object order for an image snapshot, or if
+ * snap_id is CEPH_NOSNAP, gets this information for the base
+ * image.
*/
-const char *rbd_prev_snap_name(const char *name, const char *start)
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+ u8 *order, u64 *snap_size)
{
- if (name < start + 2)
- return NULL;
+ __le64 snapid = cpu_to_le64(snap_id);
+ int ret;
+ struct {
+ u8 order;
+ __le64 size;
+ } __attribute__ ((packed)) size_buf = { 0 };
+
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_size",
+ (char *) &snapid, sizeof (snapid),
+ (char *) &size_buf, sizeof (size_buf),
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ return ret;
+
+ *order = size_buf.order;
+ *snap_size = le64_to_cpu(size_buf.size);
+
+ dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
+ (unsigned long long) snap_id, (unsigned int) *order,
+ (unsigned long long) *snap_size);
+
+ return 0;
+}
+
+static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
+{
+ return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
+ &rbd_dev->header.obj_order,
+ &rbd_dev->header.image_size);
+}
+
+static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
+{
+ void *reply_buf;
+ int ret;
+ void *p;
+
+ reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
+ if (!reply_buf)
+ return -ENOMEM;
+
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_object_prefix",
+ NULL, 0,
+ reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out;
+
+ p = reply_buf;
+ rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
+ p + RBD_OBJ_PREFIX_LEN_MAX,
+ NULL, GFP_NOIO);
+
+ if (IS_ERR(rbd_dev->header.object_prefix)) {
+ ret = PTR_ERR(rbd_dev->header.object_prefix);
+ rbd_dev->header.object_prefix = NULL;
+ } else {
+ dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
+ }
- name -= 2;
- while (*name) {
- if (name == start)
- return start;
- name--;
+out:
+ kfree(reply_buf);
+
+ return ret;
+}
+
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+ u64 *snap_features)
+{
+ __le64 snapid = cpu_to_le64(snap_id);
+ struct {
+ __le64 features;
+ __le64 incompat;
+ } features_buf = { 0 };
+ int ret;
+
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_features",
+ (char *) &snapid, sizeof (snapid),
+ (char *) &features_buf, sizeof (features_buf),
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ return ret;
+ *snap_features = le64_to_cpu(features_buf.features);
+
+ dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
+ (unsigned long long) snap_id,
+ (unsigned long long) *snap_features,
+ (unsigned long long) le64_to_cpu(features_buf.incompat));
+
+ return 0;
+}
+
+static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
+{
+ return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
+ &rbd_dev->header.features);
+}
+
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
+{
+ size_t size;
+ int ret;
+ void *reply_buf;
+ void *p;
+ void *end;
+ u64 seq;
+ u32 snap_count;
+ struct ceph_snap_context *snapc;
+ u32 i;
+
+ /*
+ * We'll need room for the seq value (maximum snapshot id),
+ * snapshot count, and array of that many snapshot ids.
+ * For now we have a fixed upper limit on the number we're
+ * prepared to receive.
+ */
+ size = sizeof (__le64) + sizeof (__le32) +
+ RBD_MAX_SNAP_COUNT * sizeof (__le64);
+ reply_buf = kzalloc(size, GFP_KERNEL);
+ if (!reply_buf)
+ return -ENOMEM;
+
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_snapcontext",
+ NULL, 0,
+ reply_buf, size,
+ CEPH_OSD_FLAG_READ, ver);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out;
+
+ ret = -ERANGE;
+ p = reply_buf;
+ end = (char *) reply_buf + size;
+ ceph_decode_64_safe(&p, end, seq, out);
+ ceph_decode_32_safe(&p, end, snap_count, out);
+
+ /*
+ * Make sure the reported number of snapshot ids wouldn't go
+ * beyond the end of our buffer. But before checking that,
+ * make sure the computed size of the snapshot context we
+ * allocate is representable in a size_t.
+ */
+ if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
+ / sizeof (u64)) {
+ ret = -EINVAL;
+ goto out;
}
- return name + 1;
+ if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
+ goto out;
+
+ size = sizeof (struct ceph_snap_context) +
+ snap_count * sizeof (snapc->snaps[0]);
+ snapc = kmalloc(size, GFP_KERNEL);
+ if (!snapc) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ atomic_set(&snapc->nref, 1);
+ snapc->seq = seq;
+ snapc->num_snaps = snap_count;
+ for (i = 0; i < snap_count; i++)
+ snapc->snaps[i] = ceph_decode_64(&p);
+
+ rbd_dev->header.snapc = snapc;
+
+ dout(" snap context seq = %llu, snap_count = %u\n",
+ (unsigned long long) seq, (unsigned int) snap_count);
+
+out:
+ kfree(reply_buf);
+
+ return 0;
}
-/*
- * compare the old list of snapshots that we have to what's in the header
- * and update it accordingly. Note that the header holds the snapshots
- * in a reverse order (from newest to oldest) and we need to go from
- * older to new so that we don't get a duplicate snap name when
- * doing the process (e.g., removed snapshot and recreated a new
- * one with the same name.
- */
-static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
+static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
- const char *name, *first_name;
- int i = rbd_dev->header.total_snaps;
- struct rbd_snap *snap, *old_snap = NULL;
- struct list_head *p, *n;
+ size_t size;
+ void *reply_buf;
+ __le64 snap_id;
+ int ret;
+ void *p;
+ void *end;
+ size_t snap_name_len;
+ char *snap_name;
+
+ size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
+ reply_buf = kmalloc(size, GFP_KERNEL);
+ if (!reply_buf)
+ return ERR_PTR(-ENOMEM);
- first_name = rbd_dev->header.snap_names;
- name = first_name + rbd_dev->header.snap_names_len;
+ snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_snapshot_name",
+ (char *) &snap_id, sizeof (snap_id),
+ reply_buf, size,
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out;
- list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
- u64 cur_id;
+ p = reply_buf;
+ end = (char *) reply_buf + size;
+ snap_name_len = 0;
+ snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
+ GFP_KERNEL);
+ if (IS_ERR(snap_name)) {
+ ret = PTR_ERR(snap_name);
+ goto out;
+ } else {
+ dout(" snap_id 0x%016llx snap_name = %s\n",
+ (unsigned long long) le64_to_cpu(snap_id), snap_name);
+ }
+ kfree(reply_buf);
- old_snap = list_entry(p, struct rbd_snap, node);
+ return snap_name;
+out:
+ kfree(reply_buf);
- if (i)
- cur_id = rbd_dev->header.snapc->snaps[i - 1];
+ return ERR_PTR(ret);
+}
- if (!i || old_snap->id < cur_id) {
- /*
- * old_snap->id was skipped, thus was
- * removed. If this rbd_dev is mapped to
- * the removed snapshot, record that it no
- * longer exists, to prevent further I/O.
- */
- if (rbd_dev->snap_id == old_snap->id)
- rbd_dev->snap_exists = false;
- __rbd_remove_snap_dev(old_snap);
- continue;
- }
- if (old_snap->id == cur_id) {
- /* we have this snapshot already */
- i--;
- name = rbd_prev_snap_name(name, first_name);
+static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
+ u64 *snap_size, u64 *snap_features)
+{
+ __le64 snap_id;
+ u8 order;
+ int ret;
+
+ snap_id = rbd_dev->header.snapc->snaps[which];
+ ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
+ if (ret)
+ return ERR_PTR(ret);
+ ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
+ if (ret)
+ return ERR_PTR(ret);
+
+ return rbd_dev_v2_snap_name(rbd_dev, which);
+}
+
+static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
+ u64 *snap_size, u64 *snap_features)
+{
+ if (rbd_dev->image_format == 1)
+ return rbd_dev_v1_snap_info(rbd_dev, which,
+ snap_size, snap_features);
+ if (rbd_dev->image_format == 2)
+ return rbd_dev_v2_snap_info(rbd_dev, which,
+ snap_size, snap_features);
+ return ERR_PTR(-EINVAL);
+}
+
+/*
+ * Scan the rbd device's current snapshot list and compare it to the
+ * newly-received snapshot context. Remove any existing snapshots
+ * not present in the new snapshot context. Add a new snapshot for
+ * any snaphots in the snapshot context not in the current list.
+ * And verify there are no changes to snapshots we already know
+ * about.
+ *
+ * Assumes the snapshots in the snapshot context are sorted by
+ * snapshot id, highest id first. (Snapshots in the rbd_dev's list
+ * are also maintained in that order.)
+ */
+static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
+{
+ struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+ const u32 snap_count = snapc->num_snaps;
+ struct list_head *head = &rbd_dev->snaps;
+ struct list_head *links = head->next;
+ u32 index = 0;
+
+ dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
+ while (index < snap_count || links != head) {
+ u64 snap_id;
+ struct rbd_snap *snap;
+ char *snap_name;
+ u64 snap_size = 0;
+ u64 snap_features = 0;
+
+ snap_id = index < snap_count ? snapc->snaps[index]
+ : CEPH_NOSNAP;
+ snap = links != head ? list_entry(links, struct rbd_snap, node)
+ : NULL;
+ rbd_assert(!snap || snap->id != CEPH_NOSNAP);
+
+ if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
+ struct list_head *next = links->next;
+
+ /* Existing snapshot not in the new snap context */
+
+ if (rbd_dev->mapping.snap_id == snap->id)
+ rbd_dev->mapping.snap_exists = false;
+ __rbd_remove_snap_dev(snap);
+ dout("%ssnap id %llu has been removed\n",
+ rbd_dev->mapping.snap_id == snap->id ?
+ "mapped " : "",
+ (unsigned long long) snap->id);
+
+ /* Done with this list entry; advance */
+
+ links = next;
continue;
}
- for (; i > 0;
- i--, name = rbd_prev_snap_name(name, first_name)) {
- if (!name) {
- WARN_ON(1);
- return -EINVAL;
+
+ snap_name = rbd_dev_snap_info(rbd_dev, index,
+ &snap_size, &snap_features);
+ if (IS_ERR(snap_name))
+ return PTR_ERR(snap_name);
+
+ dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
+ (unsigned long long) snap_id);
+ if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
+ struct rbd_snap *new_snap;
+
+ /* We haven't seen this snapshot before */
+
+ new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
+ snap_id, snap_size, snap_features);
+ if (IS_ERR(new_snap)) {
+ int err = PTR_ERR(new_snap);
+
+ dout(" failed to add dev, error %d\n", err);
+
+ return err;
}
- cur_id = rbd_dev->header.snapc->snaps[i];
- /* snapshot removal? handle it above */
- if (cur_id >= old_snap->id)
- break;
- /* a new snapshot */
- snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
- if (IS_ERR(snap))
- return PTR_ERR(snap);
-
- /* note that we add it backward so using n and not p */
- list_add(&snap->node, n);
- p = &snap->node;
+
+ /* New goes before existing, or at end of list */
+
+ dout(" added dev%s\n", snap ? "" : " at end\n");
+ if (snap)
+ list_add_tail(&new_snap->node, &snap->node);
+ else
+ list_add_tail(&new_snap->node, head);
+ } else {
+ /* Already have this one */
+
+ dout(" already present\n");
+
+ rbd_assert(snap->size == snap_size);
+ rbd_assert(!strcmp(snap->name, snap_name));
+ rbd_assert(snap->features == snap_features);
+
+ /* Done with this list entry; advance */
+
+ links = links->next;
}
+
+ /* Advance to the next entry in the snapshot context */
+
+ index++;
}
- /* we're done going over the old snap list, just add what's left */
- for (; i > 0; i--) {
- name = rbd_prev_snap_name(name, first_name);
- if (!name) {
- WARN_ON(1);
- return -EINVAL;
+ dout("%s: done\n", __func__);
+
+ return 0;
+}
+
+/*
+ * Scan the list of snapshots and register the devices for any that
+ * have not already been registered.
+ */
+static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
+{
+ struct rbd_snap *snap;
+ int ret = 0;
+
+ dout("%s called\n", __func__);
+ if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
+ return -EIO;
+
+ list_for_each_entry(snap, &rbd_dev->snaps, node) {
+ if (!rbd_snap_registered(snap)) {
+ ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
+ if (ret < 0)
+ break;
}
- snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
- if (IS_ERR(snap))
- return PTR_ERR(snap);
- list_add(&snap->node, &rbd_dev->snaps);
}
+ dout("%s: returning %d\n", __func__, ret);
- return 0;
+ return ret;
}
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
- int ret;
struct device *dev;
- struct rbd_snap *snap;
+ int ret;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
- dev = &rbd_dev->dev;
+ dev = &rbd_dev->dev;
dev->bus = &rbd_bus_type;
dev->type = &rbd_device_type;
dev->parent = &rbd_root_dev;
dev->release = rbd_dev_release;
dev_set_name(dev, "%d", rbd_dev->dev_id);
ret = device_register(dev);
- if (ret < 0)
- goto out;
- list_for_each_entry(snap, &rbd_dev->snaps, node) {
- ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
- if (ret < 0)
- break;
- }
-out:
mutex_unlock(&ctl_mutex);
+
return ret;
}
@@ -2211,33 +2568,37 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
return ret;
}
-static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
+static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
/*
* Get a unique rbd identifier for the given new rbd_dev, and add
* the rbd_dev to the global list. The minimum rbd id is 1.
*/
-static void rbd_id_get(struct rbd_device *rbd_dev)
+static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
- rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
+ rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
spin_lock(&rbd_dev_list_lock);
list_add_tail(&rbd_dev->node, &rbd_dev_list);
spin_unlock(&rbd_dev_list_lock);
+ dout("rbd_dev %p given dev id %llu\n", rbd_dev,
+ (unsigned long long) rbd_dev->dev_id);
}
/*
* Remove an rbd_dev from the global list, and record that its
* identifier is no longer in use.
*/
-static void rbd_id_put(struct rbd_device *rbd_dev)
+static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
struct list_head *tmp;
int rbd_id = rbd_dev->dev_id;
int max_id;
- BUG_ON(rbd_id < 1);
+ rbd_assert(rbd_id > 0);
+ dout("rbd_dev %p released dev id %llu\n", rbd_dev,
+ (unsigned long long) rbd_dev->dev_id);
spin_lock(&rbd_dev_list_lock);
list_del_init(&rbd_dev->node);
@@ -2245,7 +2606,7 @@ static void rbd_id_put(struct rbd_device *rbd_dev)
* If the id being "put" is not the current maximum, there
* is nothing special we need to do.
*/
- if (rbd_id != atomic64_read(&rbd_id_max)) {
+ if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
spin_unlock(&rbd_dev_list_lock);
return;
}
@@ -2266,12 +2627,13 @@ static void rbd_id_put(struct rbd_device *rbd_dev)
spin_unlock(&rbd_dev_list_lock);
/*
- * The max id could have been updated by rbd_id_get(), in
+ * The max id could have been updated by rbd_dev_id_get(), in
* which case it now accurately reflects the new maximum.
* Be careful not to overwrite the maximum value in that
* case.
*/
- atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
+ atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
+ dout(" max dev id has been reset\n");
}
/*
@@ -2360,28 +2722,31 @@ static inline char *dup_token(const char **buf, size_t *lenp)
}
/*
- * This fills in the pool_name, image_name, image_name_len, snap_name,
- * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
- * on the list of monitor addresses and other options provided via
- * /sys/bus/rbd/add.
+ * This fills in the pool_name, image_name, image_name_len, rbd_dev,
+ * rbd_md_name, and name fields of the given rbd_dev, based on the
+ * list of monitor addresses and other options provided via
+ * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
+ * copy of the snapshot name to map if successful, or a
+ * pointer-coded error otherwise.
*
* Note: rbd_dev is assumed to have been initially zero-filled.
*/
-static int rbd_add_parse_args(struct rbd_device *rbd_dev,
- const char *buf,
- const char **mon_addrs,
- size_t *mon_addrs_size,
- char *options,
- size_t options_size)
+static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
+ const char *buf,
+ const char **mon_addrs,
+ size_t *mon_addrs_size,
+ char *options,
+ size_t options_size)
{
size_t len;
- int ret;
+ char *err_ptr = ERR_PTR(-EINVAL);
+ char *snap_name;
/* The first four tokens are required */
len = next_token(&buf);
if (!len)
- return -EINVAL;
+ return err_ptr;
*mon_addrs_size = len + 1;
*mon_addrs = buf;
@@ -2389,9 +2754,9 @@ static int rbd_add_parse_args(struct rbd_device *rbd_dev,
len = copy_token(&buf, options, options_size);
if (!len || len >= options_size)
- return -EINVAL;
+ return err_ptr;
- ret = -ENOMEM;
+ err_ptr = ERR_PTR(-ENOMEM);
rbd_dev->pool_name = dup_token(&buf, NULL);
if (!rbd_dev->pool_name)
goto out_err;
@@ -2400,41 +2765,227 @@ static int rbd_add_parse_args(struct rbd_device *rbd_dev,
if (!rbd_dev->image_name)
goto out_err;
- /* Create the name of the header object */
+ /* Snapshot name is optional */
+ len = next_token(&buf);
+ if (!len) {
+ buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
+ len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
+ }
+ snap_name = kmalloc(len + 1, GFP_KERNEL);
+ if (!snap_name)
+ goto out_err;
+ memcpy(snap_name, buf, len);
+ *(snap_name + len) = '\0';
- rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
- + sizeof (RBD_SUFFIX),
- GFP_KERNEL);
- if (!rbd_dev->header_name)
+dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
+
+ return snap_name;
+
+out_err:
+ kfree(rbd_dev->image_name);
+ rbd_dev->image_name = NULL;
+ rbd_dev->image_name_len = 0;
+ kfree(rbd_dev->pool_name);
+ rbd_dev->pool_name = NULL;
+
+ return err_ptr;
+}
+
+/*
+ * An rbd format 2 image has a unique identifier, distinct from the
+ * name given to it by the user. Internally, that identifier is
+ * what's used to specify the names of objects related to the image.
+ *
+ * A special "rbd id" object is used to map an rbd image name to its
+ * id. If that object doesn't exist, then there is no v2 rbd image
+ * with the supplied name.
+ *
+ * This function will record the given rbd_dev's image_id field if
+ * it can be determined, and in that case will return 0. If any
+ * errors occur a negative errno will be returned and the rbd_dev's
+ * image_id field will be unchanged (and should be NULL).
+ */
+static int rbd_dev_image_id(struct rbd_device *rbd_dev)
+{
+ int ret;
+ size_t size;
+ char *object_name;
+ void *response;
+ void *p;
+
+ /*
+ * First, see if the format 2 image id file exists, and if
+ * so, get the image's persistent id from it.
+ */
+ size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
+ object_name = kmalloc(size, GFP_NOIO);
+ if (!object_name)
+ return -ENOMEM;
+ sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
+ dout("rbd id object name is %s\n", object_name);
+
+ /* Response will be an encoded string, which includes a length */
+
+ size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
+ response = kzalloc(size, GFP_NOIO);
+ if (!response) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ ret = rbd_req_sync_exec(rbd_dev, object_name,
+ "rbd", "get_id",
+ NULL, 0,
+ response, RBD_IMAGE_ID_LEN_MAX,
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out;
+
+ p = response;
+ rbd_dev->image_id = ceph_extract_encoded_string(&p,
+ p + RBD_IMAGE_ID_LEN_MAX,
+ &rbd_dev->image_id_len,
+ GFP_NOIO);
+ if (IS_ERR(rbd_dev->image_id)) {
+ ret = PTR_ERR(rbd_dev->image_id);
+ rbd_dev->image_id = NULL;
+ } else {
+ dout("image_id is %s\n", rbd_dev->image_id);
+ }
+out:
+ kfree(response);
+ kfree(object_name);
+
+ return ret;
+}
+
+static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
+{
+ int ret;
+ size_t size;
+
+ /* Version 1 images have no id; empty string is used */
+
+ rbd_dev->image_id = kstrdup("", GFP_KERNEL);
+ if (!rbd_dev->image_id)
+ return -ENOMEM;
+ rbd_dev->image_id_len = 0;
+
+ /* Record the header object name for this rbd image. */
+
+ size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
+ rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+ if (!rbd_dev->header_name) {
+ ret = -ENOMEM;
goto out_err;
+ }
sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
+ /* Populate rbd image metadata */
+
+ ret = rbd_read_header(rbd_dev, &rbd_dev->header);
+ if (ret < 0)
+ goto out_err;
+ rbd_dev->image_format = 1;
+
+ dout("discovered version 1 image, header name is %s\n",
+ rbd_dev->header_name);
+
+ return 0;
+
+out_err:
+ kfree(rbd_dev->header_name);
+ rbd_dev->header_name = NULL;
+ kfree(rbd_dev->image_id);
+ rbd_dev->image_id = NULL;
+
+ return ret;
+}
+
+static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
+{
+ size_t size;
+ int ret;
+ u64 ver = 0;
+
/*
- * The snapshot name is optional. If none is is supplied,
- * we use the default value.
+ * Image id was filled in by the caller. Record the header
+ * object name for this rbd image.
*/
- rbd_dev->snap_name = dup_token(&buf, &len);
- if (!rbd_dev->snap_name)
+ size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
+ rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+ if (!rbd_dev->header_name)
+ return -ENOMEM;
+ sprintf(rbd_dev->header_name, "%s%s",
+ RBD_HEADER_PREFIX, rbd_dev->image_id);
+
+ /* Get the size and object order for the image */
+
+ ret = rbd_dev_v2_image_size(rbd_dev);
+ if (ret < 0)
goto out_err;
- if (!len) {
- /* Replace the empty name with the default */
- kfree(rbd_dev->snap_name);
- rbd_dev->snap_name
- = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
- if (!rbd_dev->snap_name)
- goto out_err;
- memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
- sizeof (RBD_SNAP_HEAD_NAME));
- }
+ /* Get the object prefix (a.k.a. block_name) for the image */
- return 0;
+ ret = rbd_dev_v2_object_prefix(rbd_dev);
+ if (ret < 0)
+ goto out_err;
+
+ /* Get the features for the image */
+ ret = rbd_dev_v2_features(rbd_dev);
+ if (ret < 0)
+ goto out_err;
+
+ /* crypto and compression type aren't (yet) supported for v2 images */
+
+ rbd_dev->header.crypt_type = 0;
+ rbd_dev->header.comp_type = 0;
+
+ /* Get the snapshot context, plus the header version */
+
+ ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
+ if (ret)
+ goto out_err;
+ rbd_dev->header.obj_version = ver;
+
+ rbd_dev->image_format = 2;
+
+ dout("discovered version 2 image, header name is %s\n",
+ rbd_dev->header_name);
+
+ return -ENOTSUPP;
out_err:
kfree(rbd_dev->header_name);
- kfree(rbd_dev->image_name);
- kfree(rbd_dev->pool_name);
- rbd_dev->pool_name = NULL;
+ rbd_dev->header_name = NULL;
+ kfree(rbd_dev->header.object_prefix);
+ rbd_dev->header.object_prefix = NULL;
+
+ return ret;
+}
+
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device. For format 2 images this includes determining the image
+ * id.
+ */
+static int rbd_dev_probe(struct rbd_device *rbd_dev)
+{
+ int ret;
+
+ /*
+ * Get the id from the image id object. If it's not a
+ * format 2 image, we'll get ENOENT back, and we'll assume
+ * it's a format 1 image.
+ */
+ ret = rbd_dev_image_id(rbd_dev);
+ if (ret)
+ ret = rbd_dev_v1_probe(rbd_dev);
+ else
+ ret = rbd_dev_v2_probe(rbd_dev);
+ if (ret)
+ dout("probe failed, returning %d\n", ret);
return ret;
}
@@ -2449,16 +3000,17 @@ static ssize_t rbd_add(struct bus_type *bus,
size_t mon_addrs_size = 0;
struct ceph_osd_client *osdc;
int rc = -ENOMEM;
+ char *snap_name;
if (!try_module_get(THIS_MODULE))
return -ENODEV;
options = kmalloc(count, GFP_KERNEL);
if (!options)
- goto err_nomem;
+ goto err_out_mem;
rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
if (!rbd_dev)
- goto err_nomem;
+ goto err_out_mem;
/* static rbd_device initialization */
spin_lock_init(&rbd_dev->lock);
@@ -2466,27 +3018,18 @@ static ssize_t rbd_add(struct bus_type *bus,
INIT_LIST_HEAD(&rbd_dev->snaps);
init_rwsem(&rbd_dev->header_rwsem);
- /* generate unique id: find highest unique id, add one */
- rbd_id_get(rbd_dev);
-
- /* Fill in the device name, now that we have its id. */
- BUILD_BUG_ON(DEV_NAME_LEN
- < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
- sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
-
/* parse add command */
- rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
- options, count);
- if (rc)
- goto err_put_id;
-
- rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
- options);
- if (IS_ERR(rbd_dev->rbd_client)) {
- rc = PTR_ERR(rbd_dev->rbd_client);
- goto err_put_id;
+ snap_name = rbd_add_parse_args(rbd_dev, buf,
+ &mon_addrs, &mon_addrs_size, options, count);
+ if (IS_ERR(snap_name)) {
+ rc = PTR_ERR(snap_name);
+ goto err_out_mem;
}
+ rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
+ if (rc < 0)
+ goto err_out_args;
+
/* pick the pool */
osdc = &rbd_dev->rbd_client->client->osdc;
rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
@@ -2494,23 +3037,53 @@ static ssize_t rbd_add(struct bus_type *bus,
goto err_out_client;
rbd_dev->pool_id = rc;
- /* register our block device */
- rc = register_blkdev(0, rbd_dev->name);
+ rc = rbd_dev_probe(rbd_dev);
if (rc < 0)
goto err_out_client;
+ rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+ /* no need to lock here, as rbd_dev is not registered yet */
+ rc = rbd_dev_snaps_update(rbd_dev);
+ if (rc)
+ goto err_out_header;
+
+ rc = rbd_dev_set_mapping(rbd_dev, snap_name);
+ if (rc)
+ goto err_out_header;
+
+ /* generate unique id: find highest unique id, add one */
+ rbd_dev_id_get(rbd_dev);
+
+ /* Fill in the device name, now that we have its id. */
+ BUILD_BUG_ON(DEV_NAME_LEN
+ < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
+ sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
+
+ /* Get our block major device number. */
+
+ rc = register_blkdev(0, rbd_dev->name);
+ if (rc < 0)
+ goto err_out_id;
rbd_dev->major = rc;
- rc = rbd_bus_add_dev(rbd_dev);
+ /* Set up the blkdev mapping. */
+
+ rc = rbd_init_disk(rbd_dev);
if (rc)
goto err_out_blkdev;
+ rc = rbd_bus_add_dev(rbd_dev);
+ if (rc)
+ goto err_out_disk;
+
/*
* At this point cleanup in the event of an error is the job
* of the sysfs code (initiated by rbd_bus_del_dev()).
- *
- * Set up and announce blkdev mapping.
*/
- rc = rbd_init_disk(rbd_dev);
+
+ down_write(&rbd_dev->header_rwsem);
+ rc = rbd_dev_snaps_register(rbd_dev);
+ up_write(&rbd_dev->header_rwsem);
if (rc)
goto err_out_bus;
@@ -2518,6 +3091,13 @@ static ssize_t rbd_add(struct bus_type *bus,
if (rc)
goto err_out_bus;
+ /* Everything's ready. Announce the disk to the world. */
+
+ add_disk(rbd_dev->disk);
+
+ pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
+ (unsigned long long) rbd_dev->mapping.size);
+
return count;
err_out_bus:
@@ -2527,19 +3107,23 @@ err_out_bus:
kfree(options);
return rc;
+err_out_disk:
+ rbd_free_disk(rbd_dev);
err_out_blkdev:
unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_id:
+ rbd_dev_id_put(rbd_dev);
+err_out_header:
+ rbd_header_free(&rbd_dev->header);
err_out_client:
+ kfree(rbd_dev->header_name);
rbd_put_client(rbd_dev);
-err_put_id:
- if (rbd_dev->pool_name) {
- kfree(rbd_dev->snap_name);
- kfree(rbd_dev->header_name);
- kfree(rbd_dev->image_name);
- kfree(rbd_dev->pool_name);
- }
- rbd_id_put(rbd_dev);
-err_nomem:
+ kfree(rbd_dev->image_id);
+err_out_args:
+ kfree(rbd_dev->mapping.snap_name);
+ kfree(rbd_dev->image_name);
+ kfree(rbd_dev->pool_name);
+err_out_mem:
kfree(rbd_dev);
kfree(options);
@@ -2585,12 +3169,16 @@ static void rbd_dev_release(struct device *dev)
rbd_free_disk(rbd_dev);
unregister_blkdev(rbd_dev->major, rbd_dev->name);
+ /* release allocated disk header fields */
+ rbd_header_free(&rbd_dev->header);
+
/* done with the id, and with the rbd_dev */
- kfree(rbd_dev->snap_name);
+ kfree(rbd_dev->mapping.snap_name);
+ kfree(rbd_dev->image_id);
kfree(rbd_dev->header_name);
kfree(rbd_dev->pool_name);
kfree(rbd_dev->image_name);
- rbd_id_put(rbd_dev);
+ rbd_dev_id_put(rbd_dev);
kfree(rbd_dev);
/* release module ref */
@@ -2628,47 +3216,7 @@ static ssize_t rbd_remove(struct bus_type *bus,
done:
mutex_unlock(&ctl_mutex);
- return ret;
-}
-static ssize_t rbd_snap_add(struct device *dev,
- struct device_attribute *attr,
- const char *buf,
- size_t count)
-{
- struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- int ret;
- char *name = kmalloc(count + 1, GFP_KERNEL);
- if (!name)
- return -ENOMEM;
-
- snprintf(name, count, "%s", buf);
-
- mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
- ret = rbd_header_add_snap(rbd_dev,
- name, GFP_KERNEL);
- if (ret < 0)
- goto err_unlock;
-
- ret = __rbd_refresh_header(rbd_dev, NULL);
- if (ret < 0)
- goto err_unlock;
-
- /* shouldn't hold ctl_mutex when notifying.. notify might
- trigger a watch callback that would need to get that mutex */
- mutex_unlock(&ctl_mutex);
-
- /* make a best effort, don't error if failed */
- rbd_req_sync_notify(rbd_dev);
-
- ret = count;
- kfree(name);
- return ret;
-
-err_unlock:
- mutex_unlock(&ctl_mutex);
- kfree(name);
return ret;
}
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 0924e9e41a6..cbe77fa105b 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -15,15 +15,30 @@
#include <linux/types.h>
+/* For format version 2, rbd image 'foo' consists of objects
+ * rbd_id.foo - id of image
+ * rbd_header.<id> - image metadata
+ * rbd_data.<id>.0000000000000000
+ * rbd_data.<id>.0000000000000001
+ * ... - data
+ * Clients do not access header data directly in rbd format 2.
+ */
+
+#define RBD_HEADER_PREFIX "rbd_header."
+#define RBD_DATA_PREFIX "rbd_data."
+#define RBD_ID_PREFIX "rbd_id."
+
/*
- * rbd image 'foo' consists of objects
- * foo.rbd - image metadata
- * foo.00000000
- * foo.00000001
- * ... - data
+ * For format version 1, rbd image 'foo' consists of objects
+ * foo.rbd - image metadata
+ * rb.<idhi>.<idlo>.00000000
+ * rb.<idhi>.<idlo>.00000001
+ * ... - data
+ * There is no notion of a persistent image id in rbd format 1.
*/
#define RBD_SUFFIX ".rbd"
+
#define RBD_DIRECTORY "rbd_directory"
#define RBD_INFO "rbd_info"
@@ -47,7 +62,7 @@ struct rbd_image_snap_ondisk {
struct rbd_image_header_ondisk {
char text[40];
- char block_name[24];
+ char object_prefix[24];
char signature[4];
char version[8];
struct {
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 452e71a1b75..22b6e4583fa 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -205,7 +205,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
- page->index << PAGE_CACHE_SHIFT, &len,
+ (u64) page_offset(page), &len,
ci->i_truncate_seq, ci->i_truncate_size,
&page, 1, 0);
if (err == -ENOENT)
@@ -286,7 +286,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
int nr_pages = 0;
int ret;
- off = page->index << PAGE_CACHE_SHIFT;
+ off = (u64) page_offset(page);
/* count pages */
next_index = page->index;
@@ -308,8 +308,8 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
NULL, 0,
ci->i_truncate_seq, ci->i_truncate_size,
NULL, false, 1, 0);
- if (!req)
- return -ENOMEM;
+ if (IS_ERR(req))
+ return PTR_ERR(req);
/* build page vector */
nr_pages = len >> PAGE_CACHE_SHIFT;
@@ -426,7 +426,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
struct ceph_inode_info *ci;
struct ceph_fs_client *fsc;
struct ceph_osd_client *osdc;
- loff_t page_off = page->index << PAGE_CACHE_SHIFT;
+ loff_t page_off = page_offset(page);
int len = PAGE_CACHE_SIZE;
loff_t i_size;
int err = 0;
@@ -817,8 +817,7 @@ get_more_pages:
/* ok */
if (locked_pages == 0) {
/* prepare async write request */
- offset = (unsigned long long)page->index
- << PAGE_CACHE_SHIFT;
+ offset = (u64) page_offset(page);
len = wsize;
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout,
@@ -832,8 +831,8 @@ get_more_pages:
ci->i_truncate_size,
&inode->i_mtime, true, 1, 0);
- if (!req) {
- rc = -ENOMEM;
+ if (IS_ERR(req)) {
+ rc = PTR_ERR(req);
unlock_page(page);
break;
}
@@ -1180,7 +1179,7 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
struct inode *inode = vma->vm_file->f_dentry->d_inode;
struct page *page = vmf->page;
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
- loff_t off = page->index << PAGE_CACHE_SHIFT;
+ loff_t off = page_offset(page);
loff_t size, len;
int ret;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 620daad201d..3251e9cc640 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1005,7 +1005,7 @@ static void __queue_cap_release(struct ceph_mds_session *session,
BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
head = msg->front.iov_base;
- head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
+ le32_add_cpu(&head->num, 1);
item = msg->front.iov_base + msg->front.iov_len;
item->ino = cpu_to_le64(ino);
item->cap_id = cpu_to_le64(cap_id);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ecebbc09bfc..5840d2aaed1 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -536,8 +536,8 @@ more:
do_sync,
ci->i_truncate_seq, ci->i_truncate_size,
&mtime, false, 2, page_align);
- if (!req)
- return -ENOMEM;
+ if (IS_ERR(req))
+ return PTR_ERR(req);
if (file->f_flags & O_DIRECT) {
pages = ceph_get_direct_page_vector(data, num_pages, false);
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 1396ceb4679..36549a46e31 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -187,14 +187,18 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
u64 tmp;
struct ceph_object_layout ol;
struct ceph_pg pgid;
+ int r;
/* copy and validate */
if (copy_from_user(&dl, arg, sizeof(dl)))
return -EFAULT;
down_read(&osdc->map_sem);
- ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
- &dl.object_no, &dl.object_offset, &olen);
+ r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
+ &dl.object_no, &dl.object_offset,
+ &olen);
+ if (r < 0)
+ return -EIO;
dl.file_offset -= dl.object_offset;
dl.object_size = ceph_file_layout_object_size(ci->i_layout);
dl.block_size = ceph_file_layout_su(ci->i_layout);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a5a735422aa..1bcf712655d 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2625,7 +2625,8 @@ static void check_new_map(struct ceph_mds_client *mdsc,
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
session_state_name(s->s_state));
- if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
+ if (i >= newmap->m_max_mds ||
+ memcmp(ceph_mdsmap_get_addr(oldmap, i),
ceph_mdsmap_get_addr(newmap, i),
sizeof(struct ceph_entity_addr))) {
if (s->s_state == CEPH_MDS_SESSION_OPENING) {
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 3a42d932637..2eb43f21132 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -307,7 +307,10 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
{
struct ceph_mount_options *fsopt;
const char *dev_name_end;
- int err = -ENOMEM;
+ int err;
+
+ if (!dev_name || !*dev_name)
+ return -EINVAL;
fsopt = kzalloc(sizeof(*fsopt), GFP_KERNEL);
if (!fsopt)
@@ -328,21 +331,33 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
fsopt->congestion_kb = default_congestion_kb();
- /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
+ /*
+ * Distinguish the server list from the path in "dev_name".
+ * Internally we do not include the leading '/' in the path.
+ *
+ * "dev_name" will look like:
+ * <server_spec>[,<server_spec>...]:[<path>]
+ * where
+ * <server_spec> is <ip>[:<port>]
+ * <path> is optional, but if present must begin with '/'
+ */
+ dev_name_end = strchr(dev_name, '/');
+ if (dev_name_end) {
+ /* skip over leading '/' for path */
+ *path = dev_name_end + 1;
+ } else {
+ /* path is empty */
+ dev_name_end = dev_name + strlen(dev_name);
+ *path = dev_name_end;
+ }
err = -EINVAL;
- if (!dev_name)
- goto out;
- *path = strstr(dev_name, ":/");
- if (*path == NULL) {
- pr_err("device name is missing path (no :/ in %s)\n",
+ dev_name_end--; /* back up to ':' separator */
+ if (*dev_name_end != ':') {
+ pr_err("device name is missing path (no : separator in %s)\n",
dev_name);
goto out;
}
- dev_name_end = *path;
dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
-
- /* path on server */
- *path += 2;
dout("server path '%s'\n", *path);
*popt = ceph_parse_options(options, dev_name, dev_name_end,
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 1fb93e9080b..a486f390dfb 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -71,7 +71,6 @@ struct ceph_mon_client {
int cur_mon; /* last monitor i contacted */
unsigned long sub_sent, sub_renew_after;
struct ceph_connection con;
- bool have_fsid;
/* pending generic requests */
struct rb_root generic_request_tree;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index cedfb1a8434..d9b880e977e 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -207,7 +207,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
struct ceph_msg *msg);
-extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+extern int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
u64 snapid,
u64 off, u64 *plen, u64 *bno,
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 25b930bffea..e37acbe989a 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -109,9 +109,9 @@ extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
/* calculate mapping of a file extent to an object */
-extern void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
- u64 off, u64 *plen,
- u64 *bno, u64 *oxoff, u64 *oxlen);
+extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
+ u64 off, u64 *plen,
+ u64 *bno, u64 *oxoff, u64 *oxlen);
/* calculate mapping of object to a placement group */
extern int ceph_calc_object_layout(struct ceph_object_layout *ol,
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 900ea0f043f..812eb3b46c1 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -637,7 +637,7 @@ bad:
/*
* Do a synchronous pool op.
*/
-int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+static int do_poolop(struct ceph_mon_client *monc, u32 op,
u32 pool, u64 snapid,
char *buf, int len)
{
@@ -687,7 +687,7 @@ out:
int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid)
{
- return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
+ return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, 0, (char *)snapid, sizeof(*snapid));
}
@@ -696,7 +696,7 @@ EXPORT_SYMBOL(ceph_monc_create_snapid);
int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid)
{
- return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
+ return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, snapid, 0, 0);
}
@@ -769,7 +769,6 @@ static int build_initial_monmap(struct ceph_mon_client *monc)
monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
}
monc->monmap->num_mon = num_mon;
- monc->have_fsid = false;
return 0;
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 42119c05e82..ccbdfbba9e5 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -52,7 +52,7 @@ static int op_has_extent(int op)
op == CEPH_OSD_OP_WRITE);
}
-void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
u64 snapid,
u64 off, u64 *plen, u64 *bno,
@@ -62,12 +62,15 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
u64 orig_len = *plen;
u64 objoff, objlen; /* extent in object */
+ int r;
reqhead->snapid = cpu_to_le64(snapid);
/* object extent? */
- ceph_calc_file_object_mapping(layout, off, plen, bno,
- &objoff, &objlen);
+ r = ceph_calc_file_object_mapping(layout, off, plen, bno,
+ &objoff, &objlen);
+ if (r < 0)
+ return r;
if (*plen < orig_len)
dout(" skipping last %llu, final file extent %llu~%llu\n",
orig_len - *plen, off, *plen);
@@ -83,7 +86,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
*bno, objoff, objlen, req->r_num_pages);
-
+ return 0;
}
EXPORT_SYMBOL(ceph_calc_raw_layout);
@@ -112,20 +115,25 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
*
* fill osd op in request message.
*/
-static void calc_layout(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- u64 off, u64 *plen,
- struct ceph_osd_request *req,
- struct ceph_osd_req_op *op)
+static int calc_layout(struct ceph_osd_client *osdc,
+ struct ceph_vino vino,
+ struct ceph_file_layout *layout,
+ u64 off, u64 *plen,
+ struct ceph_osd_request *req,
+ struct ceph_osd_req_op *op)
{
u64 bno;
+ int r;
- ceph_calc_raw_layout(osdc, layout, vino.snap, off,
- plen, &bno, req, op);
+ r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
+ plen, &bno, req, op);
+ if (r < 0)
+ return r;
snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
req->r_oid_len = strlen(req->r_oid);
+
+ return r;
}
/*
@@ -456,6 +464,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
{
struct ceph_osd_req_op ops[3];
struct ceph_osd_request *req;
+ int r;
ops[0].op = opcode;
ops[0].extent.truncate_seq = truncate_seq;
@@ -474,10 +483,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
use_mempool,
GFP_NOFS, NULL, NULL);
if (!req)
- return NULL;
+ return ERR_PTR(-ENOMEM);
/* calculate max write size */
- calc_layout(osdc, vino, layout, off, plen, req, ops);
+ r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+ if (r < 0)
+ return ERR_PTR(r);
req->r_file_layout = *layout; /* keep a copy */
/* in case it differs from natural (file) alignment that
@@ -1920,8 +1931,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, 0, truncate_seq, truncate_size, NULL,
false, 1, page_align);
- if (!req)
- return -ENOMEM;
+ if (IS_ERR(req))
+ return PTR_ERR(req);
/* it may be a short read due to an object boundary */
req->r_pages = pages;
@@ -1963,8 +1974,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
snapc, do_sync,
truncate_seq, truncate_size, mtime,
nofail, 1, page_align);
- if (!req)
- return -ENOMEM;
+ if (IS_ERR(req))
+ return PTR_ERR(req);
/* it may be a short write due to an object boundary */
req->r_pages = pages;
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 3124b71a888..5433fb0eb3c 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -984,7 +984,7 @@ bad:
* for now, we write only a single su, until we can
* pass a stride back to the caller.
*/
-void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
+int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 off, u64 *plen,
u64 *ono,
u64 *oxoff, u64 *oxlen)
@@ -998,11 +998,17 @@ void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen,
osize, su);
+ if (su == 0 || sc == 0)
+ goto invalid;
su_per_object = osize / su;
+ if (su_per_object == 0)
+ goto invalid;
dout("osize %u / su %u = su_per_object %u\n", osize, su,
su_per_object);
- BUG_ON((su & ~PAGE_MASK) != 0);
+ if ((su & ~PAGE_MASK) != 0)
+ goto invalid;
+
/* bl = *off / su; */
t = off;
do_div(t, su);
@@ -1030,6 +1036,14 @@ void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
*plen = *oxlen;
dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
+ return 0;
+
+invalid:
+ dout(" invalid layout\n");
+ *ono = 0;
+ *oxoff = 0;
+ *oxlen = 0;
+ return -EINVAL;
}
EXPORT_SYMBOL(ceph_calc_file_object_mapping);
diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c
index 665cd23020f..92866bebb65 100644
--- a/net/ceph/pagelist.c
+++ b/net/ceph/pagelist.c
@@ -1,4 +1,3 @@
-
#include <linux/module.h>
#include <linux/gfp.h>
#include <linux/pagemap.h>
@@ -134,8 +133,8 @@ int ceph_pagelist_truncate(struct ceph_pagelist *pl,
ceph_pagelist_unmap_tail(pl);
while (pl->head.prev != c->page_lru) {
page = list_entry(pl->head.prev, struct page, lru);
- list_del(&page->lru); /* remove from pagelist */
- list_add_tail(&page->lru, &pl->free_list); /* add to reserve */
+ /* move from pagelist to reserve */
+ list_move_tail(&page->lru, &pl->free_list);
++pl->num_pages_free;
}
pl->room = c->room;