diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/clnt.c | 4 | ||||
-rw-r--r-- | net/sunrpc/rpc_pipe.c | 6 | ||||
-rw-r--r-- | net/sunrpc/sunrpc_syms.c | 2 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 457 | ||||
-rw-r--r-- | net/sunrpc/svcauth_unix.c | 5 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 375 |
6 files changed, 721 insertions, 128 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 124ff0ceb55..78696f2dc7d 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -161,10 +161,10 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s } /* save the nodename */ - clnt->cl_nodelen = strlen(system_utsname.nodename); + clnt->cl_nodelen = strlen(utsname()->nodename); if (clnt->cl_nodelen > UNX_MAXNODENAME) clnt->cl_nodelen = UNX_MAXNODENAME; - memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen); + memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen); return clnt; out_no_auth: diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c index 700c6e061a0..9a0b41a97f9 100644 --- a/net/sunrpc/rpc_pipe.c +++ b/net/sunrpc/rpc_pipe.c @@ -494,7 +494,7 @@ rpc_get_inode(struct super_block *sb, int mode) case S_IFDIR: inode->i_fop = &simple_dir_operations; inode->i_op = &simple_dir_inode_operations; - inode->i_nlink++; + inc_nlink(inode); default: break; } @@ -571,7 +571,7 @@ rpc_populate(struct dentry *parent, if (private) rpc_inode_setowner(inode, private); if (S_ISDIR(mode)) - dir->i_nlink++; + inc_nlink(dir); d_add(dentry, inode); } mutex_unlock(&dir->i_mutex); @@ -593,7 +593,7 @@ __rpc_mkdir(struct inode *dir, struct dentry *dentry) goto out_err; inode->i_ino = iunique(dir->i_sb, 100); d_instantiate(dentry, inode); - dir->i_nlink++; + inc_nlink(dir); inode_dir_notify(dir, DN_CREATE); return 0; out_err: diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c index 26c0531d7e2..192dff5dabc 100644 --- a/net/sunrpc/sunrpc_syms.c +++ b/net/sunrpc/sunrpc_syms.c @@ -70,6 +70,8 @@ EXPORT_SYMBOL(put_rpccred); /* RPC server stuff */ EXPORT_SYMBOL(svc_create); EXPORT_SYMBOL(svc_create_thread); +EXPORT_SYMBOL(svc_create_pooled); +EXPORT_SYMBOL(svc_set_num_threads); EXPORT_SYMBOL(svc_exit_thread); EXPORT_SYMBOL(svc_destroy); EXPORT_SYMBOL(svc_drop); diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 44b8d9d4c18..a99e67b164c 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -4,6 +4,10 @@ * High-level RPC service routines * * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de> + * + * Multiple threads pools and NUMAisation + * Copyright (c) 2006 Silicon Graphics, Inc. + * by Greg Banks <gnb@melbourne.sgi.com> */ #include <linux/linkage.h> @@ -12,6 +16,8 @@ #include <linux/net.h> #include <linux/in.h> #include <linux/mm.h> +#include <linux/interrupt.h> +#include <linux/module.h> #include <linux/sunrpc/types.h> #include <linux/sunrpc/xdr.h> @@ -23,14 +29,252 @@ #define RPC_PARANOIA 1 /* + * Mode for mapping cpus to pools. + */ +enum { + SVC_POOL_NONE = -1, /* uninitialised, choose one of the others */ + SVC_POOL_GLOBAL, /* no mapping, just a single global pool + * (legacy & UP mode) */ + SVC_POOL_PERCPU, /* one pool per cpu */ + SVC_POOL_PERNODE /* one pool per numa node */ +}; + +/* + * Structure for mapping cpus to pools and vice versa. + * Setup once during sunrpc initialisation. + */ +static struct svc_pool_map { + int mode; /* Note: int not enum to avoid + * warnings about "enumeration value + * not handled in switch" */ + unsigned int npools; + unsigned int *pool_to; /* maps pool id to cpu or node */ + unsigned int *to_pool; /* maps cpu or node to pool id */ +} svc_pool_map = { + .mode = SVC_POOL_NONE +}; + + +/* + * Detect best pool mapping mode heuristically, + * according to the machine's topology. + */ +static int +svc_pool_map_choose_mode(void) +{ + unsigned int node; + + if (num_online_nodes() > 1) { + /* + * Actually have multiple NUMA nodes, + * so split pools on NUMA node boundaries + */ + return SVC_POOL_PERNODE; + } + + node = any_online_node(node_online_map); + if (nr_cpus_node(node) > 2) { + /* + * Non-trivial SMP, or CONFIG_NUMA on + * non-NUMA hardware, e.g. with a generic + * x86_64 kernel on Xeons. In this case we + * want to divide the pools on cpu boundaries. + */ + return SVC_POOL_PERCPU; + } + + /* default: one global pool */ + return SVC_POOL_GLOBAL; +} + +/* + * Allocate the to_pool[] and pool_to[] arrays. + * Returns 0 on success or an errno. + */ +static int +svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools) +{ + m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL); + if (!m->to_pool) + goto fail; + m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL); + if (!m->pool_to) + goto fail_free; + + return 0; + +fail_free: + kfree(m->to_pool); +fail: + return -ENOMEM; +} + +/* + * Initialise the pool map for SVC_POOL_PERCPU mode. + * Returns number of pools or <0 on error. + */ +static int +svc_pool_map_init_percpu(struct svc_pool_map *m) +{ + unsigned int maxpools = highest_possible_processor_id()+1; + unsigned int pidx = 0; + unsigned int cpu; + int err; + + err = svc_pool_map_alloc_arrays(m, maxpools); + if (err) + return err; + + for_each_online_cpu(cpu) { + BUG_ON(pidx > maxpools); + m->to_pool[cpu] = pidx; + m->pool_to[pidx] = cpu; + pidx++; + } + /* cpus brought online later all get mapped to pool0, sorry */ + + return pidx; +}; + + +/* + * Initialise the pool map for SVC_POOL_PERNODE mode. + * Returns number of pools or <0 on error. + */ +static int +svc_pool_map_init_pernode(struct svc_pool_map *m) +{ + unsigned int maxpools = highest_possible_node_id()+1; + unsigned int pidx = 0; + unsigned int node; + int err; + + err = svc_pool_map_alloc_arrays(m, maxpools); + if (err) + return err; + + for_each_node_with_cpus(node) { + /* some architectures (e.g. SN2) have cpuless nodes */ + BUG_ON(pidx > maxpools); + m->to_pool[node] = pidx; + m->pool_to[pidx] = node; + pidx++; + } + /* nodes brought online later all get mapped to pool0, sorry */ + + return pidx; +} + + +/* + * Build the global map of cpus to pools and vice versa. + */ +static unsigned int +svc_pool_map_init(void) +{ + struct svc_pool_map *m = &svc_pool_map; + int npools = -1; + + if (m->mode != SVC_POOL_NONE) + return m->npools; + + m->mode = svc_pool_map_choose_mode(); + + switch (m->mode) { + case SVC_POOL_PERCPU: + npools = svc_pool_map_init_percpu(m); + break; + case SVC_POOL_PERNODE: + npools = svc_pool_map_init_pernode(m); + break; + } + + if (npools < 0) { + /* default, or memory allocation failure */ + npools = 1; + m->mode = SVC_POOL_GLOBAL; + } + m->npools = npools; + + return m->npools; +} + +/* + * Set the current thread's cpus_allowed mask so that it + * will only run on cpus in the given pool. + * + * Returns 1 and fills in oldmask iff a cpumask was applied. + */ +static inline int +svc_pool_map_set_cpumask(unsigned int pidx, cpumask_t *oldmask) +{ + struct svc_pool_map *m = &svc_pool_map; + unsigned int node; /* or cpu */ + + /* + * The caller checks for sv_nrpools > 1, which + * implies that we've been initialized and the + * map mode is not NONE. + */ + BUG_ON(m->mode == SVC_POOL_NONE); + + switch (m->mode) + { + default: + return 0; + case SVC_POOL_PERCPU: + node = m->pool_to[pidx]; + *oldmask = current->cpus_allowed; + set_cpus_allowed(current, cpumask_of_cpu(node)); + return 1; + case SVC_POOL_PERNODE: + node = m->pool_to[pidx]; + *oldmask = current->cpus_allowed; + set_cpus_allowed(current, node_to_cpumask(node)); + return 1; + } +} + +/* + * Use the mapping mode to choose a pool for a given CPU. + * Used when enqueueing an incoming RPC. Always returns + * a non-NULL pool pointer. + */ +struct svc_pool * +svc_pool_for_cpu(struct svc_serv *serv, int cpu) +{ + struct svc_pool_map *m = &svc_pool_map; + unsigned int pidx = 0; + + /* + * SVC_POOL_NONE happens in a pure client when + * lockd is brought up, so silently treat it the + * same as SVC_POOL_GLOBAL. + */ + + switch (m->mode) { + case SVC_POOL_PERCPU: + pidx = m->to_pool[cpu]; + break; + case SVC_POOL_PERNODE: + pidx = m->to_pool[cpu_to_node(cpu)]; + break; + } + return &serv->sv_pools[pidx % serv->sv_nrpools]; +} + + +/* * Create an RPC service */ -struct svc_serv * -svc_create(struct svc_program *prog, unsigned int bufsize) +static struct svc_serv * +__svc_create(struct svc_program *prog, unsigned int bufsize, int npools, + void (*shutdown)(struct svc_serv *serv)) { struct svc_serv *serv; int vers; unsigned int xdrsize; + unsigned int i; if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL))) return NULL; @@ -39,6 +283,7 @@ svc_create(struct svc_program *prog, unsigned int bufsize) serv->sv_nrthreads = 1; serv->sv_stats = prog->pg_stats; serv->sv_bufsz = bufsize? bufsize : 4096; + serv->sv_shutdown = shutdown; xdrsize = 0; while (prog) { prog->pg_lovers = prog->pg_nvers-1; @@ -53,20 +298,68 @@ svc_create(struct svc_program *prog, unsigned int bufsize) prog = prog->pg_next; } serv->sv_xdrsize = xdrsize; - INIT_LIST_HEAD(&serv->sv_threads); - INIT_LIST_HEAD(&serv->sv_sockets); INIT_LIST_HEAD(&serv->sv_tempsocks); INIT_LIST_HEAD(&serv->sv_permsocks); + init_timer(&serv->sv_temptimer); spin_lock_init(&serv->sv_lock); + serv->sv_nrpools = npools; + serv->sv_pools = + kcalloc(sizeof(struct svc_pool), serv->sv_nrpools, + GFP_KERNEL); + if (!serv->sv_pools) { + kfree(serv); + return NULL; + } + + for (i = 0; i < serv->sv_nrpools; i++) { + struct svc_pool *pool = &serv->sv_pools[i]; + + dprintk("initialising pool %u for %s\n", + i, serv->sv_name); + + pool->sp_id = i; + INIT_LIST_HEAD(&pool->sp_threads); + INIT_LIST_HEAD(&pool->sp_sockets); + INIT_LIST_HEAD(&pool->sp_all_threads); + spin_lock_init(&pool->sp_lock); + } + + /* Remove any stale portmap registrations */ svc_register(serv, 0, 0); return serv; } +struct svc_serv * +svc_create(struct svc_program *prog, unsigned int bufsize, + void (*shutdown)(struct svc_serv *serv)) +{ + return __svc_create(prog, bufsize, /*npools*/1, shutdown); +} + +struct svc_serv * +svc_create_pooled(struct svc_program *prog, unsigned int bufsize, + void (*shutdown)(struct svc_serv *serv), + svc_thread_fn func, int sig, struct module *mod) +{ + struct svc_serv *serv; + unsigned int npools = svc_pool_map_init(); + + serv = __svc_create(prog, bufsize, npools, shutdown); + + if (serv != NULL) { + serv->sv_function = func; + serv->sv_kill_signal = sig; + serv->sv_module = mod; + } + + return serv; +} + /* - * Destroy an RPC service + * Destroy an RPC service. Should be called with the BKL held */ void svc_destroy(struct svc_serv *serv) @@ -85,12 +378,17 @@ svc_destroy(struct svc_serv *serv) } else printk("svc_destroy: no threads for serv=%p!\n", serv); + del_timer_sync(&serv->sv_temptimer); + while (!list_empty(&serv->sv_tempsocks)) { svsk = list_entry(serv->sv_tempsocks.next, struct svc_sock, sk_list); svc_delete_socket(svsk); } + if (serv->sv_shutdown) + serv->sv_shutdown(serv); + while (!list_empty(&serv->sv_permsocks)) { svsk = list_entry(serv->sv_permsocks.next, struct svc_sock, @@ -102,6 +400,7 @@ svc_destroy(struct svc_serv *serv) /* Unregister service with the portmapper */ svc_register(serv, 0, 0); + kfree(serv->sv_pools); kfree(serv); } @@ -150,13 +449,18 @@ svc_release_buffer(struct svc_rqst *rqstp) } /* - * Create a server thread + * Create a thread in the given pool. Caller must hold BKL. + * On a NUMA or SMP machine, with a multi-pool serv, the thread + * will be restricted to run on the cpus belonging to the pool. */ -int -svc_create_thread(svc_thread_fn func, struct svc_serv *serv) +static int +__svc_create_thread(svc_thread_fn func, struct svc_serv *serv, + struct svc_pool *pool) { struct svc_rqst *rqstp; int error = -ENOMEM; + int have_oldmask = 0; + cpumask_t oldmask; rqstp = kzalloc(sizeof(*rqstp), GFP_KERNEL); if (!rqstp) @@ -170,8 +474,21 @@ svc_create_thread(svc_thread_fn func, struct svc_serv *serv) goto out_thread; serv->sv_nrthreads++; + spin_lock_bh(&pool->sp_lock); + pool->sp_nrthreads++; + list_add(&rqstp->rq_all, &pool->sp_all_threads); + spin_unlock_bh(&pool->sp_lock); rqstp->rq_server = serv; + rqstp->rq_pool = pool; + + if (serv->sv_nrpools > 1) + have_oldmask = svc_pool_map_set_cpumask(pool->sp_id, &oldmask); + error = kernel_thread((int (*)(void *)) func, rqstp, 0); + + if (have_oldmask) + set_cpus_allowed(current, oldmask); + if (error < 0) goto out_thread; svc_sock_update_bufs(serv); @@ -185,17 +502,136 @@ out_thread: } /* - * Destroy an RPC server thread + * Create a thread in the default pool. Caller must hold BKL. + */ +int +svc_create_thread(svc_thread_fn func, struct svc_serv *serv) +{ + return __svc_create_thread(func, serv, &serv->sv_pools[0]); +} + +/* + * Choose a pool in which to create a new thread, for svc_set_num_threads + */ +static inline struct svc_pool * +choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state) +{ + if (pool != NULL) + return pool; + + return &serv->sv_pools[(*state)++ % serv->sv_nrpools]; +} + +/* + * Choose a thread to kill, for svc_set_num_threads + */ +static inline struct task_struct * +choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state) +{ + unsigned int i; + struct task_struct *task = NULL; + + if (pool != NULL) { + spin_lock_bh(&pool->sp_lock); + } else { + /* choose a pool in round-robin fashion */ + for (i = 0; i < serv->sv_nrpools; i++) { + pool = &serv->sv_pools[--(*state) % serv->sv_nrpools]; + spin_lock_bh(&pool->sp_lock); + if (!list_empty(&pool->sp_all_threads)) + goto found_pool; + spin_unlock_bh(&pool->sp_lock); + } + return NULL; + } + +found_pool: + if (!list_empty(&pool->sp_all_threads)) { + struct svc_rqst *rqstp; + + /* + * Remove from the pool->sp_all_threads list + * so we don't try to kill it again. + */ + rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all); + list_del_init(&rqstp->rq_all); + task = rqstp->rq_task; + } + spin_unlock_bh(&pool->sp_lock); + + return task; +} + +/* + * Create or destroy enough new threads to make the number + * of threads the given number. If `pool' is non-NULL, applies + * only to threads in that pool, otherwise round-robins between + * all pools. Must be called with a svc_get() reference and + * the BKL held. + * + * Destroying threads relies on the service threads filling in + * rqstp->rq_task, which only the nfs ones do. Assumes the serv + * has been created using svc_create_pooled(). + * + * Based on code that used to be in nfsd_svc() but tweaked + * to be pool-aware. + */ +int +svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + struct task_struct *victim; + int error = 0; + unsigned int state = serv->sv_nrthreads-1; + + if (pool == NULL) { + /* The -1 assumes caller has done a svc_get() */ + nrservs -= (serv->sv_nrthreads-1); + } else { + spin_lock_bh(&pool->sp_lock); + nrservs -= pool->sp_nrthreads; + spin_unlock_bh(&pool->sp_lock); + } + + /* create new threads */ + while (nrservs > 0) { + nrservs--; + __module_get(serv->sv_module); + error = __svc_create_thread(serv->sv_function, serv, + choose_pool(serv, pool, &state)); + if (error < 0) { + module_put(serv->sv_module); + break; + } + } + /* destroy old threads */ + while (nrservs < 0 && + (victim = choose_victim(serv, pool, &state)) != NULL) { + send_sig(serv->sv_kill_signal, victim, 1); + nrservs++; + } + + return error; +} + +/* + * Called from a server thread as it's exiting. Caller must hold BKL. */ void svc_exit_thread(struct svc_rqst *rqstp) { struct svc_serv *serv = rqstp->rq_server; + struct svc_pool *pool = rqstp->rq_pool; svc_release_buffer(rqstp); kfree(rqstp->rq_resp); kfree(rqstp->rq_argp); kfree(rqstp->rq_auth_data); + + spin_lock_bh(&pool->sp_lock); + pool->sp_nrthreads--; + list_del(&rqstp->rq_all); + spin_unlock_bh(&pool->sp_lock); + kfree(rqstp); /* Release the server */ @@ -248,13 +684,14 @@ svc_register(struct svc_serv *serv, int proto, unsigned short port) * Process the RPC request. */ int -svc_process(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_process(struct svc_rqst *rqstp) { struct svc_program *progp; struct svc_version *versp = NULL; /* compiler food */ struct svc_procedure *procp = NULL; struct kvec * argv = &rqstp->rq_arg.head[0]; struct kvec * resv = &rqstp->rq_res.head[0]; + struct svc_serv *serv = rqstp->rq_server; kxdrproc_t xdr; __be32 *statp; u32 dir, prog, vers, proc; diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c index 1020d54b01d..40d41a2831d 100644 --- a/net/sunrpc/svcauth_unix.c +++ b/net/sunrpc/svcauth_unix.c @@ -348,12 +348,9 @@ int auth_unix_forget_old(struct auth_domain *dom) struct auth_domain *auth_unix_lookup(struct in_addr addr) { - struct ip_map key, *ipm; + struct ip_map *ipm; struct auth_domain *rv; - strcpy(key.m_class, "nfsd"); - key.m_addr = addr; - ipm = ip_map_lookup("nfsd", addr); if (!ipm) diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 5b0fe1b66a2..cba85d19522 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -31,6 +31,7 @@ #include <linux/slab.h> #include <linux/netdevice.h> #include <linux/skbuff.h> +#include <linux/file.h> #include <net/sock.h> #include <net/checksum.h> #include <net/ip.h> @@ -45,13 +46,16 @@ /* SMP locking strategy: * - * svc_serv->sv_lock protects most stuff for that service. + * svc_pool->sp_lock protects most of the fields of that pool. + * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt. + * when both need to be taken (rare), svc_serv->sv_lock is first. + * BKL protects svc_serv->sv_nrthread. + * svc_sock->sk_defer_lock protects the svc_sock->sk_deferred list + * svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply. * * Some flags can be set to certain values at any time * providing that certain rules are followed: * - * SK_BUSY can be set to 0 at any time. - * svc_sock_enqueue must be called afterwards * SK_CONN, SK_DATA, can be set or cleared at any time. * after a set, svc_sock_enqueue must be called. * after a clear, the socket must be read/accepted @@ -73,23 +77,30 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk); static int svc_deferred_recv(struct svc_rqst *rqstp); static struct cache_deferred_req *svc_defer(struct cache_req *req); +/* apparently the "standard" is that clients close + * idle connections after 5 minutes, servers after + * 6 minutes + * http://www.connectathon.org/talks96/nfstcp.pdf + */ +static int svc_conn_age_period = 6*60; + /* - * Queue up an idle server thread. Must have serv->sv_lock held. + * Queue up an idle server thread. Must have pool->sp_lock held. * Note: this is really a stack rather than a queue, so that we only - * use as many different threads as we need, and the rest don't polute + * use as many different threads as we need, and the rest don't pollute * the cache. */ static inline void -svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp) { - list_add(&rqstp->rq_list, &serv->sv_threads); + list_add(&rqstp->rq_list, &pool->sp_threads); } /* - * Dequeue an nfsd thread. Must have serv->sv_lock held. + * Dequeue an nfsd thread. Must have pool->sp_lock held. */ static inline void -svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp) { list_del(&rqstp->rq_list); } @@ -140,7 +151,9 @@ static void svc_sock_enqueue(struct svc_sock *svsk) { struct svc_serv *serv = svsk->sk_server; + struct svc_pool *pool; struct svc_rqst *rqstp; + int cpu; if (!(svsk->sk_flags & ( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)|(1<<SK_DEFERRED)) )) @@ -148,10 +161,14 @@ svc_sock_enqueue(struct svc_sock *svsk) if (test_bit(SK_DEAD, &svsk->sk_flags)) return; - spin_lock_bh(&serv->sv_lock); + cpu = get_cpu(); + pool = svc_pool_for_cpu(svsk->sk_server, cpu); + put_cpu(); - if (!list_empty(&serv->sv_threads) && - !list_empty(&serv->sv_sockets)) + spin_lock_bh(&pool->sp_lock); + + if (!list_empty(&pool->sp_threads) && + !list_empty(&pool->sp_sockets)) printk(KERN_ERR "svc_sock_enqueue: threads and sockets both waiting??\n"); @@ -161,73 +178,79 @@ svc_sock_enqueue(struct svc_sock *svsk) goto out_unlock; } - if (test_bit(SK_BUSY, &svsk->sk_flags)) { - /* Don't enqueue socket while daemon is receiving */ + /* Mark socket as busy. It will remain in this state until the + * server has processed all pending data and put the socket back + * on the idle list. We update SK_BUSY atomically because + * it also guards against trying to enqueue the svc_sock twice. + */ + if (test_and_set_bit(SK_BUSY, &svsk->sk_flags)) { + /* Don't enqueue socket while already enqueued */ dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk); goto out_unlock; } + BUG_ON(svsk->sk_pool != NULL); + svsk->sk_pool = pool; set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); - if (((svsk->sk_reserved + serv->sv_bufsz)*2 + if (((atomic_read(&svsk->sk_reserved) + serv->sv_bufsz)*2 > svc_sock_wspace(svsk)) && !test_bit(SK_CLOSE, &svsk->sk_flags) && !test_bit(SK_CONN, &svsk->sk_flags)) { /* Don't enqueue while not enough space for reply */ dprintk("svc: socket %p no space, %d*2 > %ld, not enqueued\n", - svsk->sk_sk, svsk->sk_reserved+serv->sv_bufsz, + svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_bufsz, svc_sock_wspace(svsk)); + svsk->sk_pool = NULL; + clear_bit(SK_BUSY, &svsk->sk_flags); goto out_unlock; } clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); - /* Mark socket as busy. It will remain in this state until the - * server has processed all pending data and put the socket back - * on the idle list. - */ - set_bit(SK_BUSY, &svsk->sk_flags); - if (!list_empty(&serv->sv_threads)) { - rqstp = list_entry(serv->sv_threads.next, + if (!list_empty(&pool->sp_threads)) { + rqstp = list_entry(pool->sp_threads.next, struct svc_rqst, rq_list); dprintk("svc: socket %p served by daemon %p\n", svsk->sk_sk, rqstp); - svc_serv_dequeue(serv, rqstp); + svc_thread_dequeue(pool, rqstp); if (rqstp->rq_sock) printk(KERN_ERR "svc_sock_enqueue: server %p, rq_sock=%p!\n", rqstp, rqstp->rq_sock); rqstp->rq_sock = svsk; - svsk->sk_inuse++; + atomic_inc(&svsk->sk_inuse); rqstp->rq_reserved = serv->sv_bufsz; - svsk->sk_reserved += rqstp->rq_reserved; + atomic_add(rqstp->rq_reserved, &svsk->sk_reserved); + BUG_ON(svsk->sk_pool != pool); wake_up(&rqstp->rq_wait); } else { dprintk("svc: socket %p put into queue\n", svsk->sk_sk); - list_add_tail(&svsk->sk_ready, &serv->sv_sockets); + list_add_tail(&svsk->sk_ready, &pool->sp_sockets); + BUG_ON(svsk->sk_pool != pool); } out_unlock: - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); } /* - * Dequeue the first socket. Must be called with the serv->sv_lock held. + * Dequeue the first socket. Must be called with the pool->sp_lock held. */ static inline struct svc_sock * -svc_sock_dequeue(struct svc_serv *serv) +svc_sock_dequeue(struct svc_pool *pool) { struct svc_sock *svsk; - if (list_empty(&serv->sv_sockets)) + if (list_empty(&pool->sp_sockets)) return NULL; - svsk = list_entry(serv->sv_sockets.next, + svsk = list_entry(pool->sp_sockets.next, struct svc_sock, sk_ready); list_del_init(&svsk->sk_ready); dprintk("svc: socket %p dequeued, inuse=%d\n", - svsk->sk_sk, svsk->sk_inuse); + svsk->sk_sk, atomic_read(&svsk->sk_inuse)); return svsk; } @@ -241,6 +264,7 @@ svc_sock_dequeue(struct svc_serv *serv) static inline void svc_sock_received(struct svc_sock *svsk) { + svsk->sk_pool = NULL; clear_bit(SK_BUSY, &svsk->sk_flags); svc_sock_enqueue(svsk); } @@ -262,10 +286,8 @@ void svc_reserve(struct svc_rqst *rqstp, int space) if (space < rqstp->rq_reserved) { struct svc_sock *svsk = rqstp->rq_sock; - spin_lock_bh(&svsk->sk_server->sv_lock); - svsk->sk_reserved -= (rqstp->rq_reserved - space); + atomic_sub((rqstp->rq_reserved - space), &svsk->sk_reserved); rqstp->rq_reserved = space; - spin_unlock_bh(&svsk->sk_server->sv_lock); svc_sock_enqueue(svsk); } @@ -277,17 +299,11 @@ void svc_reserve(struct svc_rqst *rqstp, int space) static inline void svc_sock_put(struct svc_sock *svsk) { - struct svc_serv *serv = svsk->sk_server; - - spin_lock_bh(&serv->sv_lock); - if (!--(svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) { - spin_unlock_bh(&serv->sv_lock); + if (atomic_dec_and_test(&svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) { dprintk("svc: releasing dead socket\n"); sock_release(svsk->sk_sock); kfree(svsk); } - else - spin_unlock_bh(&serv->sv_lock); } static void @@ -321,25 +337,33 @@ svc_sock_release(struct svc_rqst *rqstp) /* * External function to wake up a server waiting for data + * This really only makes sense for services like lockd + * which have exactly one thread anyway. */ void svc_wake_up(struct svc_serv *serv) { struct svc_rqst *rqstp; - - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&serv->sv_threads)) { - rqstp = list_entry(serv->sv_threads.next, - struct svc_rqst, - rq_list); - dprintk("svc: daemon %p woken up.\n", rqstp); - /* - svc_serv_dequeue(serv, rqstp); - rqstp->rq_sock = NULL; - */ - wake_up(&rqstp->rq_wait); + unsigned int i; + struct svc_pool *pool; + + for (i = 0; i < serv->sv_nrpools; i++) { + pool = &serv->sv_pools[i]; + + spin_lock_bh(&pool->sp_lock); + if (!list_empty(&pool->sp_threads)) { + rqstp = list_entry(pool->sp_threads.next, + struct svc_rqst, + rq_list); + dprintk("svc: daemon %p woken up.\n", rqstp); + /* + svc_thread_dequeue(pool, rqstp); + rqstp->rq_sock = NULL; + */ + wake_up(&rqstp->rq_wait); + } + spin_unlock_bh(&pool->sp_lock); } - spin_unlock_bh(&serv->sv_lock); } /* @@ -429,6 +453,51 @@ out: } /* + * Report socket names for nfsdfs + */ +static int one_sock_name(char *buf, struct svc_sock *svsk) +{ + int len; + + switch(svsk->sk_sk->sk_family) { + case AF_INET: + len = sprintf(buf, "ipv4 %s %u.%u.%u.%u %d\n", + svsk->sk_sk->sk_protocol==IPPROTO_UDP? + "udp" : "tcp", + NIPQUAD(inet_sk(svsk->sk_sk)->rcv_saddr), + inet_sk(svsk->sk_sk)->num); + break; + default: + len = sprintf(buf, "*unknown-%d*\n", + svsk->sk_sk->sk_family); + } + return len; +} + +int +svc_sock_names(char *buf, struct svc_serv *serv, char *toclose) +{ + struct svc_sock *svsk, *closesk = NULL; + int len = 0; + + if (!serv) + return 0; + spin_lock(&serv->sv_lock); + list_for_each_entry(svsk, &serv->sv_permsocks, sk_list) { + int onelen = one_sock_name(buf+len, svsk); + if (toclose && strcmp(toclose, buf+len) == 0) + closesk = svsk; + else + len += onelen; + } + spin_unlock(&serv->sv_lock); + if (closesk) + svc_delete_socket(closesk); + return len; +} +EXPORT_SYMBOL(svc_sock_names); + +/* * Check input queue length */ static int @@ -557,7 +626,10 @@ svc_udp_recvfrom(struct svc_rqst *rqstp) /* udp sockets need large rcvbuf as all pending * requests are still in that buffer. sndbuf must * also be large enough that there is enough space - * for one reply per thread. + * for one reply per thread. We count all threads + * rather than threads in a particular pool, which + * provides an upper bound on the number of threads + * which will access the socket. */ svc_sock_setbufsize(svsk->sk_sock, (serv->sv_nrthreads+3) * serv->sv_bufsz, @@ -844,7 +916,7 @@ svc_tcp_accept(struct svc_sock *svsk) struct svc_sock, sk_list); set_bit(SK_CLOSE, &svsk->sk_flags); - svsk->sk_inuse ++; + atomic_inc(&svsk->sk_inuse); } spin_unlock_bh(&serv->sv_lock); @@ -902,6 +974,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) /* sndbuf needs to have room for one request * per thread, otherwise we can stall even when the * network isn't a bottleneck. + * + * We count all threads rather than threads in a + * particular pool, which provides an upper bound + * on the number of threads which will access the socket. + * * rcvbuf just needs to be able to hold a few requests. * Normally they will be removed from the queue * as soon a a complete request arrives. @@ -1117,12 +1194,16 @@ svc_sock_update_bufs(struct svc_serv *serv) } /* - * Receive the next request on any socket. + * Receive the next request on any socket. This code is carefully + * organised not to touch any cachelines in the shared svc_serv + * structure, only cachelines in the local svc_pool. */ int -svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) +svc_recv(struct svc_rqst *rqstp, long timeout) { struct svc_sock *svsk =NULL; + struct svc_serv *serv = rqstp->rq_server; + struct svc_pool *pool = rqstp->rq_pool; int len; int pages; struct xdr_buf *arg; @@ -1172,32 +1253,15 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) if (signalled()) return -EINTR; - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&serv->sv_tempsocks)) { - svsk = list_entry(serv->sv_tempsocks.next, - struct svc_sock, sk_list); - /* apparently the "standard" is that clients close - * idle connections after 5 minutes, servers after - * 6 minutes - * http://www.connectathon.org/talks96/nfstcp.pdf - */ - if (get_seconds() - svsk->sk_lastrecv < 6*60 - || test_bit(SK_BUSY, &svsk->sk_flags)) - svsk = NULL; - } - if (svsk) { - set_bit(SK_BUSY, &svsk->sk_flags); - set_bit(SK_CLOSE, &svsk->sk_flags); - rqstp->rq_sock = svsk; - svsk->sk_inuse++; - } else if ((svsk = svc_sock_dequeue(serv)) != NULL) { + spin_lock_bh(&pool->sp_lock); + if ((svsk = svc_sock_dequeue(pool)) != NULL) { rqstp->rq_sock = svsk; - svsk->sk_inuse++; + atomic_inc(&svsk->sk_inuse); rqstp->rq_reserved = serv->sv_bufsz; - svsk->sk_reserved += rqstp->rq_reserved; + atomic_add(rqstp->rq_reserved, &svsk->sk_reserved); } else { /* No data pending. Go to sleep */ - svc_serv_enqueue(serv, rqstp); + svc_thread_enqueue(pool, rqstp); /* * We have to be able to interrupt this wait @@ -1205,26 +1269,26 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) */ set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&rqstp->rq_wait, &wait); - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); schedule_timeout(timeout); try_to_freeze(); - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&pool->sp_lock); remove_wait_queue(&rqstp->rq_wait, &wait); if (!(svsk = rqstp->rq_sock)) { - svc_serv_dequeue(serv, rqstp); - spin_unlock_bh(&serv->sv_lock); + svc_thread_dequeue(pool, rqstp); + spin_unlock_bh(&pool->sp_lock); dprintk("svc: server %p, no data yet\n", rqstp); return signalled()? -EINTR : -EAGAIN; } } - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); - dprintk("svc: server %p, socket %p, inuse=%d\n", - rqstp, svsk, svsk->sk_inuse); + dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n", + rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse)); len = svsk->sk_recvfrom(rqstp); dprintk("svc: got len=%d\n", len); @@ -1235,13 +1299,7 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) return -EAGAIN; } svsk->sk_lastrecv = get_seconds(); - if (test_bit(SK_TEMP, &svsk->sk_flags)) { - /* push active sockets to end of list */ - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&svsk->sk_list)) - list_move_tail(&svsk->sk_list, &serv->sv_tempsocks); - spin_unlock_bh(&serv->sv_lock); - } + clear_bit(SK_OLD, &svsk->sk_flags); rqstp->rq_secure = ntohs(rqstp->rq_addr.sin_port) < 1024; rqstp->rq_chandle.defer = svc_defer; @@ -1301,6 +1359,58 @@ svc_send(struct svc_rqst *rqstp) } /* + * Timer function to close old temporary sockets, using + * a mark-and-sweep algorithm. + */ +static void +svc_age_temp_sockets(unsigned long closure) +{ + struct svc_serv *serv = (struct svc_serv *)closure; + struct svc_sock *svsk; + struct list_head *le, *next; + LIST_HEAD(to_be_aged); + + dprintk("svc_age_temp_sockets\n"); + + if (!spin_trylock_bh(&serv->sv_lock)) { + /* busy, try again 1 sec later */ + dprintk("svc_age_temp_sockets: busy\n"); + mod_timer(&serv->sv_temptimer, jiffies + HZ); + return; + } + + list_for_each_safe(le, next, &serv->sv_tempsocks) { + svsk = list_entry(le, struct svc_sock, sk_list); + + if (!test_and_set_bit(SK_OLD, &svsk->sk_flags)) + continue; + if (atomic_read(&svsk->sk_inuse) || test_bit(SK_BUSY, &svsk->sk_flags)) + continue; + atomic_inc(&svsk->sk_inuse); + list_move(le, &to_be_aged); + set_bit(SK_CLOSE, &svsk->sk_flags); + set_bit(SK_DETACHED, &svsk->sk_flags); + } + spin_unlock_bh(&serv->sv_lock); + + while (!list_empty(&to_be_aged)) { + le = to_be_aged.next; + /* fiddling the sk_list node is safe 'cos we're SK_DETACHED */ + list_del_init(le); + svsk = list_entry(le, struct svc_sock, sk_list); + + dprintk("queuing svsk %p for closing, %lu seconds old\n", + svsk, get_seconds() - svsk->sk_lastrecv); + + /* a thread will dequeue and close it soon */ + svc_sock_enqueue(svsk); + svc_sock_put(svsk); + } + + mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ); +} + +/* * Initialize socket for RPC use and create svc_sock struct * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF. */ @@ -1337,7 +1447,9 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, svsk->sk_odata = inet->sk_data_ready; svsk->sk_owspace = inet->sk_write_space; svsk->sk_server = serv; + atomic_set(&svsk->sk_inuse, 0); svsk->sk_lastrecv = get_seconds(); + spin_lock_init(&svsk->sk_defer_lock); INIT_LIST_HEAD(&svsk->sk_deferred); INIT_LIST_HEAD(&svsk->sk_ready); mutex_init(&svsk->sk_mutex); @@ -1353,6 +1465,13 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, set_bit(SK_TEMP, &svsk->sk_flags); list_add(&svsk->sk_list, &serv->sv_tempsocks); serv->sv_tmpcnt++; + if (serv->sv_temptimer.function == NULL) { + /* setup timer to age temp sockets */ + setup_timer(&serv->sv_temptimer, svc_age_temp_sockets, + (unsigned long)serv); + mod_timer(&serv->sv_temptimer, + jiffies + svc_conn_age_period * HZ); + } } else { clear_bit(SK_TEMP, &svsk->sk_flags); list_add(&svsk->sk_list, &serv->sv_permsocks); @@ -1367,6 +1486,38 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, return svsk; } +int svc_addsock(struct svc_serv *serv, + int fd, + char *name_return, + int *proto) +{ + int err = 0; + struct socket *so = sockfd_lookup(fd, &err); + struct svc_sock *svsk = NULL; + + if (!so) + return err; + if (so->sk->sk_family != AF_INET) + err = -EAFNOSUPPORT; + else if (so->sk->sk_protocol != IPPROTO_TCP && + so->sk->sk_protocol != IPPROTO_UDP) + err = -EPROTONOSUPPORT; + else if (so->state > SS_UNCONNECTED) + err = -EISCONN; + else { + svsk = svc_setup_socket(serv, so, &err, 1); + if (svsk) + err = 0; + } + if (err) { + sockfd_put(so); + return err; + } + if (proto) *proto = so->sk->sk_protocol; + return one_sock_name(name_return, svsk); +} +EXPORT_SYMBOL_GPL(svc_addsock); + /* * Create socket for RPC service. */ @@ -1434,15 +1585,25 @@ svc_delete_socket(struct svc_sock *svsk) spin_lock_bh(&serv->sv_lock); - list_del_init(&svsk->sk_list); - list_del_init(&svsk->sk_ready); + if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags)) + list_del_init(&svsk->sk_list); + /* + * We used to delete the svc_sock from whichever list + * it's sk_ready node was on, but we don't actually + * need to. This is because the only time we're called + * while still attached to a queue, the queue itself + * is about to be destroyed (in svc_destroy). + */ if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags)) if (test_bit(SK_TEMP, &svsk->sk_flags)) serv->sv_tmpcnt--; - if (!svsk->sk_inuse) { + if (!atomic_read(&svsk->sk_inuse)) { spin_unlock_bh(&serv->sv_lock); - sock_release(svsk->sk_sock); + if (svsk->sk_sock->file) + sockfd_put(svsk->sk_sock); + else + sock_release(svsk->sk_sock); kfree(svsk); } else { spin_unlock_bh(&serv->sv_lock); @@ -1473,7 +1634,6 @@ svc_makesock(struct svc_serv *serv, int protocol, unsigned short port) static void svc_revisit(struct cache_deferred_req *dreq, int too_many) { struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle); - struct svc_serv *serv = dreq->owner; struct svc_sock *svsk; if (too_many) { @@ -1484,9 +1644,9 @@ static void svc_revisit(struct cache_deferred_req *dreq, int too_many) dprintk("revisit queued\n"); svsk = dr->svsk; dr->svsk = NULL; - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&svsk->sk_defer_lock); list_add(&dr->handle.recent, &svsk->sk_deferred); - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&svsk->sk_defer_lock); set_bit(SK_DEFERRED, &svsk->sk_flags); svc_sock_enqueue(svsk); svc_sock_put(svsk); @@ -1518,10 +1678,8 @@ svc_defer(struct cache_req *req) dr->argslen = rqstp->rq_arg.len >> 2; memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2); } - spin_lock_bh(&rqstp->rq_server->sv_lock); - rqstp->rq_sock->sk_inuse++; + atomic_inc(&rqstp->rq_sock->sk_inuse); dr->svsk = rqstp->rq_sock; - spin_unlock_bh(&rqstp->rq_server->sv_lock); dr->handle.revisit = svc_revisit; return &dr->handle; @@ -1548,11 +1706,10 @@ static int svc_deferred_recv(struct svc_rqst *rqstp) static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk) { struct svc_deferred_req *dr = NULL; - struct svc_serv *serv = svsk->sk_server; if (!test_bit(SK_DEFERRED, &svsk->sk_flags)) return NULL; - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&svsk->sk_defer_lock); clear_bit(SK_DEFERRED, &svsk->sk_flags); if (!list_empty(&svsk->sk_deferred)) { dr = list_entry(svsk->sk_deferred.next, @@ -1561,6 +1718,6 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk) list_del_init(&dr->handle.recent); set_bit(SK_DEFERRED, &svsk->sk_flags); } - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&svsk->sk_defer_lock); return dr; } |