From 858ee3784e8105467f1f3017f4ece51cb51d4830 Mon Sep 17 00:00:00 2001 From: Doug Ledford Date: Thu, 31 May 2012 16:26:29 -0700 Subject: ipc/mqueue: switch back to using non-max values on create Commit b231cca4381e ("message queues: increase range limits") changed how we create a queue that does not include an attr struct passed to open so that it creates the queue with whatever the maximum values are. However, if the admin has set the maximums to allow flexibility in creating a queue (aka, both a large size and large queue are allowed, but combined they create a queue too large for the RLIMIT_MSGQUEUE of the user), then attempts to create a queue without an attr struct will fail. Switch back to using acceptable defaults regardless of what the maximums are. Note: so far, we only know of a few applications that rely on this behavior (specifically, set the maximums in /proc, then run the application which calls mq_open() without passing in an attr struct, and the application expects the newly created message queue to have the maximum sizes that were set in /proc used on the mq_open() call, and all of those applications that we know of are actually part of regression test suites that were coded to do something like this: for size in 4096 65536 $((1024 * 1024)) $((16 * 1024 * 1024)); do echo $size > /proc/sys/fs/mqueue/msgsize_max mq_open || echo "Error opening mq with size $size" done These test suites that depend on any behavior like this are broken. The concept that programs should rely upon the system wide maximum in order to get their desired results instead of simply using a attr struct to specify what they want is fundamentally unfriendly programming practice for any multi-tasking OS. Fixing this will break those few apps that we know of (and those app authors recognize the brokenness of their code and the need to fix it). However, the following patch "mqueue: separate mqueue default value" allows a workaround in the form of new knobs for the default msg queue creation parameters for any software out there that we don't already know about that might rely on this behavior at the moment. Signed-off-by: Doug Ledford Cc: Serge E. Hallyn Cc: Amerigo Wang Cc: Joe Korty Cc: Jiri Slaby Acked-by: KOSAKI Motohiro Cc: Manfred Spraul Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/mqueue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/ipc/mqueue.c b/ipc/mqueue.c index a2757d4ab77..b103022179a 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -144,8 +144,9 @@ static struct inode *mqueue_get_inode(struct super_block *sb, info->qsize = 0; info->user = NULL; /* set when all is ok */ memset(&info->attr, 0, sizeof(info->attr)); - info->attr.mq_maxmsg = ipc_ns->mq_msg_max; - info->attr.mq_msgsize = ipc_ns->mq_msgsize_max; + info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max, DFLT_MSG); + info->attr.mq_msgsize = + min(ipc_ns->mq_msgsize_max, DFLT_MSGSIZE); if (attr) { info->attr.mq_maxmsg = attr->mq_maxmsg; info->attr.mq_msgsize = attr->mq_msgsize; -- cgit v1.2.3-70-g09d2 From 02967ea08ede0f8cc7e0526aedffdae65a099b07 Mon Sep 17 00:00:00 2001 From: Doug Ledford Date: Thu, 31 May 2012 16:26:29 -0700 Subject: ipc/mqueue: enforce hard limits In two places we don't enforce the hard limits for CAP_SYS_RESOURCE apps. In preparation for making more reasonable hard limits, start enforcing them even on CAP_SYS_RESOURCE. Signed-off-by: Doug Ledford Cc: Serge E. Hallyn Cc: Amerigo Wang Cc: Joe Korty Cc: Jiri Slaby Acked-by: KOSAKI Motohiro Cc: Manfred Spraul Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/mqueue.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/ipc/mqueue.c b/ipc/mqueue.c index b103022179a..6e10a55a78c 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -301,8 +301,9 @@ static int mqueue_create(struct inode *dir, struct dentry *dentry, error = -EACCES; goto out_unlock; } - if (ipc_ns->mq_queues_count >= ipc_ns->mq_queues_max && - !capable(CAP_SYS_RESOURCE)) { + if (ipc_ns->mq_queues_count >= HARD_QUEUESMAX || + (ipc_ns->mq_queues_count >= ipc_ns->mq_queues_max && + !capable(CAP_SYS_RESOURCE))) { error = -ENOSPC; goto out_unlock; } @@ -589,7 +590,8 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr) if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) return 0; if (capable(CAP_SYS_RESOURCE)) { - if (attr->mq_maxmsg > HARD_MSGMAX) + if (attr->mq_maxmsg > HARD_MSGMAX || + attr->mq_msgsize > HARD_MSGSIZEMAX) return 0; } else { if (attr->mq_maxmsg > ipc_ns->mq_msg_max || -- cgit v1.2.3-70-g09d2 From 5b5c4d1a1440e94994c73dddbad7be0676cd8b9a Mon Sep 17 00:00:00 2001 From: Doug Ledford Date: Thu, 31 May 2012 16:26:30 -0700 Subject: ipc/mqueue: update maximums for the mqueue subsystem Commit b231cca4381e ("message queues: increase range limits") changed the maximum size of a message in a message queue from INT_MAX to 8192*128. Unfortunately, we had customers that relied on a size much larger than 8192*128 on their production systems. After reviewing POSIX, we found that it is silent on the maximum message size. We did find a couple other areas in which it was not silent. Fix up the mqueue maximums so that the customer's system can continue to work, and document both the POSIX and real world requirements in ipc_namespace.h so that we don't have this issue crop back up. Also, commit 9cf18e1dd74cd0 ("ipc: HARD_MSGMAX should be higher not lower on 64bit") fiddled with HARD_MSGMAX without realizing that the number was intentionally in place to limit the msg queue depth to one that was small enough to kmalloc an array of pointers (hence why we divided 128k by sizeof(long)). If we wish to meet POSIX requirements, we have no choice but to change our allocation to a vmalloc instead (at least for the large queue size case). With that, it's possible to increase our allowed maximum to the POSIX requirements (or more if we choose). [sfr@canb.auug.org.au: using vmalloc requires including vmalloc.h] Signed-off-by: Doug Ledford Cc: Serge E. Hallyn Cc: Amerigo Wang Cc: Joe Korty Cc: Jiri Slaby Acked-by: KOSAKI Motohiro Cc: Manfred Spraul Signed-off-by: Stephen Rothwell Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- include/linux/ipc_namespace.h | 47 ++++++++++++++++++++++++++++++++----------- ipc/mqueue.c | 11 ++++++++-- 2 files changed, 44 insertions(+), 14 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/include/linux/ipc_namespace.h b/include/linux/ipc_namespace.h index bde094ee7b0..6e1dd08194f 100644 --- a/include/linux/ipc_namespace.h +++ b/include/linux/ipc_namespace.h @@ -90,18 +90,41 @@ static inline void shm_destroy_orphaned(struct ipc_namespace *ns) {} #ifdef CONFIG_POSIX_MQUEUE extern int mq_init_ns(struct ipc_namespace *ns); -/* default values */ -#define MIN_QUEUESMAX 1 -#define DFLT_QUEUESMAX 256 /* max number of message queues */ -#define HARD_QUEUESMAX 1024 -#define MIN_MSGMAX 1 -#define DFLT_MSG 10U -#define DFLT_MSGMAX 10 /* max number of messages in each queue */ -#define HARD_MSGMAX (32768*sizeof(void *)/4) -#define MIN_MSGSIZEMAX 128 -#define DFLT_MSGSIZE 8192U -#define DFLT_MSGSIZEMAX 8192 /* max message size */ -#define HARD_MSGSIZEMAX (8192*128) +/* + * POSIX Message Queue default values: + * + * MIN_*: Lowest value an admin can set the maximum unprivileged limit to + * DFLT_*MAX: Default values for the maximum unprivileged limits + * DFLT_{MSG,MSGSIZE}: Default values used when the user doesn't supply + * an attribute to the open call and the queue must be created + * HARD_*: Highest value the maximums can be set to. These are enforced + * on CAP_SYS_RESOURCE apps as well making them inviolate (so make them + * suitably high) + * + * POSIX Requirements: + * Per app minimum openable message queues - 8. This does not map well + * to the fact that we limit the number of queues on a per namespace + * basis instead of a per app basis. So, make the default high enough + * that no given app should have a hard time opening 8 queues. + * Minimum maximum for HARD_MSGMAX - 32767. I bumped this to 65536. + * Minimum maximum for HARD_MSGSIZEMAX - POSIX is silent on this. However, + * we have run into a situation where running applications in the wild + * require this to be at least 5MB, and preferably 10MB, so I set the + * value to 16MB in hopes that this user is the worst of the bunch and + * the new maximum will handle anyone else. I may have to revisit this + * in the future. + */ +#define MIN_QUEUESMAX 1 +#define DFLT_QUEUESMAX 256 +#define HARD_QUEUESMAX 1024 +#define MIN_MSGMAX 1 +#define DFLT_MSG 64U +#define DFLT_MSGMAX 1024 +#define HARD_MSGMAX 65536 +#define MIN_MSGSIZEMAX 128 +#define DFLT_MSGSIZE 8192U +#define DFLT_MSGSIZEMAX (1024*1024) +#define HARD_MSGSIZEMAX (16*1024*1024) #else static inline int mq_init_ns(struct ipc_namespace *ns) { return 0; } #endif diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 6e10a55a78c..f8eba5e46c5 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -152,7 +153,10 @@ static struct inode *mqueue_get_inode(struct super_block *sb, info->attr.mq_msgsize = attr->mq_msgsize; } mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *); - info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL); + if (mq_msg_tblsz > KMALLOC_MAX_SIZE) + info->messages = vmalloc(mq_msg_tblsz); + else + info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL); if (!info->messages) goto out_inode; @@ -262,7 +266,10 @@ static void mqueue_evict_inode(struct inode *inode) spin_lock(&info->lock); for (i = 0; i < info->attr.mq_curmsgs; i++) free_msg(info->messages[i]); - kfree(info->messages); + if (info->attr.mq_maxmsg * sizeof(struct msg_msg *) > KMALLOC_MAX_SIZE) + vfree(info->messages); + else + kfree(info->messages); spin_unlock(&info->lock); /* Total amount of bytes accounted for the mqueue */ -- cgit v1.2.3-70-g09d2 From fd1f87d24d492fda464bedf10a5dd5174ff9b065 Mon Sep 17 00:00:00 2001 From: KOSAKI Motohiro Date: Thu, 31 May 2012 16:26:31 -0700 Subject: mqueue: don't use kmalloc with KMALLOC_MAX_SIZE KMALLOC_MAX_SIZE is not a good threshold. It is extremely high and problematic. Unfortunately, some silly drivers depend on this and we can't change it. But any new code needn't use such extreme ugly high order allocations. It brings us awful fragmentation issues and system slowdown. Signed-off-by: KOSAKI Motohiro Acked-by: Doug Ledford Acked-by: Joe Korty Cc: Amerigo Wang Cc: Serge E. Hallyn Cc: Jiri Slaby Cc: Joe Korty Cc: Manfred Spraul Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/mqueue.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/ipc/mqueue.c b/ipc/mqueue.c index f8eba5e46c5..6828e2c93ce 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -153,7 +153,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb, info->attr.mq_msgsize = attr->mq_msgsize; } mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *); - if (mq_msg_tblsz > KMALLOC_MAX_SIZE) + if (mq_msg_tblsz > PAGE_SIZE) info->messages = vmalloc(mq_msg_tblsz); else info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL); @@ -266,7 +266,7 @@ static void mqueue_evict_inode(struct inode *inode) spin_lock(&info->lock); for (i = 0; i < info->attr.mq_curmsgs; i++) free_msg(info->messages[i]); - if (info->attr.mq_maxmsg * sizeof(struct msg_msg *) > KMALLOC_MAX_SIZE) + if (is_vmalloc_addr(info->messages)) vfree(info->messages); else kfree(info->messages); -- cgit v1.2.3-70-g09d2 From cef0184c115e5e4e10498f6548d9526465e72478 Mon Sep 17 00:00:00 2001 From: KOSAKI Motohiro Date: Thu, 31 May 2012 16:26:33 -0700 Subject: mqueue: separate mqueue default value from maximum value Commit b231cca4381e ("message queues: increase range limits") changed mqueue default value when attr parameter is specified NULL from hard coded value to fs.mqueue.{msg,msgsize}_max sysctl value. This made large side effect. When user need to use two mqueue applications 1) using !NULL attr parameter and it require big message size and 2) using NULL attr parameter and only need small size message, app (1) require to raise fs.mqueue.msgsize_max and app (2) consume large memory size even though it doesn't need. Doug Ledford propsed to switch back it to static hard coded value. However it also has a compatibility problem. Some applications might started depend on the default value is tunable. The solution is to separate default value from maximum value. Signed-off-by: KOSAKI Motohiro Signed-off-by: Doug Ledford Acked-by: Doug Ledford Acked-by: Joe Korty Cc: Amerigo Wang Acked-by: Serge E. Hallyn Cc: Jiri Slaby Cc: Manfred Spraul Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- Documentation/sysctl/fs.txt | 7 +++++++ include/linux/ipc_namespace.h | 2 ++ ipc/mq_sysctl.c | 18 ++++++++++++++++++ ipc/mqueue.c | 9 ++++++--- 4 files changed, 33 insertions(+), 3 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/Documentation/sysctl/fs.txt b/Documentation/sysctl/fs.txt index 88fd7f5c8dc..13d6166d7a2 100644 --- a/Documentation/sysctl/fs.txt +++ b/Documentation/sysctl/fs.txt @@ -225,6 +225,13 @@ a queue must be less or equal then msg_max. maximum message size value (it is every message queue's attribute set during its creation). +/proc/sys/fs/mqueue/msg_default is a read/write file for setting/getting the +default number of messages in a queue value if attr parameter of mq_open(2) is +NULL. If it exceed msg_max, the default value is initialized msg_max. + +/proc/sys/fs/mqueue/msgsize_default is a read/write file for setting/getting +the default message size value if attr parameter of mq_open(2) is NULL. If it +exceed msgsize_max, the default value is initialized msgsize_max. 4. /proc/sys/fs/epoll - Configuration options for the epoll interface -------------------------------------------------------- diff --git a/include/linux/ipc_namespace.h b/include/linux/ipc_namespace.h index 2488535a32a..5499c92a915 100644 --- a/include/linux/ipc_namespace.h +++ b/include/linux/ipc_namespace.h @@ -62,6 +62,8 @@ struct ipc_namespace { unsigned int mq_queues_max; /* initialized to DFLT_QUEUESMAX */ unsigned int mq_msg_max; /* initialized to DFLT_MSGMAX */ unsigned int mq_msgsize_max; /* initialized to DFLT_MSGSIZEMAX */ + unsigned int mq_msg_default; + unsigned int mq_msgsize_default; /* user_ns which owns the ipc ns */ struct user_namespace *user_ns; diff --git a/ipc/mq_sysctl.c b/ipc/mq_sysctl.c index e22336a09b4..383d638340b 100644 --- a/ipc/mq_sysctl.c +++ b/ipc/mq_sysctl.c @@ -73,6 +73,24 @@ static ctl_table mq_sysctls[] = { .extra1 = &msg_maxsize_limit_min, .extra2 = &msg_maxsize_limit_max, }, + { + .procname = "msg_default", + .data = &init_ipc_ns.mq_msg_default, + .maxlen = sizeof(int), + .mode = 0644, + .proc_handler = proc_mq_dointvec_minmax, + .extra1 = &msg_max_limit_min, + .extra2 = &msg_max_limit_max, + }, + { + .procname = "msgsize_default", + .data = &init_ipc_ns.mq_msgsize_default, + .maxlen = sizeof(int), + .mode = 0644, + .proc_handler = proc_mq_dointvec_minmax, + .extra1 = &msg_maxsize_limit_min, + .extra2 = &msg_maxsize_limit_max, + }, {} }; diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 6828e2c93ce..609d53f7a91 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -145,9 +145,10 @@ static struct inode *mqueue_get_inode(struct super_block *sb, info->qsize = 0; info->user = NULL; /* set when all is ok */ memset(&info->attr, 0, sizeof(info->attr)); - info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max, DFLT_MSG); - info->attr.mq_msgsize = - min(ipc_ns->mq_msgsize_max, DFLT_MSGSIZE); + info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max, + ipc_ns->mq_msg_default); + info->attr.mq_msgsize = min(ipc_ns->mq_msgsize_max, + ipc_ns->mq_msgsize_default); if (attr) { info->attr.mq_maxmsg = attr->mq_maxmsg; info->attr.mq_msgsize = attr->mq_msgsize; @@ -1261,6 +1262,8 @@ int mq_init_ns(struct ipc_namespace *ns) ns->mq_queues_max = DFLT_QUEUESMAX; ns->mq_msg_max = DFLT_MSGMAX; ns->mq_msgsize_max = DFLT_MSGSIZEMAX; + ns->mq_msg_default = DFLT_MSG; + ns->mq_msgsize_default = DFLT_MSGSIZE; ns->mq_mnt = kern_mount_data(&mqueue_fs_type, ns); if (IS_ERR(ns->mq_mnt)) { -- cgit v1.2.3-70-g09d2 From d6629859b36d953a4b1369b749f178736911bf10 Mon Sep 17 00:00:00 2001 From: Doug Ledford Date: Thu, 31 May 2012 16:26:35 -0700 Subject: ipc/mqueue: improve performance of send/recv The existing implementation of the POSIX message queue send and recv functions is, well, abysmal. Even worse than abysmal. I submitted a patch to increase the maximum POSIX message queue limit to 65536 due to customer needs, however, upon looking over the send/recv implementation, I realized that my customer needs help with that too even if they don't know it. The basic problem is that, given the fairly typical use case scenario for a large queue of queueing lots of messages all at the same priority (I verified with my customer that this is indeed what their app does), the msg_insert routine is basically a frikkin' bubble sort. I mean, whoa, that's *so* middle school. OK, OK, to not slam the original author too much, I'm sure they didn't envision a queue depth of 50,000+ messages. No one would think that moving elements in an array, one at a time, and dereferencing each pointer in that array to check priority of the message being pointed too, again one at a time, for 50,000+ times would be good. So let's assume that, as is typical, the users have found a way to break our code simply by using it in a way we didn't envision. Fair enough. "So, just how broken is it?", you ask. I wondered the same thing, so I wrote an app to let me know. It's my next patch. It gave me some interesting results. Here's what it tested: Interference with other apps - In continuous mode, the app just sits there and hits a message queue forever, while you go do something productive on another terminal using other CPUs. You then measure how long it takes you to do that something productive. Then you restart the app in fake continuous mode, and it sits in a tight loop on a CPU while you repeat your tests. The whole point of this is to keep one CPU tied up (so it can't be used in your other work) but in one case tied up hitting the mqueue code so we can see the effect of walking that 65,528 element array one pointer at a time on the global CPU cache. If it's bad, then it will slow down your app on the other CPUs just by polluting cache mercilessly. In the fake case, it will be in a tight loop, but not polluting cache. Testing the mqueue subsystem directly - Here we just run a number of tests to see how the mqueue subsystem performs under different conditions. A couple conditions are known to be worst case for the old system, and some routines, so this tests all of them. So, on to the results already: Subsystem/Test Old New Time to compile linux kernel (make -j12 on a 6 core CPU) Running mqueue test user 49m10.744s user 45m26.294s sys 5m51.924s sys 4m59.894s total 55m02.668s total 50m26.188s Running fake test user 45m32.686s user 45m18.552s sys 5m12.465s sys 4m56.468s total 50m45.151s total 50m15.020s % slowdown from mqueue cache thrashing ~8% ~.5% Avg time to send/recv (in nanoseconds per message) when queue empty 305/288 349/318 when queue full (65528 messages) constant priority 526589/823 362/314 increasing priority 403105/916 495/445 decreasing priority 73420/594 482/409 random priority 280147/920 546/436 Time to fill/drain queue (65528 messages, in seconds) constant priority 17.37/.12 .13/.12 increasing priority 4.14/.14 .21/.18 decreasing priority 12.93/.13 .21/.18 random priority 8.88/.16 .22/.17 So, I think the results speak for themselves. It's possible this implementation could be improved by cacheing at least one priority level in the node tree (that would bring the queue empty performance more in line with the old implementation), but this works and is *so* much better than what we had, especially for the common case of a single priority in use, that further refinements can be in follow on patches. [akpm@linux-foundation.org: fix typo in comment, remove stray semicolon] [levinsasha928@gmail.com: use correct gfp flags in msg_insert] Signed-off-by: Doug Ledford Cc: Stephen Rothwell Cc: Manfred Spraul Acked-by: KOSAKI Motohiro Signed-off-by: Sasha Levin Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/mqueue.c | 173 ++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 130 insertions(+), 43 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 609d53f7a91..3c72a05dc32 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -50,6 +50,12 @@ #define STATE_PENDING 1 #define STATE_READY 2 +struct posix_msg_tree_node { + struct rb_node rb_node; + struct list_head msg_list; + int priority; +}; + struct ext_wait_queue { /* queue of sleeping tasks */ struct task_struct *task; struct list_head list; @@ -62,7 +68,7 @@ struct mqueue_inode_info { struct inode vfs_inode; wait_queue_head_t wait_q; - struct msg_msg **messages; + struct rb_root msg_tree; struct mq_attr attr; struct sigevent notify; @@ -110,6 +116,90 @@ static struct ipc_namespace *get_ns_from_inode(struct inode *inode) return ns; } +/* Auxiliary functions to manipulate messages' list */ +static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info) +{ + struct rb_node **p, *parent = NULL; + struct posix_msg_tree_node *leaf; + + p = &info->msg_tree.rb_node; + while (*p) { + parent = *p; + leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node); + + if (likely(leaf->priority == msg->m_type)) + goto insert_msg; + else if (msg->m_type < leaf->priority) + p = &(*p)->rb_left; + else + p = &(*p)->rb_right; + } + leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC); + if (!leaf) + return -ENOMEM; + rb_init_node(&leaf->rb_node); + INIT_LIST_HEAD(&leaf->msg_list); + leaf->priority = msg->m_type; + rb_link_node(&leaf->rb_node, parent, p); + rb_insert_color(&leaf->rb_node, &info->msg_tree); + info->qsize += sizeof(struct posix_msg_tree_node); +insert_msg: + info->attr.mq_curmsgs++; + info->qsize += msg->m_ts; + list_add_tail(&msg->m_list, &leaf->msg_list); + return 0; +} + +static inline struct msg_msg *msg_get(struct mqueue_inode_info *info) +{ + struct rb_node **p, *parent = NULL; + struct posix_msg_tree_node *leaf; + struct msg_msg *msg; + +try_again: + p = &info->msg_tree.rb_node; + while (*p) { + parent = *p; + /* + * During insert, low priorities go to the left and high to the + * right. On receive, we want the highest priorities first, so + * walk all the way to the right. + */ + p = &(*p)->rb_right; + } + if (!parent) { + if (info->attr.mq_curmsgs) { + pr_warn_once("Inconsistency in POSIX message queue, " + "no tree element, but supposedly messages " + "should exist!\n"); + info->attr.mq_curmsgs = 0; + } + return NULL; + } + leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node); + if (list_empty(&leaf->msg_list)) { + pr_warn_once("Inconsistency in POSIX message queue, " + "empty leaf node but we haven't implemented " + "lazy leaf delete!\n"); + rb_erase(&leaf->rb_node, &info->msg_tree); + info->qsize -= sizeof(struct posix_msg_tree_node); + kfree(leaf); + goto try_again; + } else { + msg = list_first_entry(&leaf->msg_list, + struct msg_msg, m_list); + list_del(&msg->m_list); + if (list_empty(&leaf->msg_list)) { + rb_erase(&leaf->rb_node, &info->msg_tree); + info->qsize -= sizeof(struct posix_msg_tree_node); + kfree(leaf); + } + } + info->attr.mq_curmsgs--; + info->qsize -= msg->m_ts; + return msg; +} + static struct inode *mqueue_get_inode(struct super_block *sb, struct ipc_namespace *ipc_ns, umode_t mode, struct mq_attr *attr) @@ -130,7 +220,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb, if (S_ISREG(mode)) { struct mqueue_inode_info *info; - unsigned long mq_bytes, mq_msg_tblsz; + unsigned long mq_bytes, mq_treesize; inode->i_fop = &mqueue_file_operations; inode->i_size = FILENT_SIZE; @@ -144,6 +234,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb, info->notify_user_ns = NULL; info->qsize = 0; info->user = NULL; /* set when all is ok */ + info->msg_tree = RB_ROOT; memset(&info->attr, 0, sizeof(info->attr)); info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max, ipc_ns->mq_msg_default); @@ -153,16 +244,25 @@ static struct inode *mqueue_get_inode(struct super_block *sb, info->attr.mq_maxmsg = attr->mq_maxmsg; info->attr.mq_msgsize = attr->mq_msgsize; } - mq_msg_tblsz = info->attr.mq_maxmsg * sizeof(struct msg_msg *); - if (mq_msg_tblsz > PAGE_SIZE) - info->messages = vmalloc(mq_msg_tblsz); - else - info->messages = kmalloc(mq_msg_tblsz, GFP_KERNEL); - if (!info->messages) - goto out_inode; + /* + * We used to allocate a static array of pointers and account + * the size of that array as well as one msg_msg struct per + * possible message into the queue size. That's no longer + * accurate as the queue is now an rbtree and will grow and + * shrink depending on usage patterns. We can, however, still + * account one msg_msg struct per message, but the nodes are + * allocated depending on priority usage, and most programs + * only use one, or a handful, of priorities. However, since + * this is pinned memory, we need to assume worst case, so + * that means the min(mq_maxmsg, max_priorities) * struct + * posix_msg_tree_node. + */ + mq_treesize = info->attr.mq_maxmsg * sizeof(struct msg_msg) + + min_t(unsigned int, info->attr.mq_maxmsg, MQ_PRIO_MAX) * + sizeof(struct posix_msg_tree_node); - mq_bytes = (mq_msg_tblsz + - (info->attr.mq_maxmsg * info->attr.mq_msgsize)); + mq_bytes = mq_treesize + (info->attr.mq_maxmsg * + info->attr.mq_msgsize); spin_lock(&mq_lock); if (u->mq_bytes + mq_bytes < u->mq_bytes || @@ -253,9 +353,9 @@ static void mqueue_evict_inode(struct inode *inode) { struct mqueue_inode_info *info; struct user_struct *user; - unsigned long mq_bytes; - int i; + unsigned long mq_bytes, mq_treesize; struct ipc_namespace *ipc_ns; + struct msg_msg *msg; clear_inode(inode); @@ -265,17 +365,18 @@ static void mqueue_evict_inode(struct inode *inode) ipc_ns = get_ns_from_inode(inode); info = MQUEUE_I(inode); spin_lock(&info->lock); - for (i = 0; i < info->attr.mq_curmsgs; i++) - free_msg(info->messages[i]); - if (is_vmalloc_addr(info->messages)) - vfree(info->messages); - else - kfree(info->messages); + while ((msg = msg_get(info)) != NULL) + free_msg(msg); spin_unlock(&info->lock); /* Total amount of bytes accounted for the mqueue */ - mq_bytes = info->attr.mq_maxmsg * (sizeof(struct msg_msg *) - + info->attr.mq_msgsize); + mq_treesize = info->attr.mq_maxmsg * sizeof(struct msg_msg) + + min_t(unsigned int, info->attr.mq_maxmsg, MQ_PRIO_MAX) * + sizeof(struct posix_msg_tree_node); + + mq_bytes = mq_treesize + (info->attr.mq_maxmsg * + info->attr.mq_msgsize); + user = info->user; if (user) { spin_lock(&mq_lock); @@ -495,26 +596,6 @@ static struct ext_wait_queue *wq_get_first_waiter( return list_entry(ptr, struct ext_wait_queue, list); } -/* Auxiliary functions to manipulate messages' list */ -static void msg_insert(struct msg_msg *ptr, struct mqueue_inode_info *info) -{ - int k; - - k = info->attr.mq_curmsgs - 1; - while (k >= 0 && info->messages[k]->m_type >= ptr->m_type) { - info->messages[k + 1] = info->messages[k]; - k--; - } - info->attr.mq_curmsgs++; - info->qsize += ptr->m_ts; - info->messages[k + 1] = ptr; -} - -static inline struct msg_msg *msg_get(struct mqueue_inode_info *info) -{ - info->qsize -= info->messages[--info->attr.mq_curmsgs]->m_ts; - return info->messages[info->attr.mq_curmsgs]; -} static inline void set_cookie(struct sk_buff *skb, char code) { @@ -848,7 +929,8 @@ static inline void pipelined_receive(struct mqueue_inode_info *info) wake_up_interruptible(&info->wait_q); return; } - msg_insert(sender->msg, info); + if (msg_insert(sender->msg, info)) + return; list_del(&sender->list); sender->state = STATE_PENDING; wake_up_process(sender->task); @@ -936,7 +1018,12 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, pipelined_send(info, msg_ptr, receiver); } else { /* adds message to the queue */ - msg_insert(msg_ptr, info); + if (msg_insert(msg_ptr, info)) { + free_msg(msg_ptr); + ret = -ENOMEM; + spin_unlock(&info->lock); + goto out_fput; + } __do_notify(info); } inode->i_atime = inode->i_mtime = inode->i_ctime = -- cgit v1.2.3-70-g09d2 From 2c12ea498f349207c28840c0ed9654321aab7720 Mon Sep 17 00:00:00 2001 From: Doug Ledford Date: Thu, 31 May 2012 16:26:36 -0700 Subject: ipc/mqueue: correct mq_attr_ok test While working on the other parts of the mqueue stuff, I noticed that the calculation for overflow in mq_attr_ok didn't actually match reality (this is especially true since my last patch which changed how we account memory slightly). In particular, we used to test for overflow using: msgs * msgsize + msgs * sizeof(struct msg_msg *) That was never really correct because each message we allocate via load_msg() is actually a struct msg_msg followed by the data for the message (and if struct msg_msg + data exceeds PAGE_SIZE we end up allocating struct msg_msgseg structs too, but accounting for them would get really tedious, so let's ignore those...they're only a pointer in size anyway). This patch updates the calculation to be more accurate in regards to maximum possible memory consumption by the mqueue. [akpm@linux-foundation.org: add a local to simplify overflow-checking expression] Signed-off-by: Doug Ledford Cc: Stephen Rothwell Cc: Manfred Spraul Acked-by: KOSAKI Motohiro Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/mqueue.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 3c72a05dc32..076399ce363 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -676,6 +676,9 @@ static void remove_notification(struct mqueue_inode_info *info) static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr) { + int mq_treesize; + unsigned long total_size; + if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) return 0; if (capable(CAP_SYS_RESOURCE)) { @@ -690,9 +693,11 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr) /* check for overflow */ if (attr->mq_msgsize > ULONG_MAX/attr->mq_maxmsg) return 0; - if ((unsigned long)(attr->mq_maxmsg * (attr->mq_msgsize - + sizeof (struct msg_msg *))) < - (unsigned long)(attr->mq_maxmsg * attr->mq_msgsize)) + mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) + + min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) * + sizeof(struct posix_msg_tree_node); + total_size = attr->mq_maxmsg * attr->mq_msgsize; + if (total_size + mq_treesize < total_size) return 0; return 1; } -- cgit v1.2.3-70-g09d2 From 113289cc086f80f28acd06f160a7c6423cdd4191 Mon Sep 17 00:00:00 2001 From: Doug Ledford Date: Thu, 31 May 2012 16:26:36 -0700 Subject: ipc/mqueue: strengthen checks on mqueue creation We already check the mq attr struct if it's passed in, but now that the admin can set system wide defaults separate from maximums, it's actually possible to set the defaults to something that would overflow. So, if there is no attr struct passed in to the open call, check the default values. While we are at it, simplify mq_attr_ok() by making it return 0 or an error condition, so that way if we add more tests to it later, we have the option of what error should be returned instead of the calling location having to pick a possibly inaccurate error code. [akpm@linux-foundation.org: s/ENOMEM/EOVERFLOW/] Signed-off-by: Doug Ledford Cc: Stephen Rothwell Cc: Manfred Spraul Acked-by: KOSAKI Motohiro Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/mqueue.c | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/ipc/mqueue.c b/ipc/mqueue.c index 076399ce363..af1692556c5 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -680,26 +680,26 @@ static int mq_attr_ok(struct ipc_namespace *ipc_ns, struct mq_attr *attr) unsigned long total_size; if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) - return 0; + return -EINVAL; if (capable(CAP_SYS_RESOURCE)) { if (attr->mq_maxmsg > HARD_MSGMAX || attr->mq_msgsize > HARD_MSGSIZEMAX) - return 0; + return -EINVAL; } else { if (attr->mq_maxmsg > ipc_ns->mq_msg_max || attr->mq_msgsize > ipc_ns->mq_msgsize_max) - return 0; + return -EINVAL; } /* check for overflow */ if (attr->mq_msgsize > ULONG_MAX/attr->mq_maxmsg) - return 0; + return -EOVERFLOW; mq_treesize = attr->mq_maxmsg * sizeof(struct msg_msg) + min_t(unsigned int, attr->mq_maxmsg, MQ_PRIO_MAX) * sizeof(struct posix_msg_tree_node); total_size = attr->mq_maxmsg * attr->mq_msgsize; if (total_size + mq_treesize < total_size) - return 0; - return 1; + return -EOVERFLOW; + return 0; } /* @@ -714,12 +714,21 @@ static struct file *do_create(struct ipc_namespace *ipc_ns, struct dentry *dir, int ret; if (attr) { - if (!mq_attr_ok(ipc_ns, attr)) { - ret = -EINVAL; + ret = mq_attr_ok(ipc_ns, attr); + if (ret) goto out; - } /* store for use during create */ dentry->d_fsdata = attr; + } else { + struct mq_attr def_attr; + + def_attr.mq_maxmsg = min(ipc_ns->mq_msg_max, + ipc_ns->mq_msg_default); + def_attr.mq_msgsize = min(ipc_ns->mq_msgsize_max, + ipc_ns->mq_msgsize_default); + ret = mq_attr_ok(ipc_ns, &def_attr); + if (ret) + goto out; } mode &= ~current_umask(); -- cgit v1.2.3-70-g09d2 From ce2d52cc1364a22fc1a161781e60ee3cbb499a6d Mon Sep 17 00:00:00 2001 From: Doug Ledford Date: Thu, 31 May 2012 16:26:38 -0700 Subject: ipc/mqueue: add rbtree node caching support When I wrote the first patch that added the rbtree support for message queue insertion, it sped up the case where the queue was very full drastically from the original code. It, however, slowed down the case where the queue was empty (not drastically though). This patch caches the last freed rbtree node struct so we can quickly reuse it when we get a new message. This is the common path for any queue that very frequently goes from 0 to 1 then back to 0 messages in queue. Andrew Morton didn't like that we were doing a GFP_ATOMIC allocation in msg_insert, so this patch attempts to speculatively allocate a new node struct outside of the spin lock when we know we need it, but will still fall back to a GFP_ATOMIC allocation if it has to. Once I added the caching, the necessary various ret = ; spin_unlock gyrations in mq_timedsend were getting pretty ugly, so this also slightly refactors that function to streamline the flow of the code and the function exit. Finally, while working on getting performance back I made sure that all of the node structs were always fully initialized when they were first used, rendering the use of kzalloc unnecessary and a waste of CPU cycles. The net result of all of this is: 1) We will avoid a GFP_ATOMIC allocation when possible, but fall back on it when necessary. 2) We will speculatively allocate a node struct using GFP_KERNEL if our cache is empty (and save the struct to our cache if it's still empty after we have obtained the spin lock). 3) The performance of the common queue empty case has significantly improved and is now much more in line with the older performance for this case. The performance changes are: Old mqueue new mqueue new mqueue + caching queue empty send/recv 305/288ns 349/318ns 310/322ns I don't think we'll ever be able to get the recv performance back, but that's because the old recv performance was a direct result and consequence of the old methods abysmal send performance. The recv path simply must do more so that the send path does not incur such a penalty under higher queue depths. As it turns out, the new caching code also sped up the various queue full cases relative to my last patch. That could be because of the difference between the syscall path in 3.3.4-rc5 and 3.3.4-rc6, or because of the change in code flow in the mq_timedsend routine. Regardless, I'll take it. It wasn't huge, and I *would* say it was within the margin for error, but after many repeated runs what I'm seeing is that the old numbers trend slightly higher (about 10 to 20ns depending on which test is the one running). [akpm@linux-foundation.org: checkpatch fixes] Signed-off-by: Doug Ledford Cc: Frederic Weisbecker Cc: Manfred Spraul Cc: Stephen Rothwell Cc: KOSAKI Motohiro Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- ipc/mqueue.c | 104 ++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 81 insertions(+), 23 deletions(-) (limited to 'ipc/mqueue.c') diff --git a/ipc/mqueue.c b/ipc/mqueue.c index af1692556c5..8ce57691e7b 100644 --- a/ipc/mqueue.c +++ b/ipc/mqueue.c @@ -69,6 +69,7 @@ struct mqueue_inode_info { wait_queue_head_t wait_q; struct rb_root msg_tree; + struct posix_msg_tree_node *node_cache; struct mq_attr attr; struct sigevent notify; @@ -134,15 +135,20 @@ static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info) else p = &(*p)->rb_right; } - leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC); - if (!leaf) - return -ENOMEM; - rb_init_node(&leaf->rb_node); - INIT_LIST_HEAD(&leaf->msg_list); + if (info->node_cache) { + leaf = info->node_cache; + info->node_cache = NULL; + } else { + leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC); + if (!leaf) + return -ENOMEM; + rb_init_node(&leaf->rb_node); + INIT_LIST_HEAD(&leaf->msg_list); + info->qsize += sizeof(*leaf); + } leaf->priority = msg->m_type; rb_link_node(&leaf->rb_node, parent, p); rb_insert_color(&leaf->rb_node, &info->msg_tree); - info->qsize += sizeof(struct posix_msg_tree_node); insert_msg: info->attr.mq_curmsgs++; info->qsize += msg->m_ts; @@ -177,13 +183,17 @@ try_again: return NULL; } leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node); - if (list_empty(&leaf->msg_list)) { + if (unlikely(list_empty(&leaf->msg_list))) { pr_warn_once("Inconsistency in POSIX message queue, " "empty leaf node but we haven't implemented " "lazy leaf delete!\n"); rb_erase(&leaf->rb_node, &info->msg_tree); - info->qsize -= sizeof(struct posix_msg_tree_node); - kfree(leaf); + if (info->node_cache) { + info->qsize -= sizeof(*leaf); + kfree(leaf); + } else { + info->node_cache = leaf; + } goto try_again; } else { msg = list_first_entry(&leaf->msg_list, @@ -191,8 +201,12 @@ try_again: list_del(&msg->m_list); if (list_empty(&leaf->msg_list)) { rb_erase(&leaf->rb_node, &info->msg_tree); - info->qsize -= sizeof(struct posix_msg_tree_node); - kfree(leaf); + if (info->node_cache) { + info->qsize -= sizeof(*leaf); + kfree(leaf); + } else { + info->node_cache = leaf; + } } } info->attr.mq_curmsgs--; @@ -235,6 +249,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb, info->qsize = 0; info->user = NULL; /* set when all is ok */ info->msg_tree = RB_ROOT; + info->node_cache = NULL; memset(&info->attr, 0, sizeof(info->attr)); info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max, ipc_ns->mq_msg_default); @@ -367,6 +382,7 @@ static void mqueue_evict_inode(struct inode *inode) spin_lock(&info->lock); while ((msg = msg_get(info)) != NULL) free_msg(msg); + kfree(info->node_cache); spin_unlock(&info->lock); /* Total amount of bytes accounted for the mqueue */ @@ -964,7 +980,8 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, struct mqueue_inode_info *info; ktime_t expires, *timeout = NULL; struct timespec ts; - int ret; + struct posix_msg_tree_node *new_leaf = NULL; + int ret = 0; if (u_abs_timeout) { int res = prepare_timeout(u_abs_timeout, &expires, &ts); @@ -1012,39 +1029,60 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, msg_ptr->m_ts = msg_len; msg_ptr->m_type = msg_prio; + /* + * msg_insert really wants us to have a valid, spare node struct so + * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will + * fall back to that if necessary. + */ + if (!info->node_cache) + new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL); + spin_lock(&info->lock); + if (!info->node_cache && new_leaf) { + /* Save our speculative allocation into the cache */ + rb_init_node(&new_leaf->rb_node); + INIT_LIST_HEAD(&new_leaf->msg_list); + info->node_cache = new_leaf; + info->qsize += sizeof(*new_leaf); + new_leaf = NULL; + } else { + kfree(new_leaf); + } + if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) { if (filp->f_flags & O_NONBLOCK) { - spin_unlock(&info->lock); ret = -EAGAIN; } else { wait.task = current; wait.msg = (void *) msg_ptr; wait.state = STATE_NONE; ret = wq_sleep(info, SEND, timeout, &wait); + /* + * wq_sleep must be called with info->lock held, and + * returns with the lock released + */ + goto out_free; } - if (ret < 0) - free_msg(msg_ptr); } else { receiver = wq_get_first_waiter(info, RECV); if (receiver) { pipelined_send(info, msg_ptr, receiver); } else { /* adds message to the queue */ - if (msg_insert(msg_ptr, info)) { - free_msg(msg_ptr); - ret = -ENOMEM; - spin_unlock(&info->lock); - goto out_fput; - } + ret = msg_insert(msg_ptr, info); + if (ret) + goto out_unlock; __do_notify(info); } inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; - spin_unlock(&info->lock); - ret = 0; } +out_unlock: + spin_unlock(&info->lock); +out_free: + if (ret) + free_msg(msg_ptr); out_fput: fput(filp); out: @@ -1063,6 +1101,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr, struct ext_wait_queue wait; ktime_t expires, *timeout = NULL; struct timespec ts; + struct posix_msg_tree_node *new_leaf = NULL; if (u_abs_timeout) { int res = prepare_timeout(u_abs_timeout, &expires, &ts); @@ -1098,7 +1137,26 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr, goto out_fput; } + /* + * msg_insert really wants us to have a valid, spare node struct so + * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will + * fall back to that if necessary. + */ + if (!info->node_cache) + new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL); + spin_lock(&info->lock); + + if (!info->node_cache && new_leaf) { + /* Save our speculative allocation into the cache */ + rb_init_node(&new_leaf->rb_node); + INIT_LIST_HEAD(&new_leaf->msg_list); + info->node_cache = new_leaf; + info->qsize += sizeof(*new_leaf); + } else { + kfree(new_leaf); + } + if (info->attr.mq_curmsgs == 0) { if (filp->f_flags & O_NONBLOCK) { spin_unlock(&info->lock); -- cgit v1.2.3-70-g09d2