diff options
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r-- | fs/dlm/lowcomms-tcp.c | 254 |
1 files changed, 51 insertions, 203 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c index b4fb5783ef8..86e5f81da7c 100644 --- a/fs/dlm/lowcomms-tcp.c +++ b/fs/dlm/lowcomms-tcp.c @@ -115,6 +115,8 @@ struct connection { atomic_t waiting_requests; #define MAX_CONNECT_RETRIES 3 struct connection *othercon; + struct work_struct rwork; /* Receive workqueue */ + struct work_struct swork; /* Send workqueue */ }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -131,14 +133,9 @@ struct writequeue_entry { static struct sockaddr_storage dlm_local_addr; -/* Manage daemons */ -static struct task_struct *recv_task; -static struct task_struct *send_task; - -static wait_queue_t lowcomms_send_waitq_head; -static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq); -static wait_queue_t lowcomms_recv_waitq_head; -static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq); +/* Work queues */ +static struct workqueue_struct *recv_workqueue; +static struct workqueue_struct *send_workqueue; /* An array of pointers to connections, indexed by NODEID */ static struct connection **connections; @@ -146,17 +143,8 @@ static DECLARE_MUTEX(connections_lock); static struct kmem_cache *con_cache; static int conn_array_size; -/* List of sockets that have reads pending */ -static LIST_HEAD(read_sockets); -static DEFINE_SPINLOCK(read_sockets_lock); - -/* List of sockets which have writes pending */ -static LIST_HEAD(write_sockets); -static DEFINE_SPINLOCK(write_sockets_lock); - -/* List of sockets which have connects pending */ -static LIST_HEAD(state_sockets); -static DEFINE_SPINLOCK(state_sockets_lock); +static void process_recv_sockets(struct work_struct *work); +static void process_send_sockets(struct work_struct *work); static struct connection *nodeid2con(int nodeid, gfp_t allocation) { @@ -189,6 +177,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation) init_rwsem(&con->sock_sem); INIT_LIST_HEAD(&con->writequeue); spin_lock_init(&con->writequeue_lock); + INIT_WORK(&con->swork, process_send_sockets); + INIT_WORK(&con->rwork, process_recv_sockets); connections[nodeid] = con; } @@ -203,41 +193,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused) { struct connection *con = sock2con(sk); - atomic_inc(&con->waiting_requests); - if (test_and_set_bit(CF_READ_PENDING, &con->flags)) - return; - - spin_lock_bh(&read_sockets_lock); - list_add_tail(&con->read_list, &read_sockets); - spin_unlock_bh(&read_sockets_lock); - - wake_up_interruptible(&lowcomms_recv_waitq); + if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) + queue_work(recv_workqueue, &con->rwork); } static void lowcomms_write_space(struct sock *sk) { struct connection *con = sock2con(sk); - if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) - return; - - spin_lock_bh(&write_sockets_lock); - list_add_tail(&con->write_list, &write_sockets); - spin_unlock_bh(&write_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); } static inline void lowcomms_connect_sock(struct connection *con) { - if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) - return; - - spin_lock_bh(&state_sockets_lock); - list_add_tail(&con->state_list, &state_sockets); - spin_unlock_bh(&state_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); + if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); } static void lowcomms_state_change(struct sock *sk) @@ -388,7 +359,8 @@ out: return 0; out_resched: - lowcomms_data_ready(con->sock->sk, 0); + if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) + queue_work(recv_workqueue, &con->rwork); up_read(&con->sock_sem); cond_resched(); return 0; @@ -477,6 +449,8 @@ static int accept_from_sock(struct connection *con) othercon->nodeid = nodeid; othercon->rx_action = receive_from_sock; init_rwsem(&othercon->sock_sem); + INIT_WORK(&othercon->swork, process_send_sockets); + INIT_WORK(&othercon->rwork, process_recv_sockets); set_bit(CF_IS_OTHERCON, &othercon->flags); newcon->othercon = othercon; } @@ -498,7 +472,8 @@ static int accept_from_sock(struct connection *con) * beween processing the accept adding the socket * to the read_sockets list */ - lowcomms_data_ready(newsock->sk, 0); + if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags)) + queue_work(recv_workqueue, &newcon->rwork); up_read(&con->sock_sem); return 0; @@ -757,12 +732,8 @@ void dlm_lowcomms_commit_buffer(void *mh) kunmap(e->page); spin_unlock(&con->writequeue_lock); - if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { - spin_lock_bh(&write_sockets_lock); - list_add_tail(&con->write_list, &write_sockets); - spin_unlock_bh(&write_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { + queue_work(send_workqueue, &con->swork); } return; @@ -803,6 +774,7 @@ static void send_to_sock(struct connection *con) offset = e->offset; BUG_ON(len == 0 && e->users == 0); spin_unlock(&con->writequeue_lock); + kmap(e->page); ret = 0; if (len) { @@ -884,85 +856,29 @@ out: } /* Look for activity on active sockets */ -static void process_sockets(void) +static void process_recv_sockets(struct work_struct *work) { - struct list_head *list; - struct list_head *temp; - int count = 0; - - spin_lock_bh(&read_sockets_lock); - list_for_each_safe(list, temp, &read_sockets) { - - struct connection *con = - list_entry(list, struct connection, read_list); - list_del(&con->read_list); - clear_bit(CF_READ_PENDING, &con->flags); - - spin_unlock_bh(&read_sockets_lock); + struct connection *con = container_of(work, struct connection, rwork); + int err; - /* This can reach zero if we are processing requests - * as they come in. - */ - if (atomic_read(&con->waiting_requests) == 0) { - spin_lock_bh(&read_sockets_lock); - continue; - } - - do { - con->rx_action(con); - - /* Don't starve out everyone else */ - if (++count >= MAX_RX_MSG_COUNT) { - cond_resched(); - count = 0; - } - - } while (!atomic_dec_and_test(&con->waiting_requests) && - !kthread_should_stop()); - - spin_lock_bh(&read_sockets_lock); - } - spin_unlock_bh(&read_sockets_lock); + clear_bit(CF_READ_PENDING, &con->flags); + do { + err = con->rx_action(con); + } while (!err); } -/* Try to send any messages that are pending - */ -static void process_output_queue(void) -{ - struct list_head *list; - struct list_head *temp; - - spin_lock_bh(&write_sockets_lock); - list_for_each_safe(list, temp, &write_sockets) { - struct connection *con = - list_entry(list, struct connection, write_list); - clear_bit(CF_WRITE_PENDING, &con->flags); - list_del(&con->write_list); - - spin_unlock_bh(&write_sockets_lock); - send_to_sock(con); - spin_lock_bh(&write_sockets_lock); - } - spin_unlock_bh(&write_sockets_lock); -} -static void process_state_queue(void) +static void process_send_sockets(struct work_struct *work) { - struct list_head *list; - struct list_head *temp; - - spin_lock_bh(&state_sockets_lock); - list_for_each_safe(list, temp, &state_sockets) { - struct connection *con = - list_entry(list, struct connection, state_list); - list_del(&con->state_list); - clear_bit(CF_CONNECT_PENDING, &con->flags); - spin_unlock_bh(&state_sockets_lock); + struct connection *con = container_of(work, struct connection, swork); + if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { connect_to_sock(con); - spin_lock_bh(&state_sockets_lock); } - spin_unlock_bh(&state_sockets_lock); + + if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) { + send_to_sock(con); + } } @@ -979,97 +895,29 @@ static void clean_writequeues(void) } } -static int read_list_empty(void) -{ - int status; - - spin_lock_bh(&read_sockets_lock); - status = list_empty(&read_sockets); - spin_unlock_bh(&read_sockets_lock); - - return status; -} - -/* DLM Transport comms receive daemon */ -static int dlm_recvd(void *data) +static void work_stop(void) { - init_waitqueue_entry(&lowcomms_recv_waitq_head, current); - add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (read_list_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - process_sockets(); - } - - return 0; + destroy_workqueue(recv_workqueue); + destroy_workqueue(send_workqueue); } -static int write_and_state_lists_empty(void) +static int work_start(void) { - int status; - - spin_lock_bh(&write_sockets_lock); - status = list_empty(&write_sockets); - spin_unlock_bh(&write_sockets_lock); - - spin_lock_bh(&state_sockets_lock); - if (list_empty(&state_sockets) == 0) - status = 0; - spin_unlock_bh(&state_sockets_lock); - - return status; -} - -/* DLM Transport send daemon */ -static int dlm_sendd(void *data) -{ - init_waitqueue_entry(&lowcomms_send_waitq_head, current); - add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (write_and_state_lists_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - process_state_queue(); - process_output_queue(); - } - - return 0; -} - -static void daemons_stop(void) -{ - kthread_stop(recv_task); - kthread_stop(send_task); -} - -static int daemons_start(void) -{ - struct task_struct *p; int error; - - p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); - error = IS_ERR(p); + recv_workqueue = create_workqueue("dlm_recv"); + error = IS_ERR(recv_workqueue); if (error) { - log_print("can't start dlm_recvd %d", error); + log_print("can't start dlm_recv %d", error); return error; } - recv_task = p; - p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); - error = IS_ERR(p); + send_workqueue = create_singlethread_workqueue("dlm_send"); + error = IS_ERR(send_workqueue); if (error) { - log_print("can't start dlm_sendd %d", error); - kthread_stop(recv_task); + log_print("can't start dlm_send %d", error); + destroy_workqueue(recv_workqueue); return error; } - send_task = p; return 0; } @@ -1086,7 +934,7 @@ void dlm_lowcomms_stop(void) connections[i]->flags |= 0xFF; } - daemons_stop(); + work_stop(); clean_writequeues(); for (i = 0; i < conn_array_size; i++) { @@ -1138,7 +986,7 @@ int dlm_lowcomms_start(void) if (error) goto fail_unlisten; - error = daemons_start(); + error = work_start(); if (error) goto fail_unlisten; |