summaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
authorJ. Bruce Fields <bfields@redhat.com>2010-08-26 13:22:27 -0400
committerJ. Bruce Fields <bfields@redhat.com>2010-08-26 13:22:27 -0400
commitf632265d0ffb5acf331252d98c64939849d96bb2 (patch)
tree31187d9a726bf1ca6ca12e26ad8e7c609eaf4d8b /fs/ceph
parent7d94784293096c0a46897acdb83be5abd9278ece (diff)
parentda5cabf80e2433131bf0ed8993abc0f7ea618c73 (diff)
Merge commit 'v2.6.36-rc1' into HEAD
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/Kconfig2
-rw-r--r--fs/ceph/Makefile2
-rw-r--r--fs/ceph/addr.c16
-rw-r--r--fs/ceph/armor.c6
-rw-r--r--fs/ceph/auth.c6
-rw-r--r--fs/ceph/auth_x.c11
-rw-r--r--fs/ceph/buffer.c16
-rw-r--r--fs/ceph/caps.c426
-rw-r--r--fs/ceph/ceph_frag.h4
-rw-r--r--fs/ceph/ceph_fs.c50
-rw-r--r--fs/ceph/ceph_fs.h87
-rw-r--r--fs/ceph/ceph_hash.h4
-rw-r--r--fs/ceph/ceph_strings.c3
-rw-r--r--fs/ceph/crush/crush.h4
-rw-r--r--fs/ceph/crush/hash.h4
-rw-r--r--fs/ceph/crush/mapper.c41
-rw-r--r--fs/ceph/crush/mapper.h4
-rw-r--r--fs/ceph/crypto.c27
-rw-r--r--fs/ceph/crypto.h4
-rw-r--r--fs/ceph/debugfs.c23
-rw-r--r--fs/ceph/decode.h6
-rw-r--r--fs/ceph/dir.c23
-rw-r--r--fs/ceph/file.c34
-rw-r--r--fs/ceph/inode.c32
-rw-r--r--fs/ceph/ioctl.c24
-rw-r--r--fs/ceph/ioctl.h2
-rw-r--r--fs/ceph/locks.c256
-rw-r--r--fs/ceph/mds_client.c306
-rw-r--r--fs/ceph/mds_client.h33
-rw-r--r--fs/ceph/mdsmap.c6
-rw-r--r--fs/ceph/mdsmap.h8
-rw-r--r--fs/ceph/messenger.c96
-rw-r--r--fs/ceph/mon_client.c181
-rw-r--r--fs/ceph/mon_client.h5
-rw-r--r--fs/ceph/msgr.h4
-rw-r--r--fs/ceph/osd_client.c18
-rw-r--r--fs/ceph/osdmap.c63
-rw-r--r--fs/ceph/rados.h13
-rw-r--r--fs/ceph/super.c92
-rw-r--r--fs/ceph/super.h40
-rw-r--r--fs/ceph/xattr.c2
41 files changed, 1426 insertions, 558 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 04b8280582a..bc87b9c1d27 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -2,7 +2,7 @@ config CEPH_FS
tristate "Ceph distributed file system (EXPERIMENTAL)"
depends on INET && EXPERIMENTAL
select LIBCRC32C
- select CONFIG_CRYPTO_AES
+ select CRYPTO_AES
help
Choose Y or M here to include support for mounting the
experimental Ceph distributed file system. Ceph is an extremely
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 6a660e610be..278e1172600 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -6,7 +6,7 @@ ifneq ($(KERNELRELEASE),)
obj-$(CONFIG_CEPH_FS) += ceph.o
-ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
+ceph-objs := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
export.o caps.o snap.o xattr.o \
messenger.o msgpool.o buffer.o pagelist.o \
mds_client.o mdsmap.o \
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index d9c60b84949..5598a0d0229 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -309,7 +309,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
zero_user_segment(page, s, PAGE_CACHE_SIZE);
}
- if (add_to_page_cache_lru(page, mapping, page->index, GFP_NOFS)) {
+ if (add_to_page_cache_lru(page, mapping, page->index,
+ GFP_NOFS)) {
page_cache_release(page);
dout("readpages %p add_to_page_cache failed %p\n",
inode, page);
@@ -552,7 +553,7 @@ static void writepages_finish(struct ceph_osd_request *req,
* page truncation thread, possibly losing some data that
* raced its way in
*/
- if ((issued & CEPH_CAP_FILE_CACHE) == 0)
+ if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
generic_error_remove_page(inode->i_mapping, page);
unlock_page(page);
@@ -797,9 +798,12 @@ get_more_pages:
dout("%p will write page %p idx %lu\n",
inode, page, page->index);
- writeback_stat = atomic_long_inc_return(&client->writeback_count);
- if (writeback_stat > CONGESTION_ON_THRESH(client->mount_args->congestion_kb)) {
- set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
+ writeback_stat =
+ atomic_long_inc_return(&client->writeback_count);
+ if (writeback_stat > CONGESTION_ON_THRESH(
+ client->mount_args->congestion_kb)) {
+ set_bdi_congested(&client->backing_dev_info,
+ BLK_RW_ASYNC);
}
set_page_writeback(page);
@@ -1036,7 +1040,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
*pagep = page;
dout("write_begin file %p inode %p page %p %d~%d\n", file,
- inode, page, (int)pos, (int)len);
+ inode, page, (int)pos, (int)len);
r = ceph_update_writeable_page(file, pos, len, page);
} while (r == -EAGAIN);
diff --git a/fs/ceph/armor.c b/fs/ceph/armor.c
index 67b2c030924..eb2a666b0be 100644
--- a/fs/ceph/armor.c
+++ b/fs/ceph/armor.c
@@ -1,11 +1,15 @@
#include <linux/errno.h>
+int ceph_armor(char *dst, const char *src, const char *end);
+int ceph_unarmor(char *dst, const char *src, const char *end);
+
/*
* base64 encode/decode.
*/
-const char *pem_key = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+static const char *pem_key =
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
static int encode_bits(int c)
{
diff --git a/fs/ceph/auth.c b/fs/ceph/auth.c
index 89490beaf53..6d2e3060062 100644
--- a/fs/ceph/auth.c
+++ b/fs/ceph/auth.c
@@ -20,7 +20,7 @@ static u32 supported_protocols[] = {
CEPH_AUTH_CEPHX
};
-int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol)
+static int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol)
{
switch (protocol) {
case CEPH_AUTH_NONE:
@@ -133,8 +133,8 @@ bad:
return -ERANGE;
}
-int ceph_build_auth_request(struct ceph_auth_client *ac,
- void *msg_buf, size_t msg_len)
+static int ceph_build_auth_request(struct ceph_auth_client *ac,
+ void *msg_buf, size_t msg_len)
{
struct ceph_mon_request_header *monhdr = msg_buf;
void *p = monhdr + 1;
diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c
index 83d4d2785ff..582e0b2caf8 100644
--- a/fs/ceph/auth_x.c
+++ b/fs/ceph/auth_x.c
@@ -87,8 +87,8 @@ static int ceph_x_decrypt(struct ceph_crypto_key *secret,
/*
* get existing (or insert new) ticket handler
*/
-struct ceph_x_ticket_handler *get_ticket_handler(struct ceph_auth_client *ac,
- int service)
+static struct ceph_x_ticket_handler *
+get_ticket_handler(struct ceph_auth_client *ac, int service)
{
struct ceph_x_ticket_handler *th;
struct ceph_x_info *xi = ac->private;
@@ -429,7 +429,7 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
auth->struct_v = 1;
auth->key = 0;
for (u = (u64 *)tmp_enc; u + 1 <= (u64 *)(tmp_enc + ret); u++)
- auth->key ^= *u;
+ auth->key ^= *(__le64 *)u;
dout(" server_challenge %llx client_challenge %llx key %llx\n",
xi->server_challenge, le64_to_cpu(auth->client_challenge),
le64_to_cpu(auth->key));
@@ -493,7 +493,7 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result,
return -EAGAIN;
}
- op = le32_to_cpu(head->op);
+ op = le16_to_cpu(head->op);
result = le32_to_cpu(head->result);
dout("handle_reply op %d result %d\n", op, result);
switch (op) {
@@ -613,6 +613,9 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
remove_ticket_handler(ac, th);
}
+ if (xi->auth_authorizer.buf)
+ ceph_buffer_put(xi->auth_authorizer.buf);
+
kfree(ac->private);
ac->private = NULL;
}
diff --git a/fs/ceph/buffer.c b/fs/ceph/buffer.c
index c67535d70aa..cd39f17021d 100644
--- a/fs/ceph/buffer.c
+++ b/fs/ceph/buffer.c
@@ -47,22 +47,6 @@ void ceph_buffer_release(struct kref *kref)
kfree(b);
}
-int ceph_buffer_alloc(struct ceph_buffer *b, int len, gfp_t gfp)
-{
- b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN);
- if (b->vec.iov_base) {
- b->is_vmalloc = false;
- } else {
- b->vec.iov_base = __vmalloc(len, gfp, PAGE_KERNEL);
- b->is_vmalloc = true;
- }
- if (!b->vec.iov_base)
- return -ENOMEM;
- b->alloc_len = len;
- b->vec.iov_len = len;
- return 0;
-}
-
int ceph_decode_buffer(struct ceph_buffer **b, void **p, void *end)
{
size_t len;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index ae3e3a30644..7bf182b0397 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps)
return cap_str[i];
}
-/*
- * Cap reservations
- *
- * Maintain a global pool of preallocated struct ceph_caps, referenced
- * by struct ceph_caps_reservations. This ensures that we preallocate
- * memory needed to successfully process an MDS response. (If an MDS
- * sends us cap information and we fail to process it, we will have
- * problems due to the client and MDS being out of sync.)
- *
- * Reservations are 'owned' by a ceph_cap_reservation context.
- */
-static spinlock_t caps_list_lock;
-static struct list_head caps_list; /* unused (reserved or unreserved) */
-static int caps_total_count; /* total caps allocated */
-static int caps_use_count; /* in use */
-static int caps_reserve_count; /* unused, reserved */
-static int caps_avail_count; /* unused, unreserved */
-static int caps_min_count; /* keep at least this many (unreserved) */
-
-void __init ceph_caps_init(void)
+void ceph_caps_init(struct ceph_mds_client *mdsc)
{
- INIT_LIST_HEAD(&caps_list);
- spin_lock_init(&caps_list_lock);
+ INIT_LIST_HEAD(&mdsc->caps_list);
+ spin_lock_init(&mdsc->caps_list_lock);
}
-void ceph_caps_finalize(void)
+void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
struct ceph_cap *cap;
- spin_lock(&caps_list_lock);
- while (!list_empty(&caps_list)) {
- cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+ spin_lock(&mdsc->caps_list_lock);
+ while (!list_empty(&mdsc->caps_list)) {
+ cap = list_first_entry(&mdsc->caps_list,
+ struct ceph_cap, caps_item);
list_del(&cap->caps_item);
kmem_cache_free(ceph_cap_cachep, cap);
}
- caps_total_count = 0;
- caps_avail_count = 0;
- caps_use_count = 0;
- caps_reserve_count = 0;
- caps_min_count = 0;
- spin_unlock(&caps_list_lock);
+ mdsc->caps_total_count = 0;
+ mdsc->caps_avail_count = 0;
+ mdsc->caps_use_count = 0;
+ mdsc->caps_reserve_count = 0;
+ mdsc->caps_min_count = 0;
+ spin_unlock(&mdsc->caps_list_lock);
}
-void ceph_adjust_min_caps(int delta)
+void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
{
- spin_lock(&caps_list_lock);
- caps_min_count += delta;
- BUG_ON(caps_min_count < 0);
- spin_unlock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
+ mdsc->caps_min_count += delta;
+ BUG_ON(mdsc->caps_min_count < 0);
+ spin_unlock(&mdsc->caps_list_lock);
}
-int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx, int need)
{
int i;
struct ceph_cap *cap;
@@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
dout("reserve caps ctx=%p need=%d\n", ctx, need);
/* first reserve any caps that are already allocated */
- spin_lock(&caps_list_lock);
- if (caps_avail_count >= need)
+ spin_lock(&mdsc->caps_list_lock);
+ if (mdsc->caps_avail_count >= need)
have = need;
else
- have = caps_avail_count;
- caps_avail_count -= have;
- caps_reserve_count += have;
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ have = mdsc->caps_avail_count;
+ mdsc->caps_avail_count -= have;
+ mdsc->caps_reserve_count += have;
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
for (i = have; i < need; i++) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
}
BUG_ON(have + alloc != need);
- spin_lock(&caps_list_lock);
- caps_total_count += alloc;
- caps_reserve_count += alloc;
- list_splice(&newcaps, &caps_list);
+ spin_lock(&mdsc->caps_list_lock);
+ mdsc->caps_total_count += alloc;
+ mdsc->caps_reserve_count += alloc;
+ list_splice(&newcaps, &mdsc->caps_list);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
ctx->count = need;
dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
- ctx, caps_total_count, caps_use_count, caps_reserve_count,
- caps_avail_count);
+ ctx, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
return 0;
out_alloc_count:
@@ -220,92 +205,104 @@ out_alloc_count:
return ret;
}
-int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
+int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx)
{
dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
if (ctx->count) {
- spin_lock(&caps_list_lock);
- BUG_ON(caps_reserve_count < ctx->count);
- caps_reserve_count -= ctx->count;
- caps_avail_count += ctx->count;
+ spin_lock(&mdsc->caps_list_lock);
+ BUG_ON(mdsc->caps_reserve_count < ctx->count);
+ mdsc->caps_reserve_count -= ctx->count;
+ mdsc->caps_avail_count += ctx->count;
ctx->count = 0;
dout("unreserve caps %d = %d used + %d resv + %d avail\n",
- caps_total_count, caps_use_count, caps_reserve_count,
- caps_avail_count);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
}
return 0;
}
-static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
+static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx)
{
struct ceph_cap *cap = NULL;
/* temporary, until we do something about cap import/export */
- if (!ctx)
- return kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
+ if (!ctx) {
+ cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
+ if (cap) {
+ mdsc->caps_use_count++;
+ mdsc->caps_total_count++;
+ }
+ return cap;
+ }
- spin_lock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
- ctx, ctx->count, caps_total_count, caps_use_count,
- caps_reserve_count, caps_avail_count);
+ ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
BUG_ON(!ctx->count);
- BUG_ON(ctx->count > caps_reserve_count);
- BUG_ON(list_empty(&caps_list));
+ BUG_ON(ctx->count > mdsc->caps_reserve_count);
+ BUG_ON(list_empty(&mdsc->caps_list));
ctx->count--;
- caps_reserve_count--;
- caps_use_count++;
+ mdsc->caps_reserve_count--;
+ mdsc->caps_use_count++;
- cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+ cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
list_del(&cap->caps_item);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count + mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
return cap;
}
-void ceph_put_cap(struct ceph_cap *cap)
+void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
- spin_lock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
dout("put_cap %p %d = %d used + %d resv + %d avail\n",
- cap, caps_total_count, caps_use_count,
- caps_reserve_count, caps_avail_count);
- caps_use_count--;
+ cap, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
+ mdsc->caps_use_count--;
/*
* Keep some preallocated caps around (ceph_min_count), to
* avoid lots of free/alloc churn.
*/
- if (caps_avail_count >= caps_reserve_count + caps_min_count) {
- caps_total_count--;
+ if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
+ mdsc->caps_min_count) {
+ mdsc->caps_total_count--;
kmem_cache_free(ceph_cap_cachep, cap);
} else {
- caps_avail_count++;
- list_add(&cap->caps_item, &caps_list);
+ mdsc->caps_avail_count++;
+ list_add(&cap->caps_item, &mdsc->caps_list);
}
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count + mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
}
void ceph_reservation_status(struct ceph_client *client,
int *total, int *avail, int *used, int *reserved,
int *min)
{
+ struct ceph_mds_client *mdsc = &client->mdsc;
+
if (total)
- *total = caps_total_count;
+ *total = mdsc->caps_total_count;
if (avail)
- *avail = caps_avail_count;
+ *avail = mdsc->caps_avail_count;
if (used)
- *used = caps_use_count;
+ *used = mdsc->caps_use_count;
if (reserved)
- *reserved = caps_reserve_count;
+ *reserved = mdsc->caps_reserve_count;
if (min)
- *min = caps_min_count;
+ *min = mdsc->caps_min_count;
}
/*
@@ -330,22 +327,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
return NULL;
}
+struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
+{
+ struct ceph_cap *cap;
+
+ spin_lock(&ci->vfs_inode.i_lock);
+ cap = __get_cap_for_mds(ci, mds);
+ spin_unlock(&ci->vfs_inode.i_lock);
+ return cap;
+}
+
/*
- * Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else
- * -1.
+ * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
*/
-static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq)
+static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
{
struct ceph_cap *cap;
int mds = -1;
struct rb_node *p;
- /* prefer mds with WR|WRBUFFER|EXCL caps */
+ /* prefer mds with WR|BUFFER|EXCL caps */
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
cap = rb_entry(p, struct ceph_cap, ci_node);
mds = cap->mds;
- if (mseq)
- *mseq = cap->mseq;
if (cap->issued & (CEPH_CAP_FILE_WR |
CEPH_CAP_FILE_BUFFER |
CEPH_CAP_FILE_EXCL))
@@ -358,7 +362,7 @@ int ceph_get_cap_mds(struct inode *inode)
{
int mds;
spin_lock(&inode->i_lock);
- mds = __ceph_get_cap_mds(ceph_inode(inode), NULL);
+ mds = __ceph_get_cap_mds(ceph_inode(inode));
spin_unlock(&inode->i_lock);
return mds;
}
@@ -477,8 +481,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
* Each time we receive FILE_CACHE anew, we increment
* i_rdcache_gen.
*/
- if ((issued & CEPH_CAP_FILE_CACHE) &&
- (had & CEPH_CAP_FILE_CACHE) == 0)
+ if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+ (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
ci->i_rdcache_gen++;
/*
@@ -537,7 +541,7 @@ retry:
new_cap = NULL;
} else {
spin_unlock(&inode->i_lock);
- new_cap = get_cap(caps_reservation);
+ new_cap = get_cap(mdsc, caps_reservation);
if (new_cap == NULL)
return -ENOMEM;
goto retry;
@@ -582,6 +586,7 @@ retry:
} else {
pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
realmino);
+ WARN_ON(!realm);
}
}
@@ -621,7 +626,7 @@ retry:
if (fmode >= 0)
__ceph_get_fmode(ci, fmode);
spin_unlock(&inode->i_lock);
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
return 0;
}
@@ -825,7 +830,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
int want = 0;
int mode;
- for (mode = 0; mode < 4; mode++)
+ for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
if (ci->i_nr_by_mode[mode])
want |= ceph_caps_for_mode(mode);
return want;
@@ -895,7 +900,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
ci->i_auth_cap = NULL;
if (removed)
- ceph_put_cap(cap);
+ ceph_put_cap(mdsc, cap);
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -981,6 +986,46 @@ static int send_cap_msg(struct ceph_mds_session *session,
return 0;
}
+static void __queue_cap_release(struct ceph_mds_session *session,
+ u64 ino, u64 cap_id, u32 migrate_seq,
+ u32 issue_seq)
+{
+ struct ceph_msg *msg;
+ struct ceph_mds_cap_release *head;
+ struct ceph_mds_cap_item *item;
+
+ spin_lock(&session->s_cap_lock);
+ BUG_ON(!session->s_num_cap_releases);
+ msg = list_first_entry(&session->s_cap_releases,
+ struct ceph_msg, list_head);
+
+ dout(" adding %llx release to mds%d msg %p (%d left)\n",
+ ino, session->s_mds, msg, session->s_num_cap_releases);
+
+ BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
+ head = msg->front.iov_base;
+ head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
+ item = msg->front.iov_base + msg->front.iov_len;
+ item->ino = cpu_to_le64(ino);
+ item->cap_id = cpu_to_le64(cap_id);
+ item->migrate_seq = cpu_to_le32(migrate_seq);
+ item->seq = cpu_to_le32(issue_seq);
+
+ session->s_num_cap_releases--;
+
+ msg->front.iov_len += sizeof(*item);
+ if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+ dout(" release msg %p full\n", msg);
+ list_move_tail(&msg->list_head, &session->s_cap_releases_done);
+ } else {
+ dout(" release msg %p at %d/%d (%d)\n", msg,
+ (int)le32_to_cpu(head->num),
+ (int)CEPH_CAPS_PER_RELEASE,
+ (int)msg->front.iov_len);
+ }
+ spin_unlock(&session->s_cap_lock);
+}
+
/*
* Queue cap releases when an inode is dropped from our cache. Since
* inode is about to be destroyed, there is no need for i_lock.
@@ -994,41 +1039,9 @@ void ceph_queue_caps_release(struct inode *inode)
while (p) {
struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
struct ceph_mds_session *session = cap->session;
- struct ceph_msg *msg;
- struct ceph_mds_cap_release *head;
- struct ceph_mds_cap_item *item;
- spin_lock(&session->s_cap_lock);
- BUG_ON(!session->s_num_cap_releases);
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
-
- dout(" adding %p release to mds%d msg %p (%d left)\n",
- inode, session->s_mds, msg, session->s_num_cap_releases);
-
- BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
- head = msg->front.iov_base;
- head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
- item = msg->front.iov_base + msg->front.iov_len;
- item->ino = cpu_to_le64(ceph_ino(inode));
- item->cap_id = cpu_to_le64(cap->cap_id);
- item->migrate_seq = cpu_to_le32(cap->mseq);
- item->seq = cpu_to_le32(cap->issue_seq);
-
- session->s_num_cap_releases--;
-
- msg->front.iov_len += sizeof(*item);
- if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
- dout(" release msg %p full\n", msg);
- list_move_tail(&msg->list_head,
- &session->s_cap_releases_done);
- } else {
- dout(" release msg %p at %d/%d (%d)\n", msg,
- (int)le32_to_cpu(head->num),
- (int)CEPH_CAPS_PER_RELEASE,
- (int)msg->front.iov_len);
- }
- spin_unlock(&session->s_cap_lock);
+ __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
+ cap->mseq, cap->issue_seq);
p = rb_next(p);
__ceph_remove_cap(cap);
}
@@ -1167,7 +1180,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
}
if (wake)
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
return delayed;
}
@@ -1183,6 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
*/
void __ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession)
+ __releases(ci->vfs_inode->i_lock)
+ __acquires(ci->vfs_inode->i_lock)
{
struct inode *inode = &ci->vfs_inode;
int mds;
@@ -1218,7 +1233,13 @@ retry:
BUG_ON(capsnap->dirty == 0);
/* pick mds, take s_mutex */
- mds = __ceph_get_cap_mds(ci, &mseq);
+ if (ci->i_auth_cap == NULL) {
+ dout("no auth cap (migrating?), doing nothing\n");
+ goto out;
+ }
+ mds = ci->i_auth_cap->session->s_mds;
+ mseq = ci->i_auth_cap->mseq;
+
if (session && session->s_mds != mds) {
dout("oops, wrong session %p mutex\n", session);
mutex_unlock(&session->s_mutex);
@@ -1237,8 +1258,8 @@ retry:
}
/*
* if session == NULL, we raced against a cap
- * deletion. retry, and we'll get a better
- * @mds value next time.
+ * deletion or migration. retry, and we'll
+ * get a better @mds value next time.
*/
spin_lock(&inode->i_lock);
goto retry;
@@ -1276,6 +1297,7 @@ retry:
list_del_init(&ci->i_snap_flush_item);
spin_unlock(&mdsc->snap_flush_lock);
+out:
if (psession)
*psession = session;
else if (session) {
@@ -1421,7 +1443,6 @@ static int try_nonblocking_invalidate(struct inode *inode)
*/
void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_session *session)
- __releases(session->s_mutex)
{
struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1496,11 +1517,13 @@ retry_locked:
ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
ci->i_rdcache_gen && /* may have cached pages */
(file_wanted == 0 || /* no open files */
- (revoking & CEPH_CAP_FILE_CACHE)) && /* or revoking cache */
+ (revoking & (CEPH_CAP_FILE_CACHE|
+ CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */
!tried_invalidate) {
dout("check_caps trying to invalidate on %p\n", inode);
if (try_nonblocking_invalidate(inode) < 0) {
- if (revoking & CEPH_CAP_FILE_CACHE) {
+ if (revoking & (CEPH_CAP_FILE_CACHE|
+ CEPH_CAP_FILE_LAZYIO)) {
dout("check_caps queuing invalidate\n");
queue_invalidate = 1;
ci->i_rdcache_revoking = ci->i_rdcache_gen;
@@ -2139,7 +2162,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
else if (flushsnaps)
ceph_flush_snaps(ci);
if (wake)
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
if (put)
iput(inode);
}
@@ -2215,7 +2238,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
iput(inode);
} else if (complete_capsnap) {
ceph_flush_snaps(ci);
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
}
if (drop_capsnap)
iput(inode);
@@ -2236,8 +2259,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
struct ceph_mds_session *session,
struct ceph_cap *cap,
struct ceph_buffer *xattr_buf)
- __releases(inode->i_lock)
- __releases(session->s_mutex)
+ __releases(inode->i_lock)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -2264,6 +2286,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
* will invalidate _after_ writeback.)
*/
if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+ (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
!ci->i_wrbuffer_ref) {
if (try_nonblocking_invalidate(inode) == 0) {
revoked_rdcache = 1;
@@ -2355,15 +2378,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
/* revocation, grant, or no-op? */
if (cap->issued & ~newcaps) {
- dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued),
- ceph_cap_string(newcaps));
- if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
- writeback = 1; /* will delay ack */
- else if (dirty & ~newcaps)
- check_caps = 1; /* initiate writeback in check_caps */
- else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
- revoked_rdcache)
- check_caps = 2; /* send revoke ack in check_caps */
+ int revoking = cap->issued & ~newcaps;
+
+ dout("revocation: %s -> %s (revoking %s)\n",
+ ceph_cap_string(cap->issued),
+ ceph_cap_string(newcaps),
+ ceph_cap_string(revoking));
+ if (revoking & used & CEPH_CAP_FILE_BUFFER)
+ writeback = 1; /* initiate writeback; will delay ack */
+ else if (revoking == CEPH_CAP_FILE_CACHE &&
+ (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
+ queue_invalidate)
+ ; /* do nothing yet, invalidation will be queued */
+ else if (cap == ci->i_auth_cap)
+ check_caps = 1; /* check auth cap only */
+ else
+ check_caps = 2; /* check all caps */
cap->issued = newcaps;
cap->implemented |= newcaps;
} else if (cap->issued == newcaps) {
@@ -2391,7 +2421,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
if (queue_invalidate)
ceph_queue_invalidate(inode);
if (wake)
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
if (check_caps == 1)
ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
@@ -2446,7 +2476,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
struct ceph_inode_info,
i_flushing_item)->vfs_inode);
mdsc->num_cap_flushing--;
- wake_up(&mdsc->cap_flushing_wq);
+ wake_up_all(&mdsc->cap_flushing_wq);
dout(" inode %p now !flushing\n", inode);
if (ci->i_dirty_caps == 0) {
@@ -2458,7 +2488,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
}
}
spin_unlock(&mdsc->cap_dirty_lock);
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
out:
spin_unlock(&inode->i_lock);
@@ -2554,7 +2584,8 @@ static void handle_cap_trunc(struct inode *inode,
* caller holds s_mutex
*/
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
- struct ceph_mds_session *session)
+ struct ceph_mds_session *session,
+ int *open_target_sessions)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
@@ -2586,6 +2617,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
ci->i_cap_exporting_mds = mds;
ci->i_cap_exporting_mseq = mseq;
ci->i_cap_exporting_issued = cap->issued;
+
+ /*
+ * make sure we have open sessions with all possible
+ * export targets, so that we get the matching IMPORT
+ */
+ *open_target_sessions = 1;
}
__ceph_remove_cap(cap);
}
@@ -2655,12 +2692,16 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_mds_caps *h;
int mds = session->s_mds;
int op;
- u32 seq;
+ u32 seq, mseq;
struct ceph_vino vino;
u64 cap_id;
u64 size, max_size;
u64 tid;
void *snaptrace;
+ size_t snaptrace_len;
+ void *flock;
+ u32 flock_len;
+ int open_target_sessions = 0;
dout("handle_caps from mds%d\n", mds);
@@ -2669,15 +2710,30 @@ void ceph_handle_caps(struct ceph_mds_session *session,
if (msg->front.iov_len < sizeof(*h))
goto bad;
h = msg->front.iov_base;
- snaptrace = h + 1;
op = le32_to_cpu(h->op);
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
cap_id = le64_to_cpu(h->cap_id);
seq = le32_to_cpu(h->seq);
+ mseq = le32_to_cpu(h->migrate_seq);
size = le64_to_cpu(h->size);
max_size = le64_to_cpu(h->max_size);
+ snaptrace = h + 1;
+ snaptrace_len = le32_to_cpu(h->snap_trace_len);
+
+ if (le16_to_cpu(msg->hdr.version) >= 2) {
+ void *p, *end;
+
+ p = snaptrace + snaptrace_len;
+ end = msg->front.iov_base + msg->front.iov_len;
+ ceph_decode_32_safe(&p, end, flock_len, bad);
+ flock = p;
+ } else {
+ flock = NULL;
+ flock_len = 0;
+ }
+
mutex_lock(&session->s_mutex);
session->s_seq++;
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2689,6 +2745,18 @@ void ceph_handle_caps(struct ceph_mds_session *session,
vino.snap, inode);
if (!inode) {
dout(" i don't have ino %llx\n", vino.ino);
+
+ if (op == CEPH_CAP_OP_IMPORT)
+ __queue_cap_release(session, vino.ino, cap_id,
+ mseq, seq);
+
+ /*
+ * send any full release message to try to move things
+ * along for the mds (who clearly thinks we still have this
+ * cap).
+ */
+ ceph_add_cap_releases(mdsc, session);
+ ceph_send_cap_releases(mdsc, session);
goto done;
}
@@ -2699,12 +2767,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
goto done;
case CEPH_CAP_OP_EXPORT:
- handle_cap_export(inode, h, session);
+ handle_cap_export(inode, h, session, &open_target_sessions);
goto done;
case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, session,
- snaptrace, le32_to_cpu(h->snap_trace_len));
+ snaptrace, snaptrace_len);
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
session);
goto done_unlocked;
@@ -2714,7 +2782,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
spin_lock(&inode->i_lock);
cap = __get_cap_for_mds(ceph_inode(inode), mds);
if (!cap) {
- dout("no cap on %p ino %llx.%llx from mds%d, releasing\n",
+ dout(" no cap on %p ino %llx.%llx from mds%d\n",
inode, ceph_ino(inode), ceph_snap(inode), mds);
spin_unlock(&inode->i_lock);
goto done;
@@ -2746,6 +2814,8 @@ done:
done_unlocked:
if (inode)
iput(inode);
+ if (open_target_sessions)
+ ceph_mdsc_open_export_target_sessions(mdsc, session);
return;
bad:
@@ -2865,18 +2935,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap;
struct ceph_mds_request_release *rel = *p;
+ int used, dirty;
int ret = 0;
- int used = 0;
spin_lock(&inode->i_lock);
used = __ceph_caps_used(ci);
+ dirty = __ceph_caps_dirty(ci);
- dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode,
- mds, ceph_cap_string(used), ceph_cap_string(drop),
+ dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
+ inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
ceph_cap_string(unless));
- /* only drop unused caps */
- drop &= ~used;
+ /* only drop unused, clean caps */
+ drop &= ~(used | dirty);
cap = __get_cap_for_mds(ci, mds);
if (cap && __cap_is_valid(cap)) {
@@ -2956,6 +3027,7 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry,
memcpy(*p, dentry->d_name.name, dentry->d_name.len);
*p += dentry->d_name.len;
rel->dname_seq = cpu_to_le32(di->lease_seq);
+ __ceph_mdsc_drop_dentry_lease(dentry);
}
spin_unlock(&dentry->d_lock);
return ret;
diff --git a/fs/ceph/ceph_frag.h b/fs/ceph/ceph_frag.h
index 793f50cb7c2..5babb8e9535 100644
--- a/fs/ceph/ceph_frag.h
+++ b/fs/ceph/ceph_frag.h
@@ -1,5 +1,5 @@
-#ifndef _FS_CEPH_FRAG_H
-#define _FS_CEPH_FRAG_H
+#ifndef FS_CEPH_FRAG_H
+#define FS_CEPH_FRAG_H
/*
* "Frags" are a way to describe a subset of a 32-bit number space,
diff --git a/fs/ceph/ceph_fs.c b/fs/ceph/ceph_fs.c
index 79d76bc4303..3ac6cc7c115 100644
--- a/fs/ceph/ceph_fs.c
+++ b/fs/ceph/ceph_fs.c
@@ -29,46 +29,44 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
int ceph_flags_to_mode(int flags)
{
+ int mode;
+
#ifdef O_DIRECTORY /* fixme */
if ((flags & O_DIRECTORY) == O_DIRECTORY)
return CEPH_FILE_MODE_PIN;
#endif
+ if ((flags & O_APPEND) == O_APPEND)
+ flags |= O_WRONLY;
+
+ if ((flags & O_ACCMODE) == O_RDWR)
+ mode = CEPH_FILE_MODE_RDWR;
+ else if ((flags & O_ACCMODE) == O_WRONLY)
+ mode = CEPH_FILE_MODE_WR;
+ else
+ mode = CEPH_FILE_MODE_RD;
+
#ifdef O_LAZY
if (flags & O_LAZY)
- return CEPH_FILE_MODE_LAZY;
+ mode |= CEPH_FILE_MODE_LAZY;
#endif
- if ((flags & O_APPEND) == O_APPEND)
- flags |= O_WRONLY;
- flags &= O_ACCMODE;
- if ((flags & O_RDWR) == O_RDWR)
- return CEPH_FILE_MODE_RDWR;
- if ((flags & O_WRONLY) == O_WRONLY)
- return CEPH_FILE_MODE_WR;
- return CEPH_FILE_MODE_RD;
+ return mode;
}
int ceph_caps_for_mode(int mode)
{
- switch (mode) {
- case CEPH_FILE_MODE_PIN:
- return CEPH_CAP_PIN;
- case CEPH_FILE_MODE_RD:
- return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
+ int caps = CEPH_CAP_PIN;
+
+ if (mode & CEPH_FILE_MODE_RD)
+ caps |= CEPH_CAP_FILE_SHARED |
CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE;
- case CEPH_FILE_MODE_RDWR:
- return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
- CEPH_CAP_FILE_EXCL |
- CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE |
- CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
- CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
- CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
- case CEPH_FILE_MODE_WR:
- return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
- CEPH_CAP_FILE_EXCL |
+ if (mode & CEPH_FILE_MODE_WR)
+ caps |= CEPH_CAP_FILE_EXCL |
CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
- }
- return 0;
+ if (mode & CEPH_FILE_MODE_LAZY)
+ caps |= CEPH_CAP_FILE_LAZYIO;
+
+ return caps;
}
diff --git a/fs/ceph/ceph_fs.h b/fs/ceph/ceph_fs.h
index 2fa992eaf7d..d5619ac8671 100644
--- a/fs/ceph/ceph_fs.h
+++ b/fs/ceph/ceph_fs.h
@@ -9,27 +9,13 @@
* LGPL2
*/
-#ifndef _FS_CEPH_CEPH_FS_H
-#define _FS_CEPH_CEPH_FS_H
+#ifndef CEPH_FS_H
+#define CEPH_FS_H
#include "msgr.h"
#include "rados.h"
/*
- * Ceph release version
- */
-#define CEPH_VERSION_MAJOR 0
-#define CEPH_VERSION_MINOR 20
-#define CEPH_VERSION_PATCH 0
-
-#define _CEPH_STRINGIFY(x) #x
-#define CEPH_STRINGIFY(x) _CEPH_STRINGIFY(x)
-#define CEPH_MAKE_VERSION(x, y, z) CEPH_STRINGIFY(x) "." CEPH_STRINGIFY(y) \
- "." CEPH_STRINGIFY(z)
-#define CEPH_VERSION CEPH_MAKE_VERSION(CEPH_VERSION_MAJOR, \
- CEPH_VERSION_MINOR, CEPH_VERSION_PATCH)
-
-/*
* subprotocol versions. when specific messages types or high-level
* protocols change, bump the affected components. we keep rev
* internal cluster protocols separately from the public,
@@ -53,18 +39,10 @@
/*
* feature bits
*/
-#define CEPH_FEATURE_UID 1
-#define CEPH_FEATURE_NOSRCADDR 2
-#define CEPH_FEATURE_FLOCK 4
-
-#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_MON CEPH_FEATURE_UID
-#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_FLOCK
-#define CEPH_FEATURE_REQUIRED_MDS CEPH_FEATURE_UID
-#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_OSD CEPH_FEATURE_UID
-#define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_CLIENT CEPH_FEATURE_NOSRCADDR
+#define CEPH_FEATURE_UID (1<<0)
+#define CEPH_FEATURE_NOSRCADDR (1<<1)
+#define CEPH_FEATURE_MONCLOCKCHECK (1<<2)
+#define CEPH_FEATURE_FLOCK (1<<3)
/*
@@ -96,6 +74,8 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
#define CEPH_CRYPTO_NONE 0x0
#define CEPH_CRYPTO_AES 0x1
+#define CEPH_AES_IV "cephsageyudagreg"
+
/* security/authentication protocols */
#define CEPH_AUTH_UNKNOWN 0x0
#define CEPH_AUTH_NONE 0x1
@@ -275,6 +255,7 @@ extern const char *ceph_mds_state_name(int s);
#define CEPH_LOCK_IDFT 512 /* dir frag tree */
#define CEPH_LOCK_INEST 1024 /* mds internal */
#define CEPH_LOCK_IXATTR 2048
+#define CEPH_LOCK_IFLOCK 4096 /* advisory file locks */
#define CEPH_LOCK_INO 8192 /* immutable inode bits; not a lock */
/* client_session ops */
@@ -316,6 +297,8 @@ enum {
CEPH_MDS_OP_RMXATTR = 0x01106,
CEPH_MDS_OP_SETLAYOUT = 0x01107,
CEPH_MDS_OP_SETATTR = 0x01108,
+ CEPH_MDS_OP_SETFILELOCK= 0x01109,
+ CEPH_MDS_OP_GETFILELOCK= 0x00110,
CEPH_MDS_OP_MKNOD = 0x01201,
CEPH_MDS_OP_LINK = 0x01202,
@@ -386,6 +369,15 @@ union ceph_mds_request_args {
struct {
struct ceph_file_layout layout;
} __attribute__ ((packed)) setlayout;
+ struct {
+ __u8 rule; /* currently fcntl or flock */
+ __u8 type; /* shared, exclusive, remove*/
+ __le64 pid; /* process id requesting the lock */
+ __le64 pid_namespace;
+ __le64 start; /* initial location to lock */
+ __le64 length; /* num bytes to lock from start */
+ __u8 wait; /* will caller wait for lock to become available? */
+ } __attribute__ ((packed)) filelock_change;
} __attribute__ ((packed));
#define CEPH_MDS_FLAG_REPLAY 1 /* this is a replayed op */
@@ -480,6 +472,23 @@ struct ceph_mds_reply_dirfrag {
__le32 dist[];
} __attribute__ ((packed));
+#define CEPH_LOCK_FCNTL 1
+#define CEPH_LOCK_FLOCK 2
+
+#define CEPH_LOCK_SHARED 1
+#define CEPH_LOCK_EXCL 2
+#define CEPH_LOCK_UNLOCK 4
+
+struct ceph_filelock {
+ __le64 start;/* file offset to start lock at */
+ __le64 length; /* num bytes to lock; 0 for all following start */
+ __le64 client; /* which client holds the lock */
+ __le64 pid; /* process id holding the lock on the client */
+ __le64 pid_namespace;
+ __u8 type; /* shared lock, exclusive lock, or unlock */
+} __attribute__ ((packed));
+
+
/* file access modes */
#define CEPH_FILE_MODE_PIN 0
#define CEPH_FILE_MODE_RD 1
@@ -508,9 +517,10 @@ int ceph_flags_to_mode(int flags);
#define CEPH_CAP_SAUTH 2
#define CEPH_CAP_SLINK 4
#define CEPH_CAP_SXATTR 6
-#define CEPH_CAP_SFILE 8 /* goes at the end (uses >2 cap bits) */
+#define CEPH_CAP_SFILE 8
+#define CEPH_CAP_SFLOCK 20
-#define CEPH_CAP_BITS 16
+#define CEPH_CAP_BITS 22
/* composed values */
#define CEPH_CAP_AUTH_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SAUTH)
@@ -528,6 +538,9 @@ int ceph_flags_to_mode(int flags);
#define CEPH_CAP_FILE_BUFFER (CEPH_CAP_GBUFFER << CEPH_CAP_SFILE)
#define CEPH_CAP_FILE_WREXTEND (CEPH_CAP_GWREXTEND << CEPH_CAP_SFILE)
#define CEPH_CAP_FILE_LAZYIO (CEPH_CAP_GLAZYIO << CEPH_CAP_SFILE)
+#define CEPH_CAP_FLOCK_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SFLOCK)
+#define CEPH_CAP_FLOCK_EXCL (CEPH_CAP_GEXCL << CEPH_CAP_SFLOCK)
+
/* cap masks (for getattr) */
#define CEPH_STAT_CAP_INODE CEPH_CAP_PIN
@@ -563,7 +576,8 @@ int ceph_flags_to_mode(int flags);
CEPH_CAP_FILE_EXCL)
#define CEPH_CAP_ANY_WR (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
#define CEPH_CAP_ANY (CEPH_CAP_ANY_RD | CEPH_CAP_ANY_EXCL | \
- CEPH_CAP_ANY_FILE_WR | CEPH_CAP_PIN)
+ CEPH_CAP_ANY_FILE_WR | CEPH_CAP_FILE_LAZYIO | \
+ CEPH_CAP_PIN)
#define CEPH_CAP_LOCKS (CEPH_LOCK_IFILE | CEPH_LOCK_IAUTH | CEPH_LOCK_ILINK | \
CEPH_LOCK_IXATTR)
@@ -653,12 +667,21 @@ struct ceph_mds_cap_reconnect {
__le64 cap_id;
__le32 wanted;
__le32 issued;
+ __le64 snaprealm;
+ __le64 pathbase; /* base ino for our path to this ino */
+ __le32 flock_len; /* size of flock state blob, if any */
+} __attribute__ ((packed));
+/* followed by flock blob */
+
+struct ceph_mds_cap_reconnect_v1 {
+ __le64 cap_id;
+ __le32 wanted;
+ __le32 issued;
__le64 size;
struct ceph_timespec mtime, atime;
__le64 snaprealm;
__le64 pathbase; /* base ino for our path to this ino */
} __attribute__ ((packed));
-/* followed by encoded string */
struct ceph_mds_snaprealm_reconnect {
__le64 ino; /* snap realm base */
diff --git a/fs/ceph/ceph_hash.h b/fs/ceph/ceph_hash.h
index 5ac470c433c..d099c3f9023 100644
--- a/fs/ceph/ceph_hash.h
+++ b/fs/ceph/ceph_hash.h
@@ -1,5 +1,5 @@
-#ifndef _FS_CEPH_HASH_H
-#define _FS_CEPH_HASH_H
+#ifndef FS_CEPH_HASH_H
+#define FS_CEPH_HASH_H
#define CEPH_STR_HASH_LINUX 0x1 /* linux dcache hash */
#define CEPH_STR_HASH_RJENKINS 0x2 /* robert jenkins' */
diff --git a/fs/ceph/ceph_strings.c b/fs/ceph/ceph_strings.c
index 7503aee828c..c6179d3a26a 100644
--- a/fs/ceph/ceph_strings.c
+++ b/fs/ceph/ceph_strings.c
@@ -28,6 +28,7 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_TRUNCATE: return "truncate";
case CEPH_OSD_OP_ZERO: return "zero";
case CEPH_OSD_OP_WRITEFULL: return "writefull";
+ case CEPH_OSD_OP_ROLLBACK: return "rollback";
case CEPH_OSD_OP_APPEND: return "append";
case CEPH_OSD_OP_STARTSYNC: return "startsync";
@@ -129,6 +130,8 @@ const char *ceph_mds_op_name(int op)
case CEPH_MDS_OP_LSSNAP: return "lssnap";
case CEPH_MDS_OP_MKSNAP: return "mksnap";
case CEPH_MDS_OP_RMSNAP: return "rmsnap";
+ case CEPH_MDS_OP_SETFILELOCK: return "setfilelock";
+ case CEPH_MDS_OP_GETFILELOCK: return "getfilelock";
}
return "???";
}
diff --git a/fs/ceph/crush/crush.h b/fs/ceph/crush/crush.h
index dcd7e752370..97e435b191f 100644
--- a/fs/ceph/crush/crush.h
+++ b/fs/ceph/crush/crush.h
@@ -1,5 +1,5 @@
-#ifndef _CRUSH_CRUSH_H
-#define _CRUSH_CRUSH_H
+#ifndef CEPH_CRUSH_CRUSH_H
+#define CEPH_CRUSH_CRUSH_H
#include <linux/types.h>
diff --git a/fs/ceph/crush/hash.h b/fs/ceph/crush/hash.h
index ff48e110e4b..91e884230d5 100644
--- a/fs/ceph/crush/hash.h
+++ b/fs/ceph/crush/hash.h
@@ -1,5 +1,5 @@
-#ifndef _CRUSH_HASH_H
-#define _CRUSH_HASH_H
+#ifndef CEPH_CRUSH_HASH_H
+#define CEPH_CRUSH_HASH_H
#define CRUSH_HASH_RJENKINS1 0
diff --git a/fs/ceph/crush/mapper.c b/fs/ceph/crush/mapper.c
index 9ba54efb654..a4eec133258 100644
--- a/fs/ceph/crush/mapper.c
+++ b/fs/ceph/crush/mapper.c
@@ -238,7 +238,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
{
- dprintk("choose %d x=%d r=%d\n", in->id, x, r);
+ dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
switch (in->alg) {
case CRUSH_BUCKET_UNIFORM:
return bucket_uniform_choose((struct crush_bucket_uniform *)in,
@@ -264,7 +264,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
*/
static int is_out(struct crush_map *map, __u32 *weight, int item, int x)
{
- if (weight[item] >= 0x1000)
+ if (weight[item] >= 0x10000)
return 0;
if (weight[item] == 0)
return 1;
@@ -305,7 +305,9 @@ static int crush_choose(struct crush_map *map,
int itemtype;
int collide, reject;
const int orig_tries = 5; /* attempts before we fall back to search */
- dprintk("choose bucket %d x %d outpos %d\n", bucket->id, x, outpos);
+
+ dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
+ bucket->id, x, outpos, numrep);
for (rep = outpos; rep < numrep; rep++) {
/* keep trying until we get a non-out, non-colliding item */
@@ -366,6 +368,7 @@ static int crush_choose(struct crush_map *map,
BUG_ON(item >= 0 ||
(-1-item) >= map->max_buckets);
in = map->buckets[-1-item];
+ retry_bucket = 1;
continue;
}
@@ -377,15 +380,25 @@ static int crush_choose(struct crush_map *map,
}
}
- if (recurse_to_leaf &&
- item < 0 &&
- crush_choose(map, map->buckets[-1-item],
- weight,
- x, outpos+1, 0,
- out2, outpos,
- firstn, 0, NULL) <= outpos) {
- reject = 1;
- } else {
+ reject = 0;
+ if (recurse_to_leaf) {
+ if (item < 0) {
+ if (crush_choose(map,
+ map->buckets[-1-item],
+ weight,
+ x, outpos+1, 0,
+ out2, outpos,
+ firstn, 0,
+ NULL) <= outpos)
+ /* didn't get leaf */
+ reject = 1;
+ } else {
+ /* we already have a leaf! */
+ out2[outpos] = item;
+ }
+ }
+
+ if (!reject) {
/* out? */
if (itemtype == 0)
reject = is_out(map, weight,
@@ -424,12 +437,12 @@ reject:
continue;
}
- dprintk("choose got %d\n", item);
+ dprintk("CHOOSE got %d\n", item);
out[outpos] = item;
outpos++;
}
- dprintk("choose returns %d\n", outpos);
+ dprintk("CHOOSE returns %d\n", outpos);
return outpos;
}
diff --git a/fs/ceph/crush/mapper.h b/fs/ceph/crush/mapper.h
index 98e90046fd9..c46b99c18bb 100644
--- a/fs/ceph/crush/mapper.h
+++ b/fs/ceph/crush/mapper.h
@@ -1,5 +1,5 @@
-#ifndef _CRUSH_MAPPER_H
-#define _CRUSH_MAPPER_H
+#ifndef CEPH_CRUSH_MAPPER_H
+#define CEPH_CRUSH_MAPPER_H
/*
* CRUSH functions for find rules and then mapping an input to an
diff --git a/fs/ceph/crypto.c b/fs/ceph/crypto.c
index f704b3b6242..a3e627f6329 100644
--- a/fs/ceph/crypto.c
+++ b/fs/ceph/crypto.c
@@ -75,10 +75,11 @@ static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void)
return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
}
-const u8 *aes_iv = "cephsageyudagreg";
+static const u8 *aes_iv = (u8 *)CEPH_AES_IV;
-int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len,
- const void *src, size_t src_len)
+static int ceph_aes_encrypt(const void *key, int key_len,
+ void *dst, size_t *dst_len,
+ const void *src, size_t src_len)
{
struct scatterlist sg_in[2], sg_out[1];
struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
@@ -126,9 +127,10 @@ int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len,
return 0;
}
-int ceph_aes_encrypt2(const void *key, int key_len, void *dst, size_t *dst_len,
- const void *src1, size_t src1_len,
- const void *src2, size_t src2_len)
+static int ceph_aes_encrypt2(const void *key, int key_len, void *dst,
+ size_t *dst_len,
+ const void *src1, size_t src1_len,
+ const void *src2, size_t src2_len)
{
struct scatterlist sg_in[3], sg_out[1];
struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
@@ -179,8 +181,9 @@ int ceph_aes_encrypt2(const void *key, int key_len, void *dst, size_t *dst_len,
return 0;
}
-int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len,
- const void *src, size_t src_len)
+static int ceph_aes_decrypt(const void *key, int key_len,
+ void *dst, size_t *dst_len,
+ const void *src, size_t src_len)
{
struct scatterlist sg_in[1], sg_out[2];
struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
@@ -238,10 +241,10 @@ int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len,
return 0;
}
-int ceph_aes_decrypt2(const void *key, int key_len,
- void *dst1, size_t *dst1_len,
- void *dst2, size_t *dst2_len,
- const void *src, size_t src_len)
+static int ceph_aes_decrypt2(const void *key, int key_len,
+ void *dst1, size_t *dst1_len,
+ void *dst2, size_t *dst2_len,
+ const void *src, size_t src_len)
{
struct scatterlist sg_in[1], sg_out[3];
struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
diff --git a/fs/ceph/crypto.h b/fs/ceph/crypto.h
index 40b502e6bd8..bdf38607323 100644
--- a/fs/ceph/crypto.h
+++ b/fs/ceph/crypto.h
@@ -42,7 +42,7 @@ extern int ceph_encrypt2(struct ceph_crypto_key *secret,
const void *src2, size_t src2_len);
/* armor.c */
-extern int ceph_armor(char *dst, const void *src, const void *end);
-extern int ceph_unarmor(void *dst, const char *src, const char *end);
+extern int ceph_armor(char *dst, const char *src, const char *end);
+extern int ceph_unarmor(char *dst, const char *src, const char *end);
#endif
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 3be33fb066c..360c4f22718 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -261,7 +261,7 @@ static int osdc_show(struct seq_file *s, void *pp)
static int caps_show(struct seq_file *s, void *p)
{
- struct ceph_client *client = p;
+ struct ceph_client *client = s->private;
int total, avail, used, reserved, min;
ceph_reservation_status(client, &total, &avail, &used, &reserved, &min);
@@ -291,7 +291,7 @@ static int dentry_lru_show(struct seq_file *s, void *ptr)
return 0;
}
-#define DEFINE_SHOW_FUNC(name) \
+#define DEFINE_SHOW_FUNC(name) \
static int name##_open(struct inode *inode, struct file *file) \
{ \
struct seq_file *sf; \
@@ -361,8 +361,8 @@ int ceph_debugfs_client_init(struct ceph_client *client)
int ret = 0;
char name[80];
- snprintf(name, sizeof(name), FSID_FORMAT ".client%lld",
- PR_FSID(&client->fsid), client->monc.auth->global_id);
+ snprintf(name, sizeof(name), "%pU.client%lld", &client->fsid,
+ client->monc.auth->global_id);
client->debugfs_dir = debugfs_create_dir(name, ceph_debugfs_dir);
if (!client->debugfs_dir)
@@ -432,11 +432,12 @@ int ceph_debugfs_client_init(struct ceph_client *client)
if (!client->debugfs_caps)
goto out;
- client->debugfs_congestion_kb = debugfs_create_file("writeback_congestion_kb",
- 0600,
- client->debugfs_dir,
- client,
- &congestion_kb_fops);
+ client->debugfs_congestion_kb =
+ debugfs_create_file("writeback_congestion_kb",
+ 0600,
+ client->debugfs_dir,
+ client,
+ &congestion_kb_fops);
if (!client->debugfs_congestion_kb)
goto out;
@@ -466,7 +467,7 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
debugfs_remove(client->debugfs_dir);
}
-#else // CONFIG_DEBUG_FS
+#else /* CONFIG_DEBUG_FS */
int __init ceph_debugfs_init(void)
{
@@ -486,4 +487,4 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
{
}
-#endif // CONFIG_DEBUG_FS
+#endif /* CONFIG_DEBUG_FS */
diff --git a/fs/ceph/decode.h b/fs/ceph/decode.h
index 65b3e022eaf..3d25415afe6 100644
--- a/fs/ceph/decode.h
+++ b/fs/ceph/decode.h
@@ -99,11 +99,13 @@ static inline void ceph_encode_timespec(struct ceph_timespec *tv,
*/
static inline void ceph_encode_addr(struct ceph_entity_addr *a)
{
- a->in_addr.ss_family = htons(a->in_addr.ss_family);
+ __be16 ss_family = htons(a->in_addr.ss_family);
+ a->in_addr.ss_family = *(__u16 *)&ss_family;
}
static inline void ceph_decode_addr(struct ceph_entity_addr *a)
{
- a->in_addr.ss_family = ntohs(a->in_addr.ss_family);
+ __be16 ss_family = *(__be16 *)&a->in_addr.ss_family;
+ a->in_addr.ss_family = ntohs(ss_family);
WARN_ON(a->in_addr.ss_family == 512);
}
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index f85719310db..67bbb41d552 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -27,7 +27,7 @@
const struct inode_operations ceph_dir_iops;
const struct file_operations ceph_dir_fops;
-struct dentry_operations ceph_dentry_ops;
+const struct dentry_operations ceph_dentry_ops;
/*
* Initialize ceph dentry state.
@@ -94,6 +94,8 @@ static unsigned fpos_off(loff_t p)
*/
static int __dcache_readdir(struct file *filp,
void *dirent, filldir_t filldir)
+ __releases(inode->i_lock)
+ __acquires(inode->i_lock)
{
struct inode *inode = filp->f_dentry->d_inode;
struct ceph_file_info *fi = filp->private_data;
@@ -266,6 +268,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
spin_lock(&inode->i_lock);
if ((filp->f_pos == 2 || fi->dentry) &&
!ceph_test_opt(client, NOASYNCREADDIR) &&
+ ceph_snap(inode) != CEPH_SNAPDIR &&
(ci->i_ceph_flags & CEPH_I_COMPLETE) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
err = __dcache_readdir(filp, dirent, filldir);
@@ -1013,18 +1016,22 @@ out_touch:
/*
* When a dentry is released, clear the dir I_COMPLETE if it was part
- * of the current dir gen.
+ * of the current dir gen or if this is in the snapshot namespace.
*/
static void ceph_dentry_release(struct dentry *dentry)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
struct inode *parent_inode = dentry->d_parent->d_inode;
+ u64 snapid = ceph_snap(parent_inode);
- if (parent_inode) {
+ dout("dentry_release %p parent %p\n", dentry, parent_inode);
+
+ if (parent_inode && snapid != CEPH_SNAPDIR) {
struct ceph_inode_info *ci = ceph_inode(parent_inode);
spin_lock(&parent_inode->i_lock);
- if (ci->i_shared_gen == di->lease_shared_gen) {
+ if (ci->i_shared_gen == di->lease_shared_gen ||
+ snapid <= CEPH_MAXSNAP) {
dout(" clearing %p complete (d_release)\n",
parent_inode);
ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
@@ -1234,14 +1241,16 @@ const struct inode_operations ceph_dir_iops = {
.create = ceph_create,
};
-struct dentry_operations ceph_dentry_ops = {
+const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate,
.d_release = ceph_dentry_release,
};
-struct dentry_operations ceph_snapdir_dentry_ops = {
+const struct dentry_operations ceph_snapdir_dentry_ops = {
.d_revalidate = ceph_snapdir_d_revalidate,
+ .d_release = ceph_dentry_release,
};
-struct dentry_operations ceph_snap_dentry_ops = {
+const struct dentry_operations ceph_snap_dentry_ops = {
+ .d_release = ceph_dentry_release,
};
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 6251a1574b9..8c044a4f045 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -265,7 +265,7 @@ int ceph_release(struct inode *inode, struct file *file)
kmem_cache_free(ceph_file_cachep, cf);
/* wake up anyone waiting for caps on this inode */
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
return 0;
}
@@ -317,7 +317,7 @@ void ceph_release_page_vector(struct page **pages, int num_pages)
/*
* allocate a vector new pages
*/
-struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
+static struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
{
struct page **pages;
int i;
@@ -665,7 +665,7 @@ more:
* throw out any page cache pages in this range. this
* may block.
*/
- truncate_inode_pages_range(inode->i_mapping, pos,
+ truncate_inode_pages_range(inode->i_mapping, pos,
(pos+len) | (PAGE_CACHE_SIZE-1));
} else {
pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
@@ -740,28 +740,32 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
struct file *filp = iocb->ki_filp;
+ struct ceph_file_info *fi = filp->private_data;
loff_t *ppos = &iocb->ki_pos;
size_t len = iov->iov_len;
struct inode *inode = filp->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
- void *base = iov->iov_base;
+ void __user *base = iov->iov_base;
ssize_t ret;
- int got = 0;
+ int want, got = 0;
int checkeof = 0, read = 0;
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), pos, (unsigned)len, inode);
again:
__ceph_do_pending_vmtruncate(inode);
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE,
- &got, -1);
+ if (fi->fmode & CEPH_FILE_MODE_LAZY)
+ want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
+ else
+ want = CEPH_CAP_FILE_CACHE;
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
if (ret < 0)
goto out;
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), pos, (unsigned)len,
ceph_cap_string(got));
- if ((got & CEPH_CAP_FILE_CACHE) == 0 ||
+ if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_filp->f_flags & O_DIRECT) ||
(inode->i_sb->s_flags & MS_SYNCHRONOUS))
/* hmm, this isn't really async... */
@@ -807,11 +811,12 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
struct file *file = iocb->ki_filp;
+ struct ceph_file_info *fi = file->private_data;
struct inode *inode = file->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->osdc;
loff_t endoff = pos + iov->iov_len;
- int got = 0;
+ int want, got = 0;
int ret, err;
if (ceph_snap(inode) != CEPH_NOSNAP)
@@ -824,8 +829,11 @@ retry_snap:
dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
inode->i_size);
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
- &got, endoff);
+ if (fi->fmode & CEPH_FILE_MODE_LAZY)
+ want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+ else
+ want = CEPH_CAP_FILE_BUFFER;
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
if (ret < 0)
goto out;
@@ -833,7 +841,7 @@ retry_snap:
inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
ceph_cap_string(got));
- if ((got & CEPH_CAP_FILE_BUFFER) == 0 ||
+ if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_filp->f_flags & O_DIRECT) ||
(inode->i_sb->s_flags & MS_SYNCHRONOUS)) {
ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
@@ -930,6 +938,8 @@ const struct file_operations ceph_file_fops = {
.aio_write = ceph_aio_write,
.mmap = ceph_mmap,
.fsync = ceph_fsync,
+ .lock = ceph_lock,
+ .flock = ceph_flock,
.splice_read = generic_file_splice_read,
.splice_write = generic_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 226f5a50d36..5d893d31e39 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -442,8 +442,9 @@ int ceph_fill_file_size(struct inode *inode, int issued,
* the file is either opened or mmaped
*/
if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
- CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
- CEPH_CAP_FILE_EXCL)) ||
+ CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
+ CEPH_CAP_FILE_EXCL|
+ CEPH_CAP_FILE_LAZYIO)) ||
mapping_mapped(inode->i_mapping) ||
__ceph_caps_file_wanted(ci)) {
ci->i_truncate_pending++;
@@ -827,7 +828,7 @@ static void ceph_set_dentry_offset(struct dentry *dn)
spin_lock(&dcache_lock);
spin_lock(&dn->d_lock);
- list_move_tail(&dir->d_subdirs, &dn->d_u.d_child);
+ list_move(&dn->d_u.d_child, &dir->d_subdirs);
dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset,
dn->d_u.d_child.prev, dn->d_u.d_child.next);
spin_unlock(&dn->d_lock);
@@ -854,8 +855,8 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
d_drop(dn);
realdn = d_materialise_unique(dn, in);
if (IS_ERR(realdn)) {
- pr_err("splice_dentry error %p inode %p ino %llx.%llx\n",
- dn, in, ceph_vinop(in));
+ pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
+ PTR_ERR(realdn), dn, in, ceph_vinop(in));
if (prehash)
*prehash = false; /* don't rehash on error */
dn = realdn; /* note realdn contains the error */
@@ -1199,8 +1200,10 @@ retry_lookup:
goto out;
}
err = ceph_init_dentry(dn);
- if (err < 0)
+ if (err < 0) {
+ dput(dn);
goto out;
+ }
} else if (dn->d_inode &&
(ceph_ino(dn->d_inode) != vino.ino ||
ceph_snap(dn->d_inode) != vino.snap)) {
@@ -1234,18 +1237,23 @@ retry_lookup:
goto out;
}
dn = splice_dentry(dn, in, NULL);
+ if (IS_ERR(dn))
+ dn = NULL;
}
if (fill_inode(in, &rinfo->dir_in[i], NULL, session,
req->r_request_started, -1,
&req->r_caps_reservation) < 0) {
pr_err("fill_inode badness on %p\n", in);
- dput(dn);
- continue;
+ goto next_item;
}
- update_dentry_lease(dn, rinfo->dir_dlease[i],
- req->r_session, req->r_request_started);
- dput(dn);
+ if (dn)
+ update_dentry_lease(dn, rinfo->dir_dlease[i],
+ req->r_session,
+ req->r_request_started);
+next_item:
+ if (dn)
+ dput(dn);
}
req->r_did_prepopulate = true;
@@ -1494,7 +1502,7 @@ retry:
if (wrbuffer_refs == 0)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
if (wake)
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
}
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index d085f07756b..76e307d2aba 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -143,6 +143,27 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
return 0;
}
+static long ceph_ioctl_lazyio(struct file *file)
+{
+ struct ceph_file_info *fi = file->private_data;
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+
+ if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
+ spin_lock(&inode->i_lock);
+ ci->i_nr_by_mode[fi->fmode]--;
+ fi->fmode |= CEPH_FILE_MODE_LAZY;
+ ci->i_nr_by_mode[fi->fmode]++;
+ spin_unlock(&inode->i_lock);
+ dout("ioctl_layzio: file %p marked lazy\n", file);
+
+ ceph_check_caps(ci, 0, NULL);
+ } else {
+ dout("ioctl_layzio: file %p already lazy\n", file);
+ }
+ return 0;
+}
+
long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg);
@@ -155,6 +176,9 @@ long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
case CEPH_IOC_GET_DATALOC:
return ceph_ioctl_get_dataloc(file, (void __user *)arg);
+
+ case CEPH_IOC_LAZYIO:
+ return ceph_ioctl_lazyio(file);
}
return -ENOTTY;
}
diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h
index 25e4f1a9d05..88451a3b685 100644
--- a/fs/ceph/ioctl.h
+++ b/fs/ceph/ioctl.h
@@ -37,4 +37,6 @@ struct ceph_ioctl_dataloc {
#define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \
struct ceph_ioctl_dataloc)
+#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
+
#endif
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
new file mode 100644
index 00000000000..ae85af06454
--- /dev/null
+++ b/fs/ceph/locks.c
@@ -0,0 +1,256 @@
+#include "ceph_debug.h"
+
+#include <linux/file.h>
+#include <linux/namei.h>
+
+#include "super.h"
+#include "mds_client.h"
+#include "pagelist.h"
+
+/**
+ * Implement fcntl and flock locking functions.
+ */
+static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
+ u64 pid, u64 pid_ns,
+ int cmd, u64 start, u64 length, u8 wait)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_mds_client *mdsc =
+ &ceph_sb_to_client(inode->i_sb)->mdsc;
+ struct ceph_mds_request *req;
+ int err;
+
+ req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+ req->r_inode = igrab(inode);
+
+ dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
+ "length: %llu, wait: %d, type`: %d", (int)lock_type,
+ (int)operation, pid, start, length, wait, cmd);
+
+ req->r_args.filelock_change.rule = lock_type;
+ req->r_args.filelock_change.type = cmd;
+ req->r_args.filelock_change.pid = cpu_to_le64(pid);
+ /* This should be adjusted, but I'm not sure if
+ namespaces actually get id numbers*/
+ req->r_args.filelock_change.pid_namespace =
+ cpu_to_le64((u64)pid_ns);
+ req->r_args.filelock_change.start = cpu_to_le64(start);
+ req->r_args.filelock_change.length = cpu_to_le64(length);
+ req->r_args.filelock_change.wait = wait;
+
+ err = ceph_mdsc_do_request(mdsc, inode, req);
+ ceph_mdsc_put_request(req);
+ dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
+ "length: %llu, wait: %d, type`: %d err code %d", (int)lock_type,
+ (int)operation, pid, start, length, wait, cmd, err);
+ return err;
+}
+
+/**
+ * Attempt to set an fcntl lock.
+ * For now, this just goes away to the server. Later it may be more awesome.
+ */
+int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
+{
+ u64 length;
+ u8 lock_cmd;
+ int err;
+ u8 wait = 0;
+ u16 op = CEPH_MDS_OP_SETFILELOCK;
+
+ fl->fl_nspid = get_pid(task_tgid(current));
+ dout("ceph_lock, fl_pid:%d", fl->fl_pid);
+
+ /* set wait bit as appropriate, then make command as Ceph expects it*/
+ if (F_SETLKW == cmd)
+ wait = 1;
+ if (F_GETLK == cmd)
+ op = CEPH_MDS_OP_GETFILELOCK;
+
+ if (F_RDLCK == fl->fl_type)
+ lock_cmd = CEPH_LOCK_SHARED;
+ else if (F_WRLCK == fl->fl_type)
+ lock_cmd = CEPH_LOCK_EXCL;
+ else
+ lock_cmd = CEPH_LOCK_UNLOCK;
+
+ if (LLONG_MAX == fl->fl_end)
+ length = 0;
+ else
+ length = fl->fl_end - fl->fl_start + 1;
+
+ err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
+ (u64)fl->fl_pid, (u64)fl->fl_nspid,
+ lock_cmd, fl->fl_start,
+ length, wait);
+ if (!err) {
+ dout("mds locked, locking locally");
+ err = posix_lock_file(file, fl, NULL);
+ if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
+ /* undo! This should only happen if the kernel detects
+ * local deadlock. */
+ ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
+ (u64)fl->fl_pid, (u64)fl->fl_nspid,
+ CEPH_LOCK_UNLOCK, fl->fl_start,
+ length, 0);
+ dout("got %d on posix_lock_file, undid lock", err);
+ }
+ } else {
+ dout("mds returned error code %d", err);
+ }
+ return err;
+}
+
+int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
+{
+ u64 length;
+ u8 lock_cmd;
+ int err;
+ u8 wait = 1;
+
+ fl->fl_nspid = get_pid(task_tgid(current));
+ dout("ceph_flock, fl_pid:%d", fl->fl_pid);
+
+ /* set wait bit, then clear it out of cmd*/
+ if (cmd & LOCK_NB)
+ wait = 0;
+ cmd = cmd & (LOCK_SH | LOCK_EX | LOCK_UN);
+ /* set command sequence that Ceph wants to see:
+ shared lock, exclusive lock, or unlock */
+ if (LOCK_SH == cmd)
+ lock_cmd = CEPH_LOCK_SHARED;
+ else if (LOCK_EX == cmd)
+ lock_cmd = CEPH_LOCK_EXCL;
+ else
+ lock_cmd = CEPH_LOCK_UNLOCK;
+ /* mds requires start and length rather than start and end */
+ if (LLONG_MAX == fl->fl_end)
+ length = 0;
+ else
+ length = fl->fl_end - fl->fl_start + 1;
+
+ err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
+ file, (u64)fl->fl_pid, (u64)fl->fl_nspid,
+ lock_cmd, fl->fl_start,
+ length, wait);
+ if (!err) {
+ err = flock_lock_file_wait(file, fl);
+ if (err) {
+ ceph_lock_message(CEPH_LOCK_FLOCK,
+ CEPH_MDS_OP_SETFILELOCK,
+ file, (u64)fl->fl_pid,
+ (u64)fl->fl_nspid,
+ CEPH_LOCK_UNLOCK, fl->fl_start,
+ length, 0);
+ dout("got %d on flock_lock_file_wait, undid lock", err);
+ }
+ } else {
+ dout("mds error code %d", err);
+ }
+ return err;
+}
+
+/**
+ * Must be called with BKL already held. Fills in the passed
+ * counter variables, so you can prepare pagelist metadata before calling
+ * ceph_encode_locks.
+ */
+void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count)
+{
+ struct file_lock *lock;
+
+ *fcntl_count = 0;
+ *flock_count = 0;
+
+ for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
+ if (lock->fl_flags & FL_POSIX)
+ ++(*fcntl_count);
+ else if (lock->fl_flags & FL_FLOCK)
+ ++(*flock_count);
+ }
+ dout("counted %d flock locks and %d fcntl locks",
+ *flock_count, *fcntl_count);
+}
+
+/**
+ * Encode the flock and fcntl locks for the given inode into the pagelist.
+ * Format is: #fcntl locks, sequential fcntl locks, #flock locks,
+ * sequential flock locks.
+ * Must be called with BLK already held, and the lock numbers should have
+ * been gathered under the same lock holding window.
+ */
+int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist,
+ int num_fcntl_locks, int num_flock_locks)
+{
+ struct file_lock *lock;
+ struct ceph_filelock cephlock;
+ int err = 0;
+
+ dout("encoding %d flock and %d fcntl locks", num_flock_locks,
+ num_fcntl_locks);
+ err = ceph_pagelist_append(pagelist, &num_fcntl_locks, sizeof(u32));
+ if (err)
+ goto fail;
+ for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
+ if (lock->fl_flags & FL_POSIX) {
+ err = lock_to_ceph_filelock(lock, &cephlock);
+ if (err)
+ goto fail;
+ err = ceph_pagelist_append(pagelist, &cephlock,
+ sizeof(struct ceph_filelock));
+ }
+ if (err)
+ goto fail;
+ }
+
+ err = ceph_pagelist_append(pagelist, &num_flock_locks, sizeof(u32));
+ if (err)
+ goto fail;
+ for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
+ if (lock->fl_flags & FL_FLOCK) {
+ err = lock_to_ceph_filelock(lock, &cephlock);
+ if (err)
+ goto fail;
+ err = ceph_pagelist_append(pagelist, &cephlock,
+ sizeof(struct ceph_filelock));
+ }
+ if (err)
+ goto fail;
+ }
+fail:
+ return err;
+}
+
+/*
+ * Given a pointer to a lock, convert it to a ceph filelock
+ */
+int lock_to_ceph_filelock(struct file_lock *lock,
+ struct ceph_filelock *cephlock)
+{
+ int err = 0;
+
+ cephlock->start = cpu_to_le64(lock->fl_start);
+ cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
+ cephlock->client = cpu_to_le64(0);
+ cephlock->pid = cpu_to_le64(lock->fl_pid);
+ cephlock->pid_namespace = cpu_to_le64((u64)lock->fl_nspid);
+
+ switch (lock->fl_type) {
+ case F_RDLCK:
+ cephlock->type = CEPH_LOCK_SHARED;
+ break;
+ case F_WRLCK:
+ cephlock->type = CEPH_LOCK_EXCL;
+ break;
+ case F_UNLCK:
+ cephlock->type = CEPH_LOCK_UNLOCK;
+ break;
+ default:
+ dout("Have unknown lock type %d", lock->fl_type);
+ err = -EINVAL;
+ }
+
+ return err;
+}
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index b49f12822cb..a75ddbf9fe3 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3,6 +3,7 @@
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>
+#include <linux/smp_lock.h>
#include "mds_client.h"
#include "mon_client.h"
@@ -37,6 +38,11 @@
* are no longer valid.
*/
+struct ceph_reconnect_state {
+ struct ceph_pagelist *pagelist;
+ bool flock;
+};
+
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
@@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref)
kfree(req->r_path1);
kfree(req->r_path2);
put_request_session(req);
- ceph_unreserve_caps(&req->r_caps_reservation);
+ ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
kfree(req);
}
@@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
{
req->r_tid = ++mdsc->last_tid;
if (req->r_num_caps)
- ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+ ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+ req->r_num_caps);
dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
__insert_request(mdsc, req);
@@ -704,6 +711,51 @@ static int __open_session(struct ceph_mds_client *mdsc,
}
/*
+ * open sessions for any export targets for the given mds
+ *
+ * called under mdsc->mutex
+ */
+static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_mds_info *mi;
+ struct ceph_mds_session *ts;
+ int i, mds = session->s_mds;
+ int target;
+
+ if (mds >= mdsc->mdsmap->m_max_mds)
+ return;
+ mi = &mdsc->mdsmap->m_info[mds];
+ dout("open_export_target_sessions for mds%d (%d targets)\n",
+ session->s_mds, mi->num_export_targets);
+
+ for (i = 0; i < mi->num_export_targets; i++) {
+ target = mi->export_targets[i];
+ ts = __ceph_lookup_mds_session(mdsc, target);
+ if (!ts) {
+ ts = register_session(mdsc, target);
+ if (IS_ERR(ts))
+ return;
+ }
+ if (session->s_state == CEPH_MDS_SESSION_NEW ||
+ session->s_state == CEPH_MDS_SESSION_CLOSING)
+ __open_session(mdsc, session);
+ else
+ dout(" mds%d target mds%d %p is %s\n", session->s_mds,
+ i, ts, session_state_name(ts->s_state));
+ ceph_put_mds_session(ts);
+ }
+}
+
+void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ mutex_lock(&mdsc->mutex);
+ __open_export_target_sessions(mdsc, session);
+ mutex_unlock(&mdsc->mutex);
+}
+
+/*
* session caps
*/
@@ -764,7 +816,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
last_inode = NULL;
}
if (old_cap) {
- ceph_put_cap(old_cap);
+ ceph_put_cap(session->s_mdsc, old_cap);
old_cap = NULL;
}
@@ -793,7 +845,7 @@ out:
if (last_inode)
iput(last_inode);
if (old_cap)
- ceph_put_cap(old_cap);
+ ceph_put_cap(session->s_mdsc, old_cap);
return ret;
}
@@ -868,7 +920,7 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
{
struct ceph_inode_info *ci = ceph_inode(inode);
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
if (arg) {
spin_lock(&inode->i_lock);
ci->i_wanted_max_size = 0;
@@ -1066,16 +1118,17 @@ static int trim_caps(struct ceph_mds_client *mdsc,
*
* Called under s_mutex.
*/
-static int add_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- int extra)
+int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
{
- struct ceph_msg *msg;
+ struct ceph_msg *msg, *partial = NULL;
struct ceph_mds_cap_release *head;
int err = -ENOMEM;
+ int extra = mdsc->client->mount_args->cap_release_safety;
+ int num;
- if (extra < 0)
- extra = mdsc->client->mount_args->cap_release_safety;
+ dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
+ extra);
spin_lock(&session->s_cap_lock);
@@ -1084,9 +1137,14 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_msg,
list_head);
head = msg->front.iov_base;
- extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
+ num = le32_to_cpu(head->num);
+ if (num) {
+ dout(" partial %p with (%d/%d)\n", msg, num,
+ (int)CEPH_CAPS_PER_RELEASE);
+ extra += CEPH_CAPS_PER_RELEASE - num;
+ partial = msg;
+ }
}
-
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
@@ -1103,19 +1161,14 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
}
- if (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg,
- list_head);
- head = msg->front.iov_base;
- if (head->num) {
- dout(" queueing non-full %p (%d)\n", msg,
- le32_to_cpu(head->num));
- list_move_tail(&msg->list_head,
- &session->s_cap_releases_done);
- session->s_num_cap_releases -=
- CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
- }
+ if (partial) {
+ head = partial->front.iov_base;
+ num = le32_to_cpu(head->num);
+ dout(" queueing partial %p with %d/%d\n", partial, num,
+ (int)CEPH_CAPS_PER_RELEASE);
+ list_move_tail(&partial->list_head,
+ &session->s_cap_releases_done);
+ session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
}
err = 0;
spin_unlock(&session->s_cap_lock);
@@ -1176,8 +1229,8 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
/*
* called under s_mutex
*/
-static void send_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
+void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
{
struct ceph_msg *msg;
@@ -1250,6 +1303,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
return ERR_PTR(-ENOMEM);
mutex_init(&req->r_fill_mutex);
+ req->r_mdsc = mdsc;
req->r_started = jiffies;
req->r_resend_mds = -1;
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1514,6 +1568,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
ceph_encode_filepath(&p, end, ino1, path1);
ceph_encode_filepath(&p, end, ino2, path2);
+ /* make note of release offset, in case we need to replay */
+ req->r_request_release_offset = p - msg->front.iov_base;
+
/* cap releases */
releases = 0;
if (req->r_inode_drop)
@@ -1561,7 +1618,7 @@ static void complete_request(struct ceph_mds_client *mdsc,
if (req->r_callback)
req->r_callback(mdsc, req);
else
- complete(&req->r_completion);
+ complete_all(&req->r_completion);
}
/*
@@ -1577,9 +1634,44 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
req->r_mds = mds;
req->r_attempts++;
+ if (req->r_inode) {
+ struct ceph_cap *cap =
+ ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
+
+ if (cap)
+ req->r_sent_on_mseq = cap->mseq;
+ else
+ req->r_sent_on_mseq = -1;
+ }
dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
+ if (req->r_got_unsafe) {
+ /*
+ * Replay. Do not regenerate message (and rebuild
+ * paths, etc.); just use the original message.
+ * Rebuilding paths will break for renames because
+ * d_move mangles the src name.
+ */
+ msg = req->r_request;
+ rhead = msg->front.iov_base;
+
+ flags = le32_to_cpu(rhead->flags);
+ flags |= CEPH_MDS_FLAG_REPLAY;
+ rhead->flags = cpu_to_le32(flags);
+
+ if (req->r_target_inode)
+ rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
+
+ rhead->num_retry = req->r_attempts - 1;
+
+ /* remove cap/dentry releases from message */
+ rhead->num_releases = 0;
+ msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
+ msg->front.iov_len = req->r_request_release_offset;
+ return 0;
+ }
+
if (req->r_request) {
ceph_msg_put(req->r_request);
req->r_request = NULL;
@@ -1601,13 +1693,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
rhead->flags = cpu_to_le32(flags);
rhead->num_fwd = req->r_num_fwd;
rhead->num_retry = req->r_attempts - 1;
+ rhead->ino = 0;
dout(" r_locked_dir = %p\n", req->r_locked_dir);
-
- if (req->r_target_inode && req->r_got_unsafe)
- rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
- else
- rhead->ino = 0;
return 0;
}
@@ -1889,25 +1977,44 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
result = le32_to_cpu(head->result);
/*
- * Tolerate 2 consecutive ESTALEs from the same mds.
- * FIXME: we should be looking at the cap migrate_seq.
+ * Handle an ESTALE
+ * if we're not talking to the authority, send to them
+ * if the authority has changed while we weren't looking,
+ * send to new authority
+ * Otherwise we just have to return an ESTALE
*/
if (result == -ESTALE) {
- req->r_direct_mode = USE_AUTH_MDS;
- req->r_num_stale++;
- if (req->r_num_stale <= 2) {
+ dout("got ESTALE on request %llu", req->r_tid);
+ if (!req->r_inode) {
+ /* do nothing; not an authority problem */
+ } else if (req->r_direct_mode != USE_AUTH_MDS) {
+ dout("not using auth, setting for that now");
+ req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
+ } else {
+ struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+ struct ceph_cap *cap =
+ ceph_get_cap_for_mds(ci, req->r_mds);;
+
+ dout("already using auth");
+ if ((!cap || cap != ci->i_auth_cap) ||
+ (cap->mseq != req->r_sent_on_mseq)) {
+ dout("but cap changed, so resending");
+ __do_request(mdsc, req);
+ mutex_unlock(&mdsc->mutex);
+ goto out;
+ }
}
- } else {
- req->r_num_stale = 0;
+ dout("have to return ESTALE on request %llu", req->r_tid);
}
+
if (head->safe) {
req->r_got_safe = true;
__unregister_request(mdsc, req);
- complete(&req->r_safe_completion);
+ complete_all(&req->r_safe_completion);
if (req->r_got_unsafe) {
/*
@@ -1922,7 +2029,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* last unsafe request during umount? */
if (mdsc->stopping && !__get_oldest_req(mdsc))
- complete(&mdsc->safe_umount_waiters);
+ complete_all(&mdsc->safe_umount_waiters);
mutex_unlock(&mdsc->mutex);
goto out;
}
@@ -1960,7 +2067,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (err == 0) {
if (result == 0 && rinfo->dir_nr)
ceph_readdir_prepopulate(req, req->r_session);
- ceph_unreserve_caps(&req->r_caps_reservation);
+ ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
mutex_unlock(&req->r_fill_mutex);
@@ -1980,7 +2087,7 @@ out_err:
}
mutex_unlock(&mdsc->mutex);
- add_cap_releases(mdsc, req->r_session, -1);
+ ceph_add_cap_releases(mdsc, req->r_session);
mutex_unlock(&session->s_mutex);
/* kick calling process */
@@ -2101,7 +2208,7 @@ static void handle_session(struct ceph_mds_session *session,
pr_info("mds%d reconnect denied\n", session->s_mds);
remove_session_caps(session);
wake = 1; /* for good measure */
- complete(&mdsc->session_close_waiters);
+ complete_all(&mdsc->session_close_waiters);
kick_requests(mdsc, mds);
break;
@@ -2168,9 +2275,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
- struct ceph_mds_cap_reconnect rec;
+ union {
+ struct ceph_mds_cap_reconnect v2;
+ struct ceph_mds_cap_reconnect_v1 v1;
+ } rec;
+ size_t reclen;
struct ceph_inode_info *ci;
- struct ceph_pagelist *pagelist = arg;
+ struct ceph_reconnect_state *recon_state = arg;
+ struct ceph_pagelist *pagelist = recon_state->pagelist;
char *path;
int pathlen, err;
u64 pathbase;
@@ -2203,17 +2315,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
spin_lock(&inode->i_lock);
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
- rec.cap_id = cpu_to_le64(cap->cap_id);
- rec.pathbase = cpu_to_le64(pathbase);
- rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
- rec.issued = cpu_to_le32(cap->issued);
- rec.size = cpu_to_le64(inode->i_size);
- ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
- ceph_encode_timespec(&rec.atime, &inode->i_atime);
- rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+
+ if (recon_state->flock) {
+ rec.v2.cap_id = cpu_to_le64(cap->cap_id);
+ rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+ rec.v2.issued = cpu_to_le32(cap->issued);
+ rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+ rec.v2.pathbase = cpu_to_le64(pathbase);
+ rec.v2.flock_len = 0;
+ reclen = sizeof(rec.v2);
+ } else {
+ rec.v1.cap_id = cpu_to_le64(cap->cap_id);
+ rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+ rec.v1.issued = cpu_to_le32(cap->issued);
+ rec.v1.size = cpu_to_le64(inode->i_size);
+ ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
+ ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
+ rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+ rec.v1.pathbase = cpu_to_le64(pathbase);
+ reclen = sizeof(rec.v1);
+ }
spin_unlock(&inode->i_lock);
- err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
+ if (recon_state->flock) {
+ int num_fcntl_locks, num_flock_locks;
+
+ lock_kernel();
+ ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+ rec.v2.flock_len = (2*sizeof(u32) +
+ (num_fcntl_locks+num_flock_locks) *
+ sizeof(struct ceph_filelock));
+
+ err = ceph_pagelist_append(pagelist, &rec, reclen);
+ if (!err)
+ err = ceph_encode_locks(inode, pagelist,
+ num_fcntl_locks,
+ num_flock_locks);
+ unlock_kernel();
+ }
out:
kfree(path);
@@ -2242,6 +2381,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
int mds = session->s_mds;
int err = -ENOMEM;
struct ceph_pagelist *pagelist;
+ struct ceph_reconnect_state recon_state;
pr_info("mds%d reconnect start\n", mds);
@@ -2276,7 +2416,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
if (err)
goto fail;
- err = iterate_session_caps(session, encode_caps_cb, pagelist);
+
+ recon_state.pagelist = pagelist;
+ recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+ err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
@@ -2301,6 +2444,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
}
reply->pagelist = pagelist;
+ if (recon_state.flock)
+ reply->hdr.version = cpu_to_le16(2);
reply->hdr.data_len = cpu_to_le32(pagelist->length);
reply->nr_pages = calc_pages_for(0, pagelist->length);
ceph_con_send(&session->s_con, reply);
@@ -2351,9 +2496,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
oldstate = ceph_mdsmap_get_state(oldmap, i);
newstate = ceph_mdsmap_get_state(newmap, i);
- dout("check_new_map mds%d state %s -> %s (session %s)\n",
+ dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
i, ceph_mds_state_name(oldstate),
+ ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
ceph_mds_state_name(newstate),
+ ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
session_state_name(s->s_state));
if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2403,6 +2550,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
wake_up_session_caps(s, 1);
}
}
+
+ for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+ s = mdsc->sessions[i];
+ if (!s)
+ continue;
+ if (!ceph_mdsmap_is_laggy(newmap, i))
+ continue;
+ if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+ s->s_state == CEPH_MDS_SESSION_HUNG ||
+ s->s_state == CEPH_MDS_SESSION_CLOSING) {
+ dout(" connecting to export targets of laggy mds%d\n",
+ i);
+ __open_export_target_sessions(mdsc, s);
+ }
+ }
}
@@ -2433,6 +2595,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
struct ceph_dentry_info *di;
int mds = session->s_mds;
struct ceph_mds_lease *h = msg->front.iov_base;
+ u32 seq;
struct ceph_vino vino;
int mask;
struct qstr dname;
@@ -2446,6 +2609,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
mask = le16_to_cpu(h->mask);
+ seq = le32_to_cpu(h->seq);
dname.name = (void *)h + sizeof(*h) + sizeof(u32);
dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
if (dname.len != get_unaligned_le32(h+1))
@@ -2456,8 +2620,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
/* lookup inode */
inode = ceph_find_inode(sb, vino);
- dout("handle_lease '%s', mask %d, ino %llx %p\n",
- ceph_lease_op_name(h->action), mask, vino.ino, inode);
+ dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
+ ceph_lease_op_name(h->action), mask, vino.ino, inode,
+ dname.len, dname.name);
if (inode == NULL) {
dout("handle_lease no inode %llx\n", vino.ino);
goto release;
@@ -2482,7 +2647,8 @@ static void handle_lease(struct ceph_mds_client *mdsc,
switch (h->action) {
case CEPH_MDS_LEASE_REVOKE:
if (di && di->lease_session == session) {
- h->seq = cpu_to_le32(di->lease_seq);
+ if (ceph_seq_cmp(di->lease_seq, seq) > 0)
+ h->seq = cpu_to_le32(di->lease_seq);
__ceph_mdsc_drop_dentry_lease(dentry);
}
release = 1;
@@ -2496,7 +2662,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
unsigned long duration =
le32_to_cpu(h->duration_ms) * HZ / 1000;
- di->lease_seq = le32_to_cpu(h->seq);
+ di->lease_seq = seq;
dentry->d_time = di->lease_renew_from + duration;
di->lease_renew_after = di->lease_renew_from +
(duration >> 1);
@@ -2686,10 +2852,10 @@ static void delayed_work(struct work_struct *work)
send_renew_caps(mdsc, s);
else
ceph_con_keepalive(&s->s_con);
- add_cap_releases(mdsc, s, -1);
+ ceph_add_cap_releases(mdsc, s);
if (s->s_state == CEPH_MDS_SESSION_OPEN ||
s->s_state == CEPH_MDS_SESSION_HUNG)
- send_cap_releases(mdsc, s);
+ ceph_send_cap_releases(mdsc, s);
mutex_unlock(&s->s_mutex);
ceph_put_mds_session(s);
@@ -2735,6 +2901,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
spin_lock_init(&mdsc->dentry_lru_lock);
INIT_LIST_HEAD(&mdsc->dentry_lru);
+ ceph_caps_init(mdsc);
+ ceph_adjust_min_caps(mdsc, client->min_caps);
+
return 0;
}
@@ -2779,6 +2948,12 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
drop_leases(mdsc);
ceph_flush_dirty_caps(mdsc);
wait_requests(mdsc);
+
+ /*
+ * wait for reply handlers to drop their request refs and
+ * their inode/dcache refs
+ */
+ ceph_msgr_flush();
}
/*
@@ -2924,6 +3099,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
if (mdsc->mdsmap)
ceph_mdsmap_destroy(mdsc->mdsmap);
kfree(mdsc->sessions);
+ ceph_caps_finalize(mdsc);
}
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index d9936c4f121..ab7e89f5e34 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -151,6 +151,7 @@ typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
struct ceph_mds_request {
u64 r_tid; /* transaction id */
struct rb_node r_node;
+ struct ceph_mds_client *r_mdsc;
int r_op; /* mds op code */
int r_mds;
@@ -188,6 +189,7 @@ struct ceph_mds_request {
int r_old_inode_drop, r_old_inode_unless;
struct ceph_msg *r_request; /* original request */
+ int r_request_release_offset;
struct ceph_msg *r_reply;
struct ceph_mds_reply_info_parsed r_reply_info;
int r_err;
@@ -206,8 +208,8 @@ struct ceph_mds_request {
int r_attempts; /* resend attempts */
int r_num_fwd; /* number of forward attempts */
- int r_num_stale;
int r_resend_mds; /* mds to resend to next, if any*/
+ u32 r_sent_on_mseq; /* cap mseq request was sent at*/
struct kref r_kref;
struct list_head r_wait;
@@ -266,6 +268,27 @@ struct ceph_mds_client {
spinlock_t cap_dirty_lock; /* protects above items */
wait_queue_head_t cap_flushing_wq;
+ /*
+ * Cap reservations
+ *
+ * Maintain a global pool of preallocated struct ceph_caps, referenced
+ * by struct ceph_caps_reservations. This ensures that we preallocate
+ * memory needed to successfully process an MDS response. (If an MDS
+ * sends us cap information and we fail to process it, we will have
+ * problems due to the client and MDS being out of sync.)
+ *
+ * Reservations are 'owned' by a ceph_cap_reservation context.
+ */
+ spinlock_t caps_list_lock;
+ struct list_head caps_list; /* unused (reserved or
+ unreserved) */
+ int caps_total_count; /* total caps allocated */
+ int caps_use_count; /* in use */
+ int caps_reserve_count; /* unused, reserved */
+ int caps_avail_count; /* unused, unreserved */
+ int caps_min_count; /* keep at least this many
+ (unreserved) */
+
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_file;
#endif
@@ -322,6 +345,11 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
kref_put(&req->r_kref, ceph_mdsc_release_request);
}
+extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session);
+extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session);
+
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
@@ -336,4 +364,7 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
+extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session);
+
#endif
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index c4c498e6dfe..040be6d1150 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -85,6 +85,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
struct ceph_entity_addr addr;
u32 num_export_targets;
void *pexport_targets = NULL;
+ struct ceph_timespec laggy_since;
ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad);
global_id = ceph_decode_64(p);
@@ -103,7 +104,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
state_seq = ceph_decode_64(p);
ceph_decode_copy(p, &addr, sizeof(addr));
ceph_decode_addr(&addr);
- *p += sizeof(struct ceph_timespec);
+ ceph_decode_copy(p, &laggy_since, sizeof(laggy_since));
*p += sizeof(u32);
ceph_decode_32_safe(p, end, namelen, bad);
*p += namelen;
@@ -122,6 +123,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_info[mds].global_id = global_id;
m->m_info[mds].state = state;
m->m_info[mds].addr = addr;
+ m->m_info[mds].laggy =
+ (laggy_since.tv_sec != 0 ||
+ laggy_since.tv_nsec != 0);
m->m_info[mds].num_export_targets = num_export_targets;
if (num_export_targets) {
m->m_info[mds].export_targets =
diff --git a/fs/ceph/mdsmap.h b/fs/ceph/mdsmap.h
index eacc131aa5c..4c5cb0880bb 100644
--- a/fs/ceph/mdsmap.h
+++ b/fs/ceph/mdsmap.h
@@ -13,6 +13,7 @@ struct ceph_mds_info {
struct ceph_entity_addr addr;
s32 state;
int num_export_targets;
+ bool laggy;
u32 *export_targets;
};
@@ -47,6 +48,13 @@ static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
return m->m_info[w].state;
}
+static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w)
+{
+ if (w >= 0 && w < m->m_max_mds)
+ return m->m_info[w].laggy;
+ return false;
+}
+
extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end);
extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m);
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 64b8b1f7863..2502d76fcec 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -43,7 +43,8 @@ static void ceph_fault(struct ceph_connection *con);
* nicely render a sockaddr as a string.
*/
#define MAX_ADDR_STR 20
-static char addr_str[MAX_ADDR_STR][40];
+#define MAX_ADDR_STR_LEN 60
+static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
static DEFINE_SPINLOCK(addr_str_lock);
static int last_addr_str;
@@ -52,7 +53,6 @@ const char *pr_addr(const struct sockaddr_storage *ss)
int i;
char *s;
struct sockaddr_in *in4 = (void *)ss;
- unsigned char *quad = (void *)&in4->sin_addr.s_addr;
struct sockaddr_in6 *in6 = (void *)ss;
spin_lock(&addr_str_lock);
@@ -64,25 +64,13 @@ const char *pr_addr(const struct sockaddr_storage *ss)
switch (ss->ss_family) {
case AF_INET:
- sprintf(s, "%u.%u.%u.%u:%u",
- (unsigned int)quad[0],
- (unsigned int)quad[1],
- (unsigned int)quad[2],
- (unsigned int)quad[3],
- (unsigned int)ntohs(in4->sin_port));
+ snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr,
+ (unsigned int)ntohs(in4->sin_port));
break;
case AF_INET6:
- sprintf(s, "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%u",
- in6->sin6_addr.s6_addr16[0],
- in6->sin6_addr.s6_addr16[1],
- in6->sin6_addr.s6_addr16[2],
- in6->sin6_addr.s6_addr16[3],
- in6->sin6_addr.s6_addr16[4],
- in6->sin6_addr.s6_addr16[5],
- in6->sin6_addr.s6_addr16[6],
- in6->sin6_addr.s6_addr16[7],
- (unsigned int)ntohs(in6->sin6_port));
+ snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr,
+ (unsigned int)ntohs(in6->sin6_port));
break;
default:
@@ -120,7 +108,7 @@ void ceph_msgr_exit(void)
destroy_workqueue(ceph_msgr_wq);
}
-void ceph_msgr_flush()
+void ceph_msgr_flush(void)
{
flush_workqueue(ceph_msgr_wq);
}
@@ -215,12 +203,13 @@ static void set_sock_callbacks(struct socket *sock,
*/
static struct socket *ceph_tcp_connect(struct ceph_connection *con)
{
- struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.in_addr;
+ struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
struct socket *sock;
int ret;
BUG_ON(con->sock);
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+ ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
+ IPPROTO_TCP, &sock);
if (ret)
return ERR_PTR(ret);
con->sock = sock;
@@ -234,7 +223,8 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));
- ret = sock->ops->connect(sock, paddr, sizeof(*paddr), O_NONBLOCK);
+ ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
+ O_NONBLOCK);
if (ret == -EINPROGRESS) {
dout("connect %s EINPROGRESS sk_state = %u\n",
pr_addr(&con->peer_addr.in_addr),
@@ -657,7 +647,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
con->connect_seq, global_seq, proto);
- con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT;
+ con->out_connect.features = cpu_to_le64(CEPH_FEATURE_SUPPORTED);
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1009,19 +999,32 @@ int ceph_parse_ips(const char *c, const char *end,
struct sockaddr_in *in4 = (void *)ss;
struct sockaddr_in6 *in6 = (void *)ss;
int port;
+ char delim = ',';
+
+ if (*p == '[') {
+ delim = ']';
+ p++;
+ }
memset(ss, 0, sizeof(*ss));
if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr,
- ',', &ipend)) {
+ delim, &ipend))
ss->ss_family = AF_INET;
- } else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
- ',', &ipend)) {
+ else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
+ delim, &ipend))
ss->ss_family = AF_INET6;
- } else {
+ else
goto bad;
- }
p = ipend;
+ if (delim == ']') {
+ if (*p != ']') {
+ dout("missing matching ']'\n");
+ goto bad;
+ }
+ p++;
+ }
+
/* port? */
if (p < end && *p == ':') {
port = 0;
@@ -1055,7 +1058,7 @@ int ceph_parse_ips(const char *c, const char *end,
return 0;
bad:
- pr_err("parse_ips bad ip '%s'\n", c);
+ pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
return -EINVAL;
}
@@ -1078,11 +1081,11 @@ static int process_banner(struct ceph_connection *con)
sizeof(con->peer_addr)) != 0 &&
!(addr_is_blank(&con->actual_peer_addr.in_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
- pr_warning("wrong peer, want %s/%lld, got %s/%lld\n",
+ pr_warning("wrong peer, want %s/%d, got %s/%d\n",
pr_addr(&con->peer_addr.in_addr),
- le64_to_cpu(con->peer_addr.nonce),
+ (int)le32_to_cpu(con->peer_addr.nonce),
pr_addr(&con->actual_peer_addr.in_addr),
- le64_to_cpu(con->actual_peer_addr.nonce));
+ (int)le32_to_cpu(con->actual_peer_addr.nonce));
con->error_msg = "wrong peer at address";
return -1;
}
@@ -1120,8 +1123,8 @@ static void fail_protocol(struct ceph_connection *con)
static int process_connect(struct ceph_connection *con)
{
- u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT;
- u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT;
+ u64 sup_feat = CEPH_FEATURE_SUPPORTED;
+ u64 req_feat = CEPH_FEATURE_REQUIRED;
u64 server_feat = le64_to_cpu(con->in_reply.features);
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1299,8 +1302,8 @@ static void process_ack(struct ceph_connection *con)
static int read_partial_message_section(struct ceph_connection *con,
- struct kvec *section, unsigned int sec_len,
- u32 *crc)
+ struct kvec *section,
+ unsigned int sec_len, u32 *crc)
{
int left;
int ret;
@@ -1396,10 +1399,12 @@ static int read_partial_message(struct ceph_connection *con)
if (!con->in_msg) {
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
con->in_hdr.front_len, con->in_hdr.data_len);
+ skip = 0;
con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
if (skip) {
/* skip this message */
dout("alloc_msg said skip message\n");
+ BUG_ON(con->in_msg);
con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1429,7 +1434,8 @@ static int read_partial_message(struct ceph_connection *con)
/* middle */
if (m->middle) {
- ret = read_partial_message_section(con, &m->middle->vec, middle_len,
+ ret = read_partial_message_section(con, &m->middle->vec,
+ middle_len,
&con->in_middle_crc);
if (ret <= 0)
return ret;
@@ -1915,7 +1921,7 @@ out:
/*
* in case we faulted due to authentication, invalidate our
* current tickets so that we can get new ones.
- */
+ */
if (con->auth_retry && con->ops->invalidate_authorizer) {
dout("calling invalidate_authorizer()\n");
con->ops->invalidate_authorizer(con);
@@ -2013,20 +2019,20 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
{
mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) {
- dout("con_revoke %p msg %p\n", con, msg);
+ dout("con_revoke %p msg %p - was on queue\n", con, msg);
list_del_init(&msg->list_head);
ceph_msg_put(msg);
msg->hdr.seq = 0;
- if (con->out_msg == msg) {
- ceph_msg_put(con->out_msg);
- con->out_msg = NULL;
- }
+ }
+ if (con->out_msg == msg) {
+ dout("con_revoke %p msg %p - was sending\n", con, msg);
+ con->out_msg = NULL;
if (con->out_kvec_is_msg) {
con->out_skip = con->out_kvec_bytes;
con->out_kvec_is_msg = false;
}
- } else {
- dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
+ ceph_msg_put(msg);
+ msg->hdr.seq = 0;
}
mutex_unlock(&con->mutex);
}
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 21c62e9b7d1..b2a5a3e4a67 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -345,11 +345,11 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
out:
mutex_unlock(&monc->mutex);
- wake_up(&client->auth_wq);
+ wake_up_all(&client->auth_wq);
}
/*
- * statfs
+ * generic requests (e.g., statfs, poolop)
*/
static struct ceph_mon_generic_request *__lookup_generic_req(
struct ceph_mon_client *monc, u64 tid)
@@ -400,6 +400,8 @@ static void release_generic_request(struct kref *kref)
ceph_msg_put(req->reply);
if (req->request)
ceph_msg_put(req->request);
+
+ kfree(req);
}
static void put_generic_request(struct ceph_mon_generic_request *req)
@@ -440,6 +442,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m;
}
+static int do_generic_request(struct ceph_mon_client *monc,
+ struct ceph_mon_generic_request *req)
+{
+ int err;
+
+ /* register request */
+ mutex_lock(&monc->mutex);
+ req->tid = ++monc->last_tid;
+ req->request->hdr.tid = cpu_to_le64(req->tid);
+ __insert_generic_request(monc, req);
+ monc->num_generic_requests++;
+ ceph_con_send(monc->con, ceph_msg_get(req->request));
+ mutex_unlock(&monc->mutex);
+
+ err = wait_for_completion_interruptible(&req->completion);
+
+ mutex_lock(&monc->mutex);
+ rb_erase(&req->node, &monc->generic_request_tree);
+ monc->num_generic_requests--;
+ mutex_unlock(&monc->mutex);
+
+ if (!err)
+ err = req->result;
+ return err;
+}
+
+/*
+ * statfs
+ */
static void handle_statfs_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
@@ -460,13 +491,13 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
}
mutex_unlock(&monc->mutex);
if (req) {
- complete(&req->completion);
+ complete_all(&req->completion);
put_generic_request(req);
}
return;
bad:
- pr_err("corrupt generic reply, no tid\n");
+ pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}
@@ -485,6 +516,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
kref_init(&req->kref);
req->buf = buf;
+ req->buf_len = sizeof(*buf);
init_completion(&req->completion);
err = -ENOMEM;
@@ -502,33 +534,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
- /* register request */
- mutex_lock(&monc->mutex);
- req->tid = ++monc->last_tid;
- req->request->hdr.tid = cpu_to_le64(req->tid);
- __insert_generic_request(monc, req);
- monc->num_generic_requests++;
- mutex_unlock(&monc->mutex);
+ err = do_generic_request(monc, req);
- /* send request and wait */
- ceph_con_send(monc->con, ceph_msg_get(req->request));
- err = wait_for_completion_interruptible(&req->completion);
+out:
+ kref_put(&req->kref, release_generic_request);
+ return err;
+}
+
+/*
+ * pool ops
+ */
+static int get_poolop_reply_buf(const char *src, size_t src_len,
+ char *dst, size_t dst_len)
+{
+ u32 buf_len;
+
+ if (src_len != sizeof(u32) + dst_len)
+ return -EINVAL;
+
+ buf_len = le32_to_cpu(*(u32 *)src);
+ if (buf_len != dst_len)
+ return -EINVAL;
+
+ memcpy(dst, src + sizeof(u32), dst_len);
+ return 0;
+}
+
+static void handle_poolop_reply(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
+{
+ struct ceph_mon_generic_request *req;
+ struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
+ u64 tid = le64_to_cpu(msg->hdr.tid);
+
+ if (msg->front.iov_len < sizeof(*reply))
+ goto bad;
+ dout("handle_poolop_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex);
- rb_erase(&req->node, &monc->generic_request_tree);
- monc->num_generic_requests--;
+ req = __lookup_generic_req(monc, tid);
+ if (req) {
+ if (req->buf_len &&
+ get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
+ msg->front.iov_len - sizeof(*reply),
+ req->buf, req->buf_len) < 0) {
+ mutex_unlock(&monc->mutex);
+ goto bad;
+ }
+ req->result = le32_to_cpu(reply->reply_code);
+ get_generic_request(req);
+ }
mutex_unlock(&monc->mutex);
+ if (req) {
+ complete(&req->completion);
+ put_generic_request(req);
+ }
+ return;
- if (!err)
- err = req->result;
+bad:
+ pr_err("corrupt generic reply, tid %llu\n", tid);
+ ceph_msg_dump(msg);
+}
+
+/*
+ * Do a synchronous pool op.
+ */
+int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+ u32 pool, u64 snapid,
+ char *buf, int len)
+{
+ struct ceph_mon_generic_request *req;
+ struct ceph_mon_poolop *h;
+ int err;
+
+ req = kzalloc(sizeof(*req), GFP_NOFS);
+ if (!req)
+ return -ENOMEM;
+
+ kref_init(&req->kref);
+ req->buf = buf;
+ req->buf_len = len;
+ init_completion(&req->completion);
+
+ err = -ENOMEM;
+ req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+ if (!req->request)
+ goto out;
+ req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+ if (!req->reply)
+ goto out;
+
+ /* fill out request */
+ req->request->hdr.version = cpu_to_le16(2);
+ h = req->request->front.iov_base;
+ h->monhdr.have_version = 0;
+ h->monhdr.session_mon = cpu_to_le16(-1);
+ h->monhdr.session_mon_tid = 0;
+ h->fsid = monc->monmap->fsid;
+ h->pool = cpu_to_le32(pool);
+ h->op = cpu_to_le32(op);
+ h->auid = 0;
+ h->snapid = cpu_to_le64(snapid);
+ h->name_len = 0;
+
+ err = do_generic_request(monc, req);
out:
kref_put(&req->kref, release_generic_request);
return err;
}
+int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+ u32 pool, u64 *snapid)
+{
+ return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
+ pool, 0, (char *)snapid, sizeof(*snapid));
+
+}
+
+int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+ u32 pool, u64 snapid)
+{
+ return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
+ pool, snapid, 0, 0);
+
+}
+
/*
- * Resend pending statfs requests.
+ * Resend pending generic requests.
*/
static void __resend_generic_request(struct ceph_mon_client *monc)
{
@@ -716,14 +849,15 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
monc->m_auth->front_max);
if (ret < 0) {
monc->client->auth_err = ret;
- wake_up(&monc->client->auth_wq);
+ wake_up_all(&monc->client->auth_wq);
} else if (ret > 0) {
__send_prepared_auth_request(monc, ret);
} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
dout("authenticated, starting session\n");
monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
- monc->client->msgr->inst.name.num = monc->auth->global_id;
+ monc->client->msgr->inst.name.num =
+ cpu_to_le64(monc->auth->global_id);
__send_subscribe(monc);
__resend_generic_request(monc);
@@ -780,6 +914,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_statfs_reply(monc, msg);
break;
+ case CEPH_MSG_POOLOP_REPLY:
+ handle_poolop_reply(monc, msg);
+ break;
+
case CEPH_MSG_MON_MAP:
ceph_monc_handle_map(monc, msg);
break;
@@ -817,6 +955,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msg_get(monc->m_subscribe_ack);
break;
+ case CEPH_MSG_POOLOP_REPLY:
case CEPH_MSG_STATFS_REPLY:
return get_generic_reply(con, hdr, skip);
case CEPH_MSG_AUTH_REPLY:
diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h
index 174d794321d..8e396f2c096 100644
--- a/fs/ceph/mon_client.h
+++ b/fs/ceph/mon_client.h
@@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
struct rb_node node;
int result;
void *buf;
+ int buf_len;
struct completion completion;
struct ceph_msg *request; /* original request */
struct ceph_msg *reply; /* and reply */
@@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
+extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+ u32 pool, u64 *snapid);
+extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+ u32 pool, u64 snapid);
#endif
diff --git a/fs/ceph/msgr.h b/fs/ceph/msgr.h
index 892a0298dfd..680d3d648ca 100644
--- a/fs/ceph/msgr.h
+++ b/fs/ceph/msgr.h
@@ -1,5 +1,5 @@
-#ifndef __MSGR_H
-#define __MSGR_H
+#ifndef CEPH_MSGR_H
+#define CEPH_MSGR_H
/*
* Data types for message passing layer used by Ceph.
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index d25b4add85b..bed6391e52c 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -862,12 +862,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
if (req->r_callback)
req->r_callback(req, msg);
else
- complete(&req->r_completion);
+ complete_all(&req->r_completion);
if (flags & CEPH_OSD_FLAG_ONDISK) {
if (req->r_safe_callback)
req->r_safe_callback(req, msg);
- complete(&req->r_safe_completion); /* fsync waiter */
+ complete_all(&req->r_safe_completion); /* fsync waiter */
}
done:
@@ -1083,7 +1083,7 @@ done:
if (newmap)
kick_requests(osdc, NULL);
up_read(&osdc->map_sem);
- wake_up(&osdc->client->auth_wq);
+ wake_up_all(&osdc->client->auth_wq);
return;
bad:
@@ -1276,8 +1276,6 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
/* it may be a short read due to an object boundary */
req->r_pages = pages;
- num_pages = calc_pages_for(off, *plen);
- req->r_num_pages = num_pages;
dout("readpages final extent is %llu~%llu (%d pages)\n",
off, *plen, req->r_num_pages);
@@ -1319,7 +1317,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
/* it may be a short write due to an object boundary */
req->r_pages = pages;
- req->r_num_pages = calc_pages_for(off, len);
dout("writepages %llu~%llu (%d pages)\n", off, len,
req->r_num_pages);
@@ -1344,7 +1341,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
int type = le16_to_cpu(msg->hdr.type);
if (!osd)
- return;
+ goto out;
osdc = osd->o_osdc;
switch (type) {
@@ -1359,6 +1356,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
pr_err("received unknown message type %d %s\n", type,
ceph_msg_type_name(type));
}
+out:
ceph_msg_put(msg);
}
@@ -1475,8 +1473,8 @@ static void put_osd_con(struct ceph_connection *con)
* authentication
*/
static int get_authorizer(struct ceph_connection *con,
- void **buf, int *len, int *proto,
- void **reply_buf, int *reply_len, int force_new)
+ void **buf, int *len, int *proto,
+ void **reply_buf, int *reply_len, int force_new)
{
struct ceph_osd *o = con->private;
struct ceph_osd_client *osdc = o->o_osdc;
@@ -1496,7 +1494,7 @@ static int get_authorizer(struct ceph_connection *con,
&o->o_authorizer_reply_buf,
&o->o_authorizer_reply_buf_len);
if (ret)
- return ret;
+ return ret;
}
*proto = ac->protocol;
diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
index ddc656fb5c0..e31f118f139 100644
--- a/fs/ceph/osdmap.c
+++ b/fs/ceph/osdmap.c
@@ -424,12 +424,30 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
kfree(pi);
}
-void __decode_pool(void **p, struct ceph_pg_pool_info *pi)
+static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
+ unsigned n, m;
+
ceph_decode_copy(p, &pi->v, sizeof(pi->v));
calc_pg_masks(pi);
- *p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64);
+
+ /* num_snaps * snap_info_t */
+ n = le32_to_cpu(pi->v.num_snaps);
+ while (n--) {
+ ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) +
+ sizeof(struct ceph_timespec), bad);
+ *p += sizeof(u64) + /* key */
+ 1 + sizeof(u64) + /* u8, snapid */
+ sizeof(struct ceph_timespec);
+ m = ceph_decode_32(p); /* snap name */
+ *p += m;
+ }
+
*p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2;
+ return 0;
+
+bad:
+ return -EINVAL;
}
static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
@@ -568,9 +586,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
if (ev > CEPH_PG_POOL_VERSION) {
pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
ev, CEPH_PG_POOL_VERSION);
+ kfree(pi);
goto bad;
}
- __decode_pool(p, pi);
+ err = __decode_pool(p, end, pi);
+ if (err < 0)
+ goto bad;
__insert_pg_pool(&map->pg_pools, pi);
}
@@ -707,6 +728,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
newcrush = crush_decode(*p, min(*p+len, end));
if (IS_ERR(newcrush))
return ERR_CAST(newcrush);
+ *p += len;
}
/* new flags? */
@@ -758,7 +780,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
pi->id = pool;
__insert_pg_pool(&map->pg_pools, pi);
}
- __decode_pool(p, pi);
+ err = __decode_pool(p, end, pi);
+ if (err < 0)
+ goto bad;
}
if (version >= 5 && __decode_pool_names(p, end, map) < 0)
goto bad;
@@ -829,12 +853,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
/* remove any? */
while (rbp && pgid_cmp(rb_entry(rbp, struct ceph_pg_mapping,
node)->pgid, pgid) <= 0) {
- struct rb_node *cur = rbp;
+ struct ceph_pg_mapping *cur =
+ rb_entry(rbp, struct ceph_pg_mapping, node);
+
rbp = rb_next(rbp);
- dout(" removed pg_temp %llx\n",
- *(u64 *)&rb_entry(cur, struct ceph_pg_mapping,
- node)->pgid);
- rb_erase(cur, &map->pg_temp);
+ dout(" removed pg_temp %llx\n", *(u64 *)&cur->pgid);
+ rb_erase(&cur->node, &map->pg_temp);
+ kfree(cur);
}
if (pglen) {
@@ -850,19 +875,22 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
for (j = 0; j < pglen; j++)
pg->osds[j] = ceph_decode_32(p);
err = __insert_pg_mapping(pg, &map->pg_temp);
- if (err)
+ if (err) {
+ kfree(pg);
goto bad;
+ }
dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid,
pglen);
}
}
while (rbp) {
- struct rb_node *cur = rbp;
+ struct ceph_pg_mapping *cur =
+ rb_entry(rbp, struct ceph_pg_mapping, node);
+
rbp = rb_next(rbp);
- dout(" removed pg_temp %llx\n",
- *(u64 *)&rb_entry(cur, struct ceph_pg_mapping,
- node)->pgid);
- rb_erase(cur, &map->pg_temp);
+ dout(" removed pg_temp %llx\n", *(u64 *)&cur->pgid);
+ rb_erase(&cur->node, &map->pg_temp);
+ kfree(cur);
}
/* ignore the rest */
@@ -1020,8 +1048,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset,
pool->v.type, pool->v.size);
if (ruleno < 0) {
- pr_err("no crush rule pool %d type %d size %d\n",
- poolid, pool->v.type, pool->v.size);
+ pr_err("no crush rule pool %d ruleset %d type %d size %d\n",
+ poolid, pool->v.crush_ruleset, pool->v.type,
+ pool->v.size);
return NULL;
}
diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h
index 8fcc023056c..6d5247f2e81 100644
--- a/fs/ceph/rados.h
+++ b/fs/ceph/rados.h
@@ -1,5 +1,5 @@
-#ifndef __RADOS_H
-#define __RADOS_H
+#ifndef CEPH_RADOS_H
+#define CEPH_RADOS_H
/*
* Data types for the Ceph distributed object storage layer RADOS
@@ -203,6 +203,7 @@ enum {
CEPH_OSD_OP_TMAPGET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 12,
CEPH_OSD_OP_CREATE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13,
+ CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14,
/** attrs **/
/* read */
@@ -272,6 +273,10 @@ static inline int ceph_osd_op_mode_modify(int op)
return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR;
}
+/*
+ * note that the following tmap stuff is also defined in the ceph librados.h
+ * any modification here needs to be updated there
+ */
#define CEPH_OSD_TMAP_HDR 'h'
#define CEPH_OSD_TMAP_SET 's'
#define CEPH_OSD_TMAP_RM 'r'
@@ -297,6 +302,7 @@ enum {
CEPH_OSD_FLAG_PARALLELEXEC = 512, /* execute op in parallel */
CEPH_OSD_FLAG_PGOP = 1024, /* pg op, no object */
CEPH_OSD_FLAG_EXEC = 2048, /* op may exec */
+ CEPH_OSD_FLAG_EXEC_PUBLIC = 4096, /* op may exec (public) */
};
enum {
@@ -350,6 +356,9 @@ struct ceph_osd_op {
struct {
__le64 cookie, count;
} __attribute__ ((packed)) pgls;
+ struct {
+ __le64 snapid;
+ } __attribute__ ((packed)) snap;
};
__le32 payload_len;
} __attribute__ ((packed));
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 4e0bee240b9..9922628532b 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -2,6 +2,7 @@
#include "ceph_debug.h"
#include <linux/backing-dev.h>
+#include <linux/ctype.h>
#include <linux/fs.h>
#include <linux/inet.h>
#include <linux/in6.h>
@@ -89,7 +90,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
buf->f_files = le64_to_cpu(st.num_objects);
buf->f_ffree = -1;
- buf->f_namelen = PATH_MAX;
+ buf->f_namelen = NAME_MAX;
buf->f_frsize = PAGE_CACHE_SIZE;
/* leave fsid little-endian, regardless of host endianness */
@@ -101,12 +102,21 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
}
-static int ceph_syncfs(struct super_block *sb, int wait)
+static int ceph_sync_fs(struct super_block *sb, int wait)
{
- dout("sync_fs %d\n", wait);
+ struct ceph_client *client = ceph_sb_to_client(sb);
+
+ if (!wait) {
+ dout("sync_fs (non-blocking)\n");
+ ceph_flush_dirty_caps(&client->mdsc);
+ dout("sync_fs (non-blocking) done\n");
+ return 0;
+ }
+
+ dout("sync_fs (blocking)\n");
ceph_osdc_sync(&ceph_sb_to_client(sb)->osdc);
ceph_mdsc_sync(&ceph_sb_to_client(sb)->mdsc);
- dout("sync_fs %d done\n", wait);
+ dout("sync_fs (blocking) done\n");
return 0;
}
@@ -150,9 +160,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
struct ceph_mount_args *args = client->mount_args;
if (args->flags & CEPH_OPT_FSID)
- seq_printf(m, ",fsidmajor=%llu,fsidminor%llu",
- le64_to_cpu(*(__le64 *)&args->fsid.fsid[0]),
- le64_to_cpu(*(__le64 *)&args->fsid.fsid[8]));
+ seq_printf(m, ",fsid=%pU", &args->fsid);
if (args->flags & CEPH_OPT_NOSHARE)
seq_puts(m, ",noshare");
if (args->flags & CEPH_OPT_DIRSTAT)
@@ -279,7 +287,7 @@ static const struct super_operations ceph_super_ops = {
.alloc_inode = ceph_alloc_inode,
.destroy_inode = ceph_destroy_inode,
.write_inode = ceph_write_inode,
- .sync_fs = ceph_syncfs,
+ .sync_fs = ceph_sync_fs,
.put_super = ceph_put_super,
.show_options = ceph_show_options,
.statfs = ceph_statfs,
@@ -322,9 +330,6 @@ const char *ceph_msg_type_name(int type)
* mount options
*/
enum {
- Opt_fsidmajor,
- Opt_fsidminor,
- Opt_monport,
Opt_wsize,
Opt_rsize,
Opt_osdtimeout,
@@ -339,6 +344,7 @@ enum {
Opt_congestion_kb,
Opt_last_int,
/* int args above */
+ Opt_fsid,
Opt_snapdirname,
Opt_name,
Opt_secret,
@@ -355,9 +361,6 @@ enum {
};
static match_table_t arg_tokens = {
- {Opt_fsidmajor, "fsidmajor=%ld"},
- {Opt_fsidminor, "fsidminor=%ld"},
- {Opt_monport, "monport=%d"},
{Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"},
{Opt_osdtimeout, "osdtimeout=%d"},
@@ -371,6 +374,7 @@ static match_table_t arg_tokens = {
{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
{Opt_congestion_kb, "write_congestion_kb=%d"},
/* int args above */
+ {Opt_fsid, "fsid=%s"},
{Opt_snapdirname, "snapdirname=%s"},
{Opt_name, "name=%s"},
{Opt_secret, "secret=%s"},
@@ -386,6 +390,36 @@ static match_table_t arg_tokens = {
{-1, NULL}
};
+static int parse_fsid(const char *str, struct ceph_fsid *fsid)
+{
+ int i = 0;
+ char tmp[3];
+ int err = -EINVAL;
+ int d;
+
+ dout("parse_fsid '%s'\n", str);
+ tmp[2] = 0;
+ while (*str && i < 16) {
+ if (ispunct(*str)) {
+ str++;
+ continue;
+ }
+ if (!isxdigit(str[0]) || !isxdigit(str[1]))
+ break;
+ tmp[0] = str[0];
+ tmp[1] = str[1];
+ if (sscanf(tmp, "%x", &d) < 1)
+ break;
+ fsid->fsid[i] = d & 0xff;
+ i++;
+ str += 2;
+ }
+
+ if (i == 16)
+ err = 0;
+ dout("parse_fsid ret %d got fsid %pU", err, fsid);
+ return err;
+}
static struct ceph_mount_args *parse_mount_args(int flags, char *options,
const char *dev_name,
@@ -469,12 +503,6 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
dout("got token %d\n", token);
}
switch (token) {
- case Opt_fsidmajor:
- *(__le64 *)&args->fsid.fsid[0] = cpu_to_le64(intval);
- break;
- case Opt_fsidminor:
- *(__le64 *)&args->fsid.fsid[8] = cpu_to_le64(intval);
- break;
case Opt_ip:
err = ceph_parse_ips(argstr[0].from,
argstr[0].to,
@@ -485,6 +513,11 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
args->flags |= CEPH_OPT_MYIP;
break;
+ case Opt_fsid:
+ err = parse_fsid(argstr[0].from, &args->fsid);
+ if (err == 0)
+ args->flags |= CEPH_OPT_FSID;
+ break;
case Opt_snapdirname:
kfree(args->snapdir_name);
args->snapdir_name = kstrndup(argstr[0].from,
@@ -515,6 +548,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
case Opt_osdkeepalivetimeout:
args->osd_keepalive_timeout = intval;
break;
+ case Opt_osd_idle_ttl:
+ args->osd_idle_ttl = intval;
+ break;
case Opt_mount_timeout:
args->mount_timeout = intval;
break;
@@ -630,7 +666,6 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
/* caps */
client->min_caps = args->max_readdir;
- ceph_adjust_min_caps(client->min_caps);
/* subsystems */
err = ceph_monc_init(&client->monc, client);
@@ -680,8 +715,6 @@ static void ceph_destroy_client(struct ceph_client *client)
ceph_monc_stop(&client->monc);
- ceph_adjust_min_caps(-client->min_caps);
-
ceph_debugfs_client_cleanup(client);
destroy_workqueue(client->wb_wq);
destroy_workqueue(client->pg_inv_wq);
@@ -706,13 +739,13 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid)
{
if (client->have_fsid) {
if (ceph_fsid_compare(&client->fsid, fsid)) {
- pr_err("bad fsid, had " FSID_FORMAT " got " FSID_FORMAT,
- PR_FSID(&client->fsid), PR_FSID(fsid));
+ pr_err("bad fsid, had %pU got %pU",
+ &client->fsid, fsid);
return -1;
}
} else {
- pr_info("client%lld fsid " FSID_FORMAT "\n",
- client->monc.auth->global_id, PR_FSID(fsid));
+ pr_info("client%lld fsid %pU\n", client->monc.auth->global_id,
+ fsid);
memcpy(&client->fsid, fsid, sizeof(*fsid));
ceph_debugfs_client_init(client);
client->have_fsid = true;
@@ -926,7 +959,7 @@ static int ceph_compare_super(struct super_block *sb, void *data)
/*
* construct our own bdi so we can control readahead, etc.
*/
-static atomic_long_t bdi_seq = ATOMIC_INIT(0);
+static atomic_long_t bdi_seq = ATOMIC_LONG_INIT(0);
static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client)
{
@@ -1043,8 +1076,6 @@ static int __init init_ceph(void)
if (ret)
goto out_msgr;
- ceph_caps_init();
-
ret = register_filesystem(&ceph_fs_type);
if (ret)
goto out_icache;
@@ -1069,7 +1100,6 @@ static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
unregister_filesystem(&ceph_fs_type);
- ceph_caps_finalize();
destroy_caches();
ceph_msgr_exit();
ceph_debugfs_cleanup();
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 10a4a406e88..2482d696f0d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -31,6 +31,12 @@
#define CEPH_BLOCK (1 << CEPH_BLOCK_SHIFT)
/*
+ * Supported features
+ */
+#define CEPH_FEATURE_SUPPORTED CEPH_FEATURE_NOSRCADDR | CEPH_FEATURE_FLOCK
+#define CEPH_FEATURE_REQUIRED CEPH_FEATURE_NOSRCADDR
+
+/*
* mount options
*/
#define CEPH_OPT_FSID (1<<0)
@@ -560,11 +566,13 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
/* what the mds thinks we want */
extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);
-extern void ceph_caps_init(void);
-extern void ceph_caps_finalize(void);
-extern void ceph_adjust_min_caps(int delta);
-extern int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need);
-extern int ceph_unreserve_caps(struct ceph_cap_reservation *ctx);
+extern void ceph_caps_init(struct ceph_mds_client *mdsc);
+extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
+extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
+extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx, int need);
+extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx);
extern void ceph_reservation_status(struct ceph_client *client,
int *total, int *avail, int *used,
int *reserved, int *min);
@@ -738,13 +746,6 @@ extern struct kmem_cache *ceph_file_cachep;
extern const char *ceph_msg_type_name(int type);
extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
-#define FSID_FORMAT "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-" \
- "%02x%02x%02x%02x%02x%02x"
-#define PR_FSID(f) (f)->fsid[0], (f)->fsid[1], (f)->fsid[2], (f)->fsid[3], \
- (f)->fsid[4], (f)->fsid[5], (f)->fsid[6], (f)->fsid[7], \
- (f)->fsid[8], (f)->fsid[9], (f)->fsid[10], (f)->fsid[11], \
- (f)->fsid[12], (f)->fsid[13], (f)->fsid[14], (f)->fsid[15]
-
/* inode.c */
extern const struct inode_operations ceph_file_iops;
@@ -806,13 +807,16 @@ static inline void ceph_remove_cap(struct ceph_cap *cap)
__ceph_remove_cap(cap);
spin_unlock(&inode->i_lock);
}
-extern void ceph_put_cap(struct ceph_cap *cap);
+extern void ceph_put_cap(struct ceph_mds_client *mdsc,
+ struct ceph_cap *cap);
extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
extern int ceph_fsync(struct file *file, int datasync);
extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
+extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
+ int mds);
extern int ceph_get_cap_mds(struct inode *inode);
extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
@@ -857,7 +861,7 @@ extern void ceph_release_page_vector(struct page **pages, int num_pages);
/* dir.c */
extern const struct file_operations ceph_dir_fops;
extern const struct inode_operations ceph_dir_iops;
-extern struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
+extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
ceph_snapdir_dentry_ops;
extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
@@ -888,6 +892,14 @@ extern void ceph_debugfs_cleanup(void);
extern int ceph_debugfs_client_init(struct ceph_client *client);
extern void ceph_debugfs_client_cleanup(struct ceph_client *client);
+/* locks.c */
+extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl);
+extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl);
+extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num);
+extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p,
+ int p_locks, int f_locks);
+extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c);
+
static inline struct inode *get_dentry_parent_inode(struct dentry *dentry)
{
if (dentry && dentry->d_parent)
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 68aeebc6968..097a2654c00 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -337,6 +337,8 @@ void __ceph_destroy_xattrs(struct ceph_inode_info *ci)
}
static int __build_xattrs(struct inode *inode)
+ __releases(inode->i_lock)
+ __acquires(inode->i_lock)
{
u32 namelen;
u32 numattr = 0;