diff options
Diffstat (limited to 'fs/afs/rxrpc.c')
-rw-r--r-- | fs/afs/rxrpc.c | 137 |
1 files changed, 107 insertions, 30 deletions
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c index b92774231b3..e86c527d87a 100644 --- a/fs/afs/rxrpc.c +++ b/fs/afs/rxrpc.c @@ -17,6 +17,8 @@ static struct socket *afs_socket; /* my RxRPC socket */ static struct workqueue_struct *afs_async_calls; +static atomic_t afs_outstanding_calls; +static atomic_t afs_outstanding_skbs; static void afs_wake_up_call_waiter(struct afs_call *); static int afs_wait_for_call_to_complete(struct afs_call *); @@ -45,6 +47,7 @@ static const struct afs_wait_mode afs_async_incoming_call = { /* asynchronous incoming call initial processing */ static const struct afs_call_type afs_RXCMxxxx = { + .name = "CB.xxxx", .deliver = afs_deliver_cm_op_id, .abort_to_error = afs_abort_to_error, }; @@ -118,10 +121,67 @@ void afs_close_socket(void) _debug("dework"); destroy_workqueue(afs_async_calls); + + ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0); + ASSERTCMP(atomic_read(&afs_outstanding_calls), ==, 0); _leave(""); } /* + * note that the data in a socket buffer is now delivered and that the buffer + * should be freed + */ +static void afs_data_delivered(struct sk_buff *skb) +{ + if (!skb) { + _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs)); + dump_stack(); + } else { + _debug("DLVR %p{%u} [%d]", + skb, skb->mark, atomic_read(&afs_outstanding_skbs)); + if (atomic_dec_return(&afs_outstanding_skbs) == -1) + BUG(); + rxrpc_kernel_data_delivered(skb); + } +} + +/* + * free a socket buffer + */ +static void afs_free_skb(struct sk_buff *skb) +{ + if (!skb) { + _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs)); + dump_stack(); + } else { + _debug("FREE %p{%u} [%d]", + skb, skb->mark, atomic_read(&afs_outstanding_skbs)); + if (atomic_dec_return(&afs_outstanding_skbs) == -1) + BUG(); + rxrpc_kernel_free_skb(skb); + } +} + +/* + * free a call + */ +static void afs_free_call(struct afs_call *call) +{ + _debug("DONE %p{%s} [%d]", + call, call->type->name, atomic_read(&afs_outstanding_calls)); + if (atomic_dec_return(&afs_outstanding_calls) == -1) + BUG(); + + ASSERTCMP(call->rxcall, ==, NULL); + ASSERT(!work_pending(&call->async_work)); + ASSERT(skb_queue_empty(&call->rx_queue)); + ASSERT(call->type->name != NULL); + + kfree(call->request); + kfree(call); +} + +/* * allocate a call with flat request and reply buffers */ struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type, @@ -133,30 +193,32 @@ struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type, if (!call) goto nomem_call; + _debug("CALL %p{%s} [%d]", + call, type->name, atomic_read(&afs_outstanding_calls)); + atomic_inc(&afs_outstanding_calls); + + call->type = type; + call->request_size = request_size; + call->reply_max = reply_size; + if (request_size) { call->request = kmalloc(request_size, GFP_NOFS); if (!call->request) - goto nomem_request; + goto nomem_free; } if (reply_size) { call->buffer = kmalloc(reply_size, GFP_NOFS); if (!call->buffer) - goto nomem_buffer; + goto nomem_free; } - call->type = type; - call->request_size = request_size; - call->reply_max = reply_size; - init_waitqueue_head(&call->waitq); skb_queue_head_init(&call->rx_queue); return call; -nomem_buffer: - kfree(call->request); -nomem_request: - kfree(call); +nomem_free: + afs_free_call(call); nomem_call: return NULL; } @@ -188,6 +250,12 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp, _enter("%x,{%d},", addr->s_addr, ntohs(call->port)); + ASSERT(call->type != NULL); + ASSERT(call->type->name != NULL); + + _debug("MAKE %p{%s} [%d]", + call, call->type->name, atomic_read(&afs_outstanding_calls)); + call->wait_mode = wait_mode; INIT_WORK(&call->async_work, afs_process_async_call); @@ -203,6 +271,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp, /* create a call */ rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key, (unsigned long) call, gfp); + call->key = NULL; if (IS_ERR(rxcall)) { ret = PTR_ERR(rxcall); goto error_kill_call; @@ -237,10 +306,10 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp, error_do_abort: rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT); rxrpc_kernel_end_call(rxcall); + call->rxcall = NULL; error_kill_call: call->type->destructor(call); - ASSERT(skb_queue_empty(&call->rx_queue)); - kfree(call); + afs_free_call(call); _leave(" = %d", ret); return ret; } @@ -257,15 +326,19 @@ static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID, _enter("%p,,%u", call, skb->mark); + _debug("ICPT %p{%u} [%d]", + skb, skb->mark, atomic_read(&afs_outstanding_skbs)); + ASSERTCMP(sk, ==, afs_socket->sk); + atomic_inc(&afs_outstanding_skbs); if (!call) { /* its an incoming call for our callback service */ - __skb_queue_tail(&afs_incoming_calls, skb); + skb_queue_tail(&afs_incoming_calls, skb); schedule_work(&afs_collect_incoming_call_work); } else { /* route the messages directly to the appropriate call */ - __skb_queue_tail(&call->rx_queue, skb); + skb_queue_tail(&call->rx_queue, skb); call->wait_mode->rx_wakeup(call); } @@ -317,9 +390,9 @@ static void afs_deliver_to_call(struct afs_call *call) call->state = AFS_CALL_ERROR; break; } - rxrpc_kernel_data_delivered(skb); + afs_data_delivered(skb); skb = NULL; - break; + continue; case RXRPC_SKB_MARK_FINAL_ACK: _debug("Rcv ACK"); call->state = AFS_CALL_COMPLETE; @@ -350,19 +423,19 @@ static void afs_deliver_to_call(struct afs_call *call) break; } - rxrpc_kernel_free_skb(skb); + afs_free_skb(skb); } /* make sure the queue is empty if the call is done with (we might have * aborted the call early because of an unmarshalling error) */ if (call->state >= AFS_CALL_COMPLETE) { while ((skb = skb_dequeue(&call->rx_queue))) - rxrpc_kernel_free_skb(skb); + afs_free_skb(skb); if (call->incoming) { rxrpc_kernel_end_call(call->rxcall); + call->rxcall = NULL; call->type->destructor(call); - ASSERT(skb_queue_empty(&call->rx_queue)); - kfree(call); + afs_free_call(call); } } @@ -409,14 +482,14 @@ static int afs_wait_for_call_to_complete(struct afs_call *call) _debug("call incomplete"); rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD); while ((skb = skb_dequeue(&call->rx_queue))) - rxrpc_kernel_free_skb(skb); + afs_free_skb(skb); } _debug("call complete"); rxrpc_kernel_end_call(call->rxcall); + call->rxcall = NULL; call->type->destructor(call); - ASSERT(skb_queue_empty(&call->rx_queue)); - kfree(call); + afs_free_call(call); _leave(" = %d", ret); return ret; } @@ -459,9 +532,7 @@ static void afs_delete_async_call(struct work_struct *work) _enter(""); - ASSERT(skb_queue_empty(&call->rx_queue)); - ASSERT(!work_pending(&call->async_work)); - kfree(call); + afs_free_call(call); _leave(""); } @@ -489,6 +560,7 @@ static void afs_process_async_call(struct work_struct *work) /* kill the call */ rxrpc_kernel_end_call(call->rxcall); + call->rxcall = NULL; if (call->type->destructor) call->type->destructor(call); @@ -526,7 +598,7 @@ static void afs_collect_incoming_call(struct work_struct *work) _debug("new call"); /* don't need the notification */ - rxrpc_kernel_free_skb(skb); + afs_free_skb(skb); if (!call) { call = kzalloc(sizeof(struct afs_call), GFP_KERNEL); @@ -541,6 +613,11 @@ static void afs_collect_incoming_call(struct work_struct *work) init_waitqueue_head(&call->waitq); skb_queue_head_init(&call->rx_queue); call->state = AFS_CALL_AWAIT_OP_ID; + + _debug("CALL %p{%s} [%d]", + call, call->type->name, + atomic_read(&afs_outstanding_calls)); + atomic_inc(&afs_outstanding_calls); } rxcall = rxrpc_kernel_accept_call(afs_socket, @@ -551,7 +628,8 @@ static void afs_collect_incoming_call(struct work_struct *work) } } - kfree(call); + if (call) + afs_free_call(call); } /* @@ -629,8 +707,7 @@ void afs_send_empty_reply(struct afs_call *call) rxrpc_kernel_end_call(call->rxcall); call->rxcall = NULL; call->type->destructor(call); - ASSERT(skb_queue_empty(&call->rx_queue)); - kfree(call); + afs_free_call(call); _leave(" [error]"); return; } |