diff options
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r-- | fs/dlm/lock.c | 249 |
1 files changed, 177 insertions, 72 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 3915b8e1414..ff4a198fa67 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -88,7 +88,6 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, static int receive_extralen(struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void del_timeout(struct dlm_lkb *lkb); -void dlm_timeout_warn(struct dlm_lkb *lkb); /* * Lock compatibilty matrix - thanks Steve @@ -335,7 +334,7 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) { struct dlm_rsb *r; - r = allocate_rsb(ls, len); + r = dlm_allocate_rsb(ls, len); if (!r) return NULL; @@ -478,7 +477,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, error = _search_rsb(ls, name, namelen, bucket, 0, &tmp); if (!error) { write_unlock(&ls->ls_rsbtbl[bucket].lock); - free_rsb(r); + dlm_free_rsb(r); r = tmp; goto out; } @@ -490,12 +489,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, return error; } -int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, - unsigned int flags, struct dlm_rsb **r_ret) -{ - return find_rsb(ls, name, namelen, flags, r_ret); -} - /* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */ @@ -519,7 +512,7 @@ static void toss_rsb(struct kref *kref) list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss); r->res_toss_time = jiffies; if (r->res_lvbptr) { - free_lvb(r->res_lvbptr); + dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } @@ -589,7 +582,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) uint32_t lkid = 0; uint16_t bucket; - lkb = allocate_lkb(ls); + lkb = dlm_allocate_lkb(ls); if (!lkb) return -ENOMEM; @@ -683,8 +676,8 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) /* for local/process lkbs, lvbptr points to caller's lksb */ if (lkb->lkb_lvbptr && is_master_copy(lkb)) - free_lvb(lkb->lkb_lvbptr); - free_lkb(lkb); + dlm_free_lvb(lkb->lkb_lvbptr); + dlm_free_lkb(lkb); return 1; } else { write_unlock(&ls->ls_lkbtbl[bucket].lock); @@ -988,7 +981,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b) if (is_master(r)) dir_remove(r); - free_rsb(r); + dlm_free_rsb(r); count++; } else { write_unlock(&ls->ls_rsbtbl[b].lock); @@ -1171,7 +1164,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) return; if (!r->res_lvbptr) - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) return; @@ -1203,7 +1196,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) return; if (!r->res_lvbptr) - r->res_lvbptr = allocate_lvb(r->res_ls); + r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) return; @@ -1852,7 +1845,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) { struct dlm_ls *ls = r->res_ls; - int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); + int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); @@ -1886,7 +1879,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 1; } - for (;;) { + for (i = 0; i < 2; i++) { /* It's possible for dlm_scand to remove an old rsb for this same resource from the toss list, us to create a new one, look up the master locally, and find it @@ -1900,6 +1893,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) log_debug(ls, "dir_lookup error %d %s", error, r->res_name); schedule(); } + if (error && error != -EEXIST) + return error; if (ret_nodeid == our_nodeid) { r->res_first_lkid = 0; @@ -1941,8 +1936,11 @@ static void confirm_master(struct dlm_rsb *r, int error) break; case -EAGAIN: - /* the remote master didn't queue our NOQUEUE request; - make a waiting lkb the first_lkid */ + case -EBADR: + case -ENOTBLK: + /* the remote request failed and won't be retried (it was + a NOQUEUE, or has been canceled/unlocked); make a waiting + lkb the first_lkid */ r->res_first_lkid = 0; @@ -2108,17 +2106,18 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) /* an lkb may be waiting for an rsb lookup to complete where the lookup was initiated by another lock */ - if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { - if (!list_empty(&lkb->lkb_rsb_lookup)) { + if (!list_empty(&lkb->lkb_rsb_lookup)) { + if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); list_del_init(&lkb->lkb_rsb_lookup); queue_cast(lkb->lkb_resource, lkb, args->flags & DLM_LKF_CANCEL ? -DLM_ECANCEL : -DLM_EUNLOCK); unhold_lkb(lkb); /* undoes create_lkb() */ - rv = -EBUSY; - goto out; } + /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ + rv = -EBUSY; + goto out; } /* cancel not allowed with another cancel/unlock in progress */ @@ -2986,7 +2985,7 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, if (lkb->lkb_exflags & DLM_LKF_VALBLK) { if (!lkb->lkb_lvbptr) - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; len = receive_extralen(ms); @@ -3006,11 +3005,9 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST); lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP); - DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb);); - if (lkb->lkb_exflags & DLM_LKF_VALBLK) { /* lkb was just created so there won't be an lvb yet */ - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; } @@ -3021,16 +3018,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - if (lkb->lkb_nodeid != ms->m_header.h_nodeid) { - log_error(ls, "convert_args nodeid %d %d lkid %x %x", - lkb->lkb_nodeid, ms->m_header.h_nodeid, - lkb->lkb_id, lkb->lkb_remid); - return -EINVAL; - } - - if (!is_master_copy(lkb)) - return -EINVAL; - if (lkb->lkb_status != DLM_LKSTS_GRANTED) return -EBUSY; @@ -3046,8 +3033,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - if (!is_master_copy(lkb)) - return -EINVAL; if (receive_lvb(ls, lkb, ms)) return -ENOMEM; return 0; @@ -3063,6 +3048,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) lkb->lkb_remid = ms->m_lkid; } +/* This is called after the rsb is locked so that we can safely inspect + fields in the lkb. */ + +static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + int from = ms->m_header.h_nodeid; + int error = 0; + + switch (ms->m_type) { + case DLM_MSG_CONVERT: + case DLM_MSG_UNLOCK: + case DLM_MSG_CANCEL: + if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + case DLM_MSG_CONVERT_REPLY: + case DLM_MSG_UNLOCK_REPLY: + case DLM_MSG_CANCEL_REPLY: + case DLM_MSG_GRANT: + case DLM_MSG_BAST: + if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + case DLM_MSG_REQUEST_REPLY: + if (!is_process_copy(lkb)) + error = -EINVAL; + else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) + error = -EINVAL; + break; + + default: + error = -EINVAL; + } + + if (error) + log_error(lkb->lkb_resource->res_ls, + "ignore invalid message %d from %d %x %x %x %d", + ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, + lkb->lkb_flags, lkb->lkb_nodeid); + return error; +} + static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; @@ -3124,17 +3153,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags(lkb, ms); error = receive_convert_args(ls, lkb, ms); if (error) - goto out; + goto out_reply; reply = !down_conversion(lkb); error = do_convert(r, lkb); - out: + out_reply: if (reply) send_convert_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3160,15 +3193,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags(lkb, ms); error = receive_unlock_args(ls, lkb, ms); if (error) - goto out; + goto out_reply; error = do_unlock(r, lkb); - out: + out_reply: send_unlock_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3196,9 +3233,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + error = do_cancel(r, lkb); send_cancel_reply(r, lkb, error); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3217,22 +3258,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_grant no lkb"); + log_debug(ls, "receive_grant from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + receive_flags_reply(lkb, ms); if (is_altmode(lkb)) munge_altmode(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3246,18 +3291,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_bast no lkb"); + log_debug(ls, "receive_bast from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); - queue_bast(r, lkb, ms->m_bastmode); + error = validate_message(lkb, ms); + if (error) + goto out; + queue_bast(r, lkb, ms->m_bastmode); + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3323,15 +3372,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_request_reply no lkb"); + log_debug(ls, "receive_request_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + mstype = lkb->lkb_wait_type; error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); if (error) @@ -3383,6 +3436,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) if (is_overlap(lkb)) { /* we'll ignore error in cancel/unlock reply */ queue_cast_overlap(r, lkb); + confirm_master(r, result); unhold_lkb(lkb); /* undoes create_lkb() */ } else _request_lock(r, lkb); @@ -3463,6 +3517,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3481,10 +3539,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_convert_reply no lkb"); + log_debug(ls, "receive_convert_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_convert_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3498,6 +3556,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3529,10 +3591,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_unlock_reply no lkb"); + log_debug(ls, "receive_unlock_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_unlock_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3546,6 +3608,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) hold_rsb(r); lock_rsb(r); + error = validate_message(lkb, ms); + if (error) + goto out; + /* stub reply can happen with waiters_mutex held */ error = remove_from_waiters_ms(lkb, ms); if (error) @@ -3577,10 +3643,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) error = find_lkb(ls, ms->m_remid, &lkb); if (error) { - log_error(ls, "receive_cancel_reply no lkb"); + log_debug(ls, "receive_cancel_reply from %d no lkb %x", + ms->m_header.h_nodeid, ms->m_remid); return; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); _receive_cancel_reply(lkb, ms); dlm_put_lkb(lkb); @@ -3640,6 +3706,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) { + if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { + log_debug(ls, "ignore non-member message %d from %d %x %x %d", + ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, + ms->m_remid, ms->m_result); + return; + } + switch (ms->m_type) { /* messages sent to a master node */ @@ -3778,8 +3851,9 @@ void dlm_receive_buffer(struct dlm_header *hd, int nodeid) ls = dlm_find_lockspace_global(hd->h_lockspace); if (!ls) { - log_print("invalid h_lockspace %x from %d cmd %d type %d", - hd->h_lockspace, nodeid, hd->h_cmd, type); + if (dlm_config.ci_log_debug) + log_print("invalid lockspace %x from %d cmd %d type %d", + hd->h_lockspace, nodeid, hd->h_cmd, type); if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) dlm_send_ls_not_ready(nodeid, rc); @@ -3806,6 +3880,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; ls->ls_stub_ms.m_result = -EINPROGRESS; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_convert_reply(lkb, &ls->ls_stub_ms); /* Same special case as in receive_rcom_lock_args() */ @@ -3847,6 +3922,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) void dlm_recover_waiters_pre(struct dlm_ls *ls) { struct dlm_lkb *lkb, *safe; + int wait_type, stub_unlock_result, stub_cancel_result; mutex_lock(&ls->ls_waiters_mutex); @@ -3865,7 +3941,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) if (!waiter_needs_recovery(ls, lkb)) continue; - switch (lkb->lkb_wait_type) { + wait_type = lkb->lkb_wait_type; + stub_unlock_result = -DLM_EUNLOCK; + stub_cancel_result = -DLM_ECANCEL; + + /* Main reply may have been received leaving a zero wait_type, + but a reply for the overlapping op may not have been + received. In that case we need to fake the appropriate + reply for the overlap op. */ + + if (!wait_type) { + if (is_overlap_cancel(lkb)) { + wait_type = DLM_MSG_CANCEL; + if (lkb->lkb_grmode == DLM_LOCK_IV) + stub_cancel_result = 0; + } + if (is_overlap_unlock(lkb)) { + wait_type = DLM_MSG_UNLOCK; + if (lkb->lkb_grmode == DLM_LOCK_IV) + stub_unlock_result = -ENOENT; + } + + log_debug(ls, "rwpre overlap %x %x %d %d %d", + lkb->lkb_id, lkb->lkb_flags, wait_type, + stub_cancel_result, stub_unlock_result); + } + + switch (wait_type) { case DLM_MSG_REQUEST: lkb->lkb_flags |= DLM_IFL_RESEND; @@ -3878,8 +3980,9 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_UNLOCK: hold_lkb(lkb); ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; - ls->ls_stub_ms.m_result = -DLM_EUNLOCK; + ls->ls_stub_ms.m_result = stub_unlock_result; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_unlock_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; @@ -3887,15 +3990,16 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_CANCEL: hold_lkb(lkb); ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; - ls->ls_stub_ms.m_result = -DLM_ECANCEL; + ls->ls_stub_ms.m_result = stub_cancel_result; ls->ls_stub_ms.m_flags = lkb->lkb_flags; + ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; _receive_cancel_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); break; default: - log_error(ls, "invalid lkb wait_type %d", - lkb->lkb_wait_type); + log_error(ls, "invalid lkb wait_type %d %d", + lkb->lkb_wait_type, wait_type); } schedule(); } @@ -4184,7 +4288,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP); if (lkb->lkb_exflags & DLM_LKF_VALBLK) { - lkb->lkb_lvbptr = allocate_lvb(ls); + lkb->lkb_lvbptr = dlm_allocate_lvb(ls); if (!lkb->lkb_lvbptr) return -ENOMEM; lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - @@ -4259,7 +4363,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) put_rsb(r); out: if (error) - log_print("recover_master_copy %d %x", error, rl->rl_lkid); + log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid); rl->rl_result = error; return error; } @@ -4342,7 +4446,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, } } - /* After ua is attached to lkb it will be freed by free_lkb(). + /* After ua is attached to lkb it will be freed by dlm_free_lkb(). When DLM_IFL_USER is set, the dlm knows that this is a userspace lock and that lkb_astparam is the dlm_user_args structure. */ @@ -4679,6 +4783,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) } list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { + lkb->lkb_ast_type = 0; list_del(&lkb->lkb_astqueue); dlm_put_lkb(lkb); } |