Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 153032 Details for
Bug 216113
dlm: add_to_waiters error 1
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
patch posted to rhkernel on 04/19
dlm-overlap.patch (text/plain), 40.28 KB, created by
David Teigland
on 2007-04-19 17:31:58 UTC
(
hide
)
Description:
patch posted to rhkernel on 04/19
Filename:
MIME Type:
Creator:
David Teigland
Created:
2007-04-19 17:31:58 UTC
Size:
40.28 KB
patch
obsolete
>[PATCH] dlm: overlapping cancel and unlock > >bz 216113 > >Full cancel and force-unlock support. In the past, cancel and force-unlock >wouldn't work if there was another operation in progress on the lock. Now, >both cancel and unlock-force can overlap an operation on a lock, meaning there >may be 2 or 3 operations in progress on a lock in parallel. This support is >important not only because cancel and force-unlock are explicit operations >that an app can use, but both are used implicitly when a process exits while >holding locks. > >Summary of changes: > >- add-to and remove-from waiters functions were rewritten to handle situations > with more than one remote operation outstanding on a lock > >- validate_unlock_args detects when an overlapping cancel/unlock-force > can be sent and when it needs to be delayed until a request/lookup > reply is received > >- processing request/lookup replies detects when cancel/unlock-force > occurred during the op, and carries out the delayed cancel/unlock-force > >- manipulation of the "waiters" (remote operation) state of a lock moved under > the standard rsb mutex that protects all the other lock state > >- the two recovery routines related to locks on the waiters list changed > according to the way lkb's are now locked before accessing waiters state > >- waiters recovery detects when lkb's being recovered have overlapping > cancel/unlock-force, and may not recover such locks > >- revert_lock (cancel) returns a value to distinguish cases where it did > nothing vs cases where it actually did a cancel; the cancel completion ast > should only be done when cancel did something > >- orphaned locks put on new list so they can be found later for purging > >- cancel must be called on a lock when making it an orphan > >- flag user locks (ENDOFLIFE) at the end of their useful life (to the > application) so we can return an error for any further cancel/unlock-force > >- we weren't setting COMP/BAST ast flags if one was already set, so we'd 
lose > either a completion or blocking ast > >- clear an unread bast on a lock that's become unlocked > >Index: linux-rhel51-quilt/fs/dlm/lock.c >=================================================================== >--- linux-rhel51-quilt.orig/fs/dlm/lock.c 2007-04-19 10:39:35.000000000 -0500 >+++ linux-rhel51-quilt/fs/dlm/lock.c 2007-04-19 10:41:56.000000000 -0500 >@@ -1,7 +1,7 @@ > /****************************************************************************** > ******************************************************************************* > ** >-** Copyright (C) 2005 Red Hat, Inc. All rights reserved. >+** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. > ** > ** This copyrighted material is made available to anyone wishing to use, > ** modify, copy, or redistribute it subject to the terms and conditions >@@ -254,6 +254,22 @@ > return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); > } > >+static inline int is_overlap_unlock(struct dlm_lkb *lkb) >+{ >+ return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; >+} >+ >+static inline int is_overlap_cancel(struct dlm_lkb *lkb) >+{ >+ return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; >+} >+ >+static inline int is_overlap(struct dlm_lkb *lkb) >+{ >+ return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | >+ DLM_IFL_OVERLAP_CANCEL)); >+} >+ > static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) > { > if (is_master_copy(lkb)) >@@ -267,6 +283,12 @@ > dlm_add_ast(lkb, AST_COMP); > } > >+static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) >+{ >+ queue_cast(r, lkb, >+ is_overlap_unlock(lkb) ? 
-DLM_EUNLOCK : -DLM_ECANCEL); >+} >+ > static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) > { > if (is_master_copy(lkb)) >@@ -547,6 +569,7 @@ > lkb->lkb_grmode = DLM_LOCK_IV; > kref_init(&lkb->lkb_ref); > INIT_LIST_HEAD(&lkb->lkb_ownqueue); >+ INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); > > get_random_bytes(&bucket, sizeof(bucket)); > bucket &= (ls->ls_lkbtbl_size - 1); >@@ -735,23 +758,75 @@ > unhold_lkb(lkb); > } > >+static int msg_reply_type(int mstype) >+{ >+ switch (mstype) { >+ case DLM_MSG_REQUEST: >+ return DLM_MSG_REQUEST_REPLY; >+ case DLM_MSG_CONVERT: >+ return DLM_MSG_CONVERT_REPLY; >+ case DLM_MSG_UNLOCK: >+ return DLM_MSG_UNLOCK_REPLY; >+ case DLM_MSG_CANCEL: >+ return DLM_MSG_CANCEL_REPLY; >+ case DLM_MSG_LOOKUP: >+ return DLM_MSG_LOOKUP_REPLY; >+ } >+ return -1; >+} >+ > /* add/remove lkb from global waiters list of lkb's waiting for > a reply from a remote node */ > >-static void add_to_waiters(struct dlm_lkb *lkb, int mstype) >+static int add_to_waiters(struct dlm_lkb *lkb, int mstype) > { > struct dlm_ls *ls = lkb->lkb_resource->res_ls; >+ int error = 0; > > mutex_lock(&ls->ls_waiters_mutex); >- if (lkb->lkb_wait_type) { >- log_print("add_to_waiters error %d", lkb->lkb_wait_type); >+ >+ if (is_overlap_unlock(lkb) || >+ (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { >+ error = -EINVAL; >+ goto out; >+ } >+ >+ if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { >+ switch (mstype) { >+ case DLM_MSG_UNLOCK: >+ lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; >+ break; >+ case DLM_MSG_CANCEL: >+ lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; >+ break; >+ default: >+ error = -EBUSY; >+ goto out; >+ } >+ lkb->lkb_wait_count++; >+ hold_lkb(lkb); >+ >+ log_debug(ls, "add overlap %x cur %d new %d count %d flags %x", >+ lkb->lkb_id, lkb->lkb_wait_type, mstype, >+ lkb->lkb_wait_count, lkb->lkb_flags); > goto out; > } >+ >+ DLM_ASSERT(!lkb->lkb_wait_count, >+ dlm_print_lkb(lkb); >+ printk("wait_count %d\n", lkb->lkb_wait_count);); >+ >+ 
lkb->lkb_wait_count++; > lkb->lkb_wait_type = mstype; >- kref_get(&lkb->lkb_ref); >+ hold_lkb(lkb); > list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); > out: >+ if (error) >+ log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s", >+ lkb->lkb_id, error, lkb->lkb_flags, mstype, >+ lkb->lkb_wait_type, lkb->lkb_resource->res_name); > mutex_unlock(&ls->ls_waiters_mutex); >+ return error; > } > > /* We clear the RESEND flag because we might be taking an lkb off the waiters >@@ -759,34 +834,85 @@ > request reply on the requestqueue) between dlm_recover_waiters_pre() which > set RESEND and dlm_recover_waiters_post() */ > >-static int _remove_from_waiters(struct dlm_lkb *lkb) >+static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype) > { >- int error = 0; >+ struct dlm_ls *ls = lkb->lkb_resource->res_ls; >+ int overlap_done = 0; > >- if (!lkb->lkb_wait_type) { >- log_print("remove_from_waiters error"); >- error = -EINVAL; >- goto out; >+ if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; >+ overlap_done = 1; >+ goto out_del; >+ } >+ >+ if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; >+ overlap_done = 1; >+ goto out_del; >+ } >+ >+ /* N.B. type of reply may not always correspond to type of original >+ msg due to lookup->request optimization, verify others? */ >+ >+ if (lkb->lkb_wait_type) { >+ lkb->lkb_wait_type = 0; >+ goto out_del; >+ } >+ >+ log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d", >+ lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type); >+ return -1; >+ >+ out_del: >+ /* the force-unlock/cancel has completed and we haven't recvd a reply >+ to the op that was in progress prior to the unlock/cancel; we >+ give up on any reply to the earlier op. 
FIXME: not sure when/how >+ this would happen */ >+ >+ if (overlap_done && lkb->lkb_wait_type) { >+ log_error(ls, "remove_from_waiters %x reply %d give up on %d", >+ lkb->lkb_id, mstype, lkb->lkb_wait_type); >+ lkb->lkb_wait_count--; >+ lkb->lkb_wait_type = 0; > } >- lkb->lkb_wait_type = 0; >+ >+ DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); >+ > lkb->lkb_flags &= ~DLM_IFL_RESEND; >- list_del(&lkb->lkb_wait_reply); >+ lkb->lkb_wait_count--; >+ if (!lkb->lkb_wait_count) >+ list_del_init(&lkb->lkb_wait_reply); > unhold_lkb(lkb); >- out: >- return error; >+ return 0; > } > >-static int remove_from_waiters(struct dlm_lkb *lkb) >+static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) > { > struct dlm_ls *ls = lkb->lkb_resource->res_ls; > int error; > > mutex_lock(&ls->ls_waiters_mutex); >- error = _remove_from_waiters(lkb); >+ error = _remove_from_waiters(lkb, mstype); > mutex_unlock(&ls->ls_waiters_mutex); > return error; > } > >+/* Handles situations where we might be processing a "fake" or "stub" reply in >+ which we can't try to take waiters_mutex again. 
*/ >+ >+static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) >+{ >+ struct dlm_ls *ls = lkb->lkb_resource->res_ls; >+ int error; >+ >+ if (ms != &ls->ls_stub_ms) >+ mutex_lock(&ls->ls_waiters_mutex); >+ error = _remove_from_waiters(lkb, ms->m_type); >+ if (ms != &ls->ls_stub_ms) >+ mutex_unlock(&ls->ls_waiters_mutex); >+ return error; >+} >+ > static void dir_remove(struct dlm_rsb *r) > { > int to_nodeid; >@@ -988,8 +1114,14 @@ > _remove_lock(r, lkb); > } > >-static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) >+/* returns: 0 did nothing >+ 1 moved lock to granted >+ -1 removed lock */ >+ >+static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) > { >+ int rv = 0; >+ > lkb->lkb_rqmode = DLM_LOCK_IV; > > switch (lkb->lkb_status) { >@@ -997,6 +1129,7 @@ > break; > case DLM_LKSTS_CONVERT: > move_lkb(r, lkb, DLM_LKSTS_GRANTED); >+ rv = 1; > break; > case DLM_LKSTS_WAITING: > del_lkb(r, lkb); >@@ -1004,15 +1137,17 @@ > /* this unhold undoes the original ref from create_lkb() > so this leads to the lkb being freed */ > unhold_lkb(lkb); >+ rv = -1; > break; > default: > log_print("invalid status for revert %d", lkb->lkb_status); > } >+ return rv; > } > >-static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) >+static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) > { >- revert_lock(r, lkb); >+ return revert_lock(r, lkb); > } > > static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) >@@ -1499,7 +1634,7 @@ > struct dlm_lkb *lkb, *safe; > > list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { >- list_del(&lkb->lkb_rsb_lookup); >+ list_del_init(&lkb->lkb_rsb_lookup); > _request_lock(r, lkb); > schedule(); > } >@@ -1530,7 +1665,7 @@ > if (!list_empty(&r->res_lookup)) { > lkb = list_entry(r->res_lookup.next, struct dlm_lkb, > lkb_rsb_lookup); >- list_del(&lkb->lkb_rsb_lookup); >+ list_del_init(&lkb->lkb_rsb_lookup); > r->res_first_lkid = lkb->lkb_id; > _request_lock(r, lkb); > } else 
>@@ -1614,6 +1749,9 @@ > DLM_LKF_FORCEUNLOCK)) > return -EINVAL; > >+ if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) >+ return -EINVAL; >+ > args->flags = flags; > args->astparam = (long) astarg; > return 0; >@@ -1638,6 +1776,9 @@ > > if (lkb->lkb_wait_type) > goto out; >+ >+ if (is_overlap(lkb)) >+ goto out; > } > > lkb->lkb_exflags = args->flags; >@@ -1654,35 +1795,126 @@ > return rv; > } > >+/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 >+ for success */ >+ >+/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here >+ because there may be a lookup in progress and it's valid to do >+ cancel/unlockf on it */ >+ > static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) > { >+ struct dlm_ls *ls = lkb->lkb_resource->res_ls; > int rv = -EINVAL; > >- if (lkb->lkb_flags & DLM_IFL_MSTCPY) >+ if (lkb->lkb_flags & DLM_IFL_MSTCPY) { >+ log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); >+ dlm_print_lkb(lkb); > goto out; >+ } >+ >+ /* an lkb may still exist even though the lock is EOL'ed due to a >+ cancel, unlock or failed noqueue request; an app can't use these >+ locks; return same error as if the lkid had not been found at all */ >+ >+ if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { >+ log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); >+ rv = -ENOENT; >+ goto out; >+ } >+ >+ /* an lkb may be waiting for an rsb lookup to complete where the >+ lookup was initiated by another lock */ >+ >+ if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { >+ if (!list_empty(&lkb->lkb_rsb_lookup)) { >+ log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); >+ list_del_init(&lkb->lkb_rsb_lookup); >+ queue_cast(lkb->lkb_resource, lkb, >+ args->flags & DLM_LKF_CANCEL ? 
>+ -DLM_ECANCEL : -DLM_EUNLOCK); >+ unhold_lkb(lkb); /* undoes create_lkb() */ >+ rv = -EBUSY; >+ goto out; >+ } >+ } >+ >+ /* cancel not allowed with another cancel/unlock in progress */ >+ >+ if (args->flags & DLM_LKF_CANCEL) { >+ if (lkb->lkb_exflags & DLM_LKF_CANCEL) >+ goto out; > >- if (args->flags & DLM_LKF_FORCEUNLOCK) >+ if (is_overlap(lkb)) >+ goto out; >+ >+ if (lkb->lkb_flags & DLM_IFL_RESEND) { >+ lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; >+ rv = -EBUSY; >+ goto out; >+ } >+ >+ switch (lkb->lkb_wait_type) { >+ case DLM_MSG_LOOKUP: >+ case DLM_MSG_REQUEST: >+ lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; >+ rv = -EBUSY; >+ goto out; >+ case DLM_MSG_UNLOCK: >+ case DLM_MSG_CANCEL: >+ goto out; >+ } >+ /* add_to_waiters() will set OVERLAP_CANCEL */ > goto out_ok; >+ } > >- if (args->flags & DLM_LKF_CANCEL && >- lkb->lkb_status == DLM_LKSTS_GRANTED) >- goto out; >+ /* do we need to allow a force-unlock if there's a normal unlock >+ already in progress? in what conditions could the normal unlock >+ fail such that we'd want to send a force-unlock to be sure? 
*/ > >- if (!(args->flags & DLM_LKF_CANCEL) && >- lkb->lkb_status != DLM_LKSTS_GRANTED) >- goto out; >+ if (args->flags & DLM_LKF_FORCEUNLOCK) { >+ if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) >+ goto out; >+ >+ if (is_overlap_unlock(lkb)) >+ goto out; >+ >+ if (lkb->lkb_flags & DLM_IFL_RESEND) { >+ lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; >+ rv = -EBUSY; >+ goto out; >+ } > >+ switch (lkb->lkb_wait_type) { >+ case DLM_MSG_LOOKUP: >+ case DLM_MSG_REQUEST: >+ lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; >+ rv = -EBUSY; >+ goto out; >+ case DLM_MSG_UNLOCK: >+ goto out; >+ } >+ /* add_to_waiters() will set OVERLAP_UNLOCK */ >+ goto out_ok; >+ } >+ >+ /* normal unlock not allowed if there's any op in progress */ > rv = -EBUSY; >- if (lkb->lkb_wait_type) >+ if (lkb->lkb_wait_type || lkb->lkb_wait_count) > goto out; > > out_ok: >- lkb->lkb_exflags = args->flags; >+ /* an overlapping op shouldn't blow away exflags from other op */ >+ lkb->lkb_exflags |= args->flags; > lkb->lkb_sbflags = 0; > lkb->lkb_astparam = args->astparam; >- > rv = 0; > out: >+ if (rv) >+ log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, >+ lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, >+ args->flags, lkb->lkb_wait_type, >+ lkb->lkb_resource->res_name); > return rv; > } > >@@ -1759,17 +1991,19 @@ > return -DLM_EUNLOCK; > } > >-/* FIXME: if revert_lock() finds that the lkb is granted, we should >- skip the queue_cast(ECANCEL). It indicates that the request/convert >- completed (and queued a normal ast) just before the cancel; we don't >- want to clobber the sb_result for the normal ast with ECANCEL. 
*/ >+/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ > > static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) > { >- revert_lock(r, lkb); >- queue_cast(r, lkb, -DLM_ECANCEL); >- grant_pending_locks(r); >- return -DLM_ECANCEL; >+ int error; >+ >+ error = revert_lock(r, lkb); >+ if (error) { >+ queue_cast(r, lkb, -DLM_ECANCEL); >+ grant_pending_locks(r); >+ return -DLM_ECANCEL; >+ } >+ return 0; > } > > /* >@@ -2035,6 +2269,8 @@ > > if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) > error = 0; >+ if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) >+ error = 0; > out_put: > dlm_put_lkb(lkb); > out: >@@ -2176,7 +2412,9 @@ > struct dlm_mhandle *mh; > int to_nodeid, error; > >- add_to_waiters(lkb, mstype); >+ error = add_to_waiters(lkb, mstype); >+ if (error) >+ return error; > > to_nodeid = r->res_nodeid; > >@@ -2192,7 +2430,7 @@ > return 0; > > fail: >- remove_from_waiters(lkb); >+ remove_from_waiters(lkb, msg_reply_type(mstype)); > return error; > } > >@@ -2209,7 +2447,8 @@ > > /* down conversions go without a reply from the master */ > if (!error && down_conversion(lkb)) { >- remove_from_waiters(lkb); >+ remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); >+ r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; > r->res_ls->ls_stub_ms.m_result = 0; > r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; > __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); >@@ -2280,7 +2519,9 @@ > struct dlm_mhandle *mh; > int to_nodeid, error; > >- add_to_waiters(lkb, DLM_MSG_LOOKUP); >+ error = add_to_waiters(lkb, DLM_MSG_LOOKUP); >+ if (error) >+ return error; > > to_nodeid = dlm_dir_nodeid(r); > >@@ -2296,7 +2537,7 @@ > return 0; > > fail: >- remove_from_waiters(lkb); >+ remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); > return error; > } > >@@ -2740,7 +2981,7 @@ > { > struct dlm_lkb *lkb; > struct dlm_rsb *r; >- int error, mstype; >+ int error, mstype, result; > > error = find_lkb(ls, ms->m_remid, &lkb); > if (error) { >@@ -2749,20 +2990,15 
@@ > } > DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); > >- mstype = lkb->lkb_wait_type; >- error = remove_from_waiters(lkb); >- if (error) { >- log_error(ls, "receive_request_reply not on waiters"); >- goto out; >- } >- >- /* this is the value returned from do_request() on the master */ >- error = ms->m_result; >- > r = lkb->lkb_resource; > hold_rsb(r); > lock_rsb(r); > >+ mstype = lkb->lkb_wait_type; >+ error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); >+ if (error) >+ goto out; >+ > /* Optimization: the dir node was also the master, so it took our > lookup as a request and sent request reply instead of lookup reply */ > if (mstype == DLM_MSG_LOOKUP) { >@@ -2770,14 +3006,15 @@ > lkb->lkb_nodeid = r->res_nodeid; > } > >- switch (error) { >+ /* this is the value returned from do_request() on the master */ >+ result = ms->m_result; >+ >+ switch (result) { > case -EAGAIN: >- /* request would block (be queued) on remote master; >- the unhold undoes the original ref from create_lkb() >- so it leads to the lkb being freed */ >+ /* request would block (be queued) on remote master */ > queue_cast(r, lkb, -EAGAIN); > confirm_master(r, -EAGAIN); >- unhold_lkb(lkb); >+ unhold_lkb(lkb); /* undoes create_lkb() */ > break; > > case -EINPROGRESS: >@@ -2785,41 +3022,62 @@ > /* request was queued or granted on remote master */ > receive_flags_reply(lkb, ms); > lkb->lkb_remid = ms->m_lkid; >- if (error) >+ if (result) > add_lkb(r, lkb, DLM_LKSTS_WAITING); > else { > grant_lock_pc(r, lkb, ms); > queue_cast(r, lkb, 0); > } >- confirm_master(r, error); >+ confirm_master(r, result); > break; > > case -EBADR: > case -ENOTBLK: > /* find_rsb failed to find rsb or rsb wasn't master */ >+ log_debug(ls, "receive_request_reply %x %x master diff %d %d", >+ lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result); > r->res_nodeid = -1; > lkb->lkb_nodeid = -1; >- _request_lock(r, lkb); >+ >+ if (is_overlap(lkb)) { >+ /* we'll ignore error in cancel/unlock reply */ >+ 
queue_cast_overlap(r, lkb); >+ unhold_lkb(lkb); /* undoes create_lkb() */ >+ } else >+ _request_lock(r, lkb); > break; > > default: >- log_error(ls, "receive_request_reply error %d", error); >+ log_error(ls, "receive_request_reply %x error %d", >+ lkb->lkb_id, result); > } > >+ if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { >+ log_debug(ls, "receive_request_reply %x result %d unlock", >+ lkb->lkb_id, result); >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; >+ send_unlock(r, lkb); >+ } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { >+ log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; >+ send_cancel(r, lkb); >+ } else { >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; >+ } >+ out: > unlock_rsb(r); > put_rsb(r); >- out: > dlm_put_lkb(lkb); > } > > static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, > struct dlm_message *ms) > { >- int error = ms->m_result; >- > /* this is the value returned from do_convert() on the master */ >- >- switch (error) { >+ switch (ms->m_result) { > case -EAGAIN: > /* convert would block (be queued) on remote master */ > queue_cast(r, lkb, -EAGAIN); >@@ -2839,19 +3097,26 @@ > break; > > default: >- log_error(r->res_ls, "receive_convert_reply error %d", error); >+ log_error(r->res_ls, "receive_convert_reply %x error %d", >+ lkb->lkb_id, ms->m_result); > } > } > > static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) > { > struct dlm_rsb *r = lkb->lkb_resource; >+ int error; > > hold_rsb(r); > lock_rsb(r); > >- __receive_convert_reply(r, lkb, ms); >+ /* stub reply can happen with waiters_mutex held */ >+ error = remove_from_waiters_ms(lkb, ms); >+ if (error) >+ goto out; > >+ __receive_convert_reply(r, lkb, ms); >+ out: > unlock_rsb(r); > put_rsb(r); > } >@@ 
-2868,37 +3133,38 @@ > } > DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); > >- error = remove_from_waiters(lkb); >- if (error) { >- log_error(ls, "receive_convert_reply not on waiters"); >- goto out; >- } >- > _receive_convert_reply(lkb, ms); >- out: > dlm_put_lkb(lkb); > } > > static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) > { > struct dlm_rsb *r = lkb->lkb_resource; >- int error = ms->m_result; >+ int error; > > hold_rsb(r); > lock_rsb(r); > >+ /* stub reply can happen with waiters_mutex held */ >+ error = remove_from_waiters_ms(lkb, ms); >+ if (error) >+ goto out; >+ > /* this is the value returned from do_unlock() on the master */ > >- switch (error) { >+ switch (ms->m_result) { > case -DLM_EUNLOCK: > receive_flags_reply(lkb, ms); > remove_lock_pc(r, lkb); > queue_cast(r, lkb, -DLM_EUNLOCK); > break; >+ case -ENOENT: >+ break; > default: >- log_error(r->res_ls, "receive_unlock_reply error %d", error); >+ log_error(r->res_ls, "receive_unlock_reply %x error %d", >+ lkb->lkb_id, ms->m_result); > } >- >+ out: > unlock_rsb(r); > put_rsb(r); > } >@@ -2915,37 +3181,39 @@ > } > DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); > >- error = remove_from_waiters(lkb); >- if (error) { >- log_error(ls, "receive_unlock_reply not on waiters"); >- goto out; >- } >- > _receive_unlock_reply(lkb, ms); >- out: > dlm_put_lkb(lkb); > } > > static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) > { > struct dlm_rsb *r = lkb->lkb_resource; >- int error = ms->m_result; >+ int error; > > hold_rsb(r); > lock_rsb(r); > >+ /* stub reply can happen with waiters_mutex held */ >+ error = remove_from_waiters_ms(lkb, ms); >+ if (error) >+ goto out; >+ > /* this is the value returned from do_cancel() on the master */ > >- switch (error) { >+ switch (ms->m_result) { > case -DLM_ECANCEL: > receive_flags_reply(lkb, ms); > revert_lock_pc(r, lkb); >- queue_cast(r, lkb, -DLM_ECANCEL); >+ if (ms->m_result) >+ queue_cast(r, lkb, 
-DLM_ECANCEL); >+ break; >+ case 0: > break; > default: >- log_error(r->res_ls, "receive_cancel_reply error %d", error); >+ log_error(r->res_ls, "receive_cancel_reply %x error %d", >+ lkb->lkb_id, ms->m_result); > } >- >+ out: > unlock_rsb(r); > put_rsb(r); > } >@@ -2962,14 +3230,7 @@ > } > DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); > >- error = remove_from_waiters(lkb); >- if (error) { >- log_error(ls, "receive_cancel_reply not on waiters"); >- goto out; >- } >- > _receive_cancel_reply(lkb, ms); >- out: > dlm_put_lkb(lkb); > } > >@@ -2985,20 +3246,17 @@ > return; > } > >- error = remove_from_waiters(lkb); >- if (error) { >- log_error(ls, "receive_lookup_reply not on waiters"); >- goto out; >- } >- >- /* this is the value returned by dlm_dir_lookup on dir node >+ /* ms->m_result is the value returned by dlm_dir_lookup on dir node > FIXME: will a non-zero error ever be returned? */ >- error = ms->m_result; > > r = lkb->lkb_resource; > hold_rsb(r); > lock_rsb(r); > >+ error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); >+ if (error) >+ goto out; >+ > ret_nodeid = ms->m_nodeid; > if (ret_nodeid == dlm_our_nodeid()) { > r->res_nodeid = 0; >@@ -3009,14 +3267,22 @@ > r->res_nodeid = ret_nodeid; > } > >+ if (is_overlap(lkb)) { >+ log_debug(ls, "receive_lookup_reply %x unlock %x", >+ lkb->lkb_id, lkb->lkb_flags); >+ queue_cast_overlap(r, lkb); >+ unhold_lkb(lkb); /* undoes create_lkb() */ >+ goto out_list; >+ } >+ > _request_lock(r, lkb); > >+ out_list: > if (!ret_nodeid) > process_lookup_list(r); >- >+ out: > unlock_rsb(r); > put_rsb(r); >- out: > dlm_put_lkb(lkb); > } > >@@ -3153,9 +3419,9 @@ > { > if (middle_conversion(lkb)) { > hold_lkb(lkb); >+ ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; > ls->ls_stub_ms.m_result = -EINPROGRESS; > ls->ls_stub_ms.m_flags = lkb->lkb_flags; >- _remove_from_waiters(lkb); > _receive_convert_reply(lkb, &ls->ls_stub_ms); > > /* Same special case as in receive_rcom_lock_args() */ >@@ -3227,18 +3493,18 @@ > > case 
DLM_MSG_UNLOCK: > hold_lkb(lkb); >+ ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; > ls->ls_stub_ms.m_result = -DLM_EUNLOCK; > ls->ls_stub_ms.m_flags = lkb->lkb_flags; >- _remove_from_waiters(lkb); > _receive_unlock_reply(lkb, &ls->ls_stub_ms); > dlm_put_lkb(lkb); > break; > > case DLM_MSG_CANCEL: > hold_lkb(lkb); >+ ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; > ls->ls_stub_ms.m_result = -DLM_ECANCEL; > ls->ls_stub_ms.m_flags = lkb->lkb_flags; >- _remove_from_waiters(lkb); > _receive_cancel_reply(lkb, &ls->ls_stub_ms); > dlm_put_lkb(lkb); > break; >@@ -3252,37 +3518,47 @@ > mutex_unlock(&ls->ls_waiters_mutex); > } > >-static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) >+static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) > { > struct dlm_lkb *lkb; >- int rv = 0; >+ int found = 0; > > mutex_lock(&ls->ls_waiters_mutex); > list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { > if (lkb->lkb_flags & DLM_IFL_RESEND) { >- rv = lkb->lkb_wait_type; >- _remove_from_waiters(lkb); >- lkb->lkb_flags &= ~DLM_IFL_RESEND; >+ hold_lkb(lkb); >+ found = 1; > break; > } > } > mutex_unlock(&ls->ls_waiters_mutex); > >- if (!rv) >+ if (!found) > lkb = NULL; >- *lkb_ret = lkb; >- return rv; >+ return lkb; > } > > /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the > master or dir-node for r. Processing the lkb may result in it being placed > back on waiters. */ > >+/* We do this after normal locking has been enabled and any saved messages >+ (in requestqueue) have been processed. We should be confident that at >+ this point we won't get or process a reply to any of these waiting >+ operations. But, new ops may be coming in on the rsbs/locks here from >+ userspace or remotely. */ >+ >+/* there may have been an overlap unlock/cancel prior to recovery or after >+ recovery. if before, the lkb may still have a pos wait_count; if after, the >+ overlap flag would just have been set and nothing new sent. 
we can be >+ confident here that any replies to either the initial op or overlap ops >+ prior to recovery have been received. */ >+ > int dlm_recover_waiters_post(struct dlm_ls *ls) > { > struct dlm_lkb *lkb; > struct dlm_rsb *r; >- int error = 0, mstype; >+ int error = 0, mstype, err, oc, ou; > > while (1) { > if (dlm_locking_stopped(ls)) { >@@ -3291,48 +3567,78 @@ > break; > } > >- mstype = remove_resend_waiter(ls, &lkb); >- if (!mstype) >+ lkb = find_resend_waiter(ls); >+ if (!lkb) > break; > > r = lkb->lkb_resource; >+ hold_rsb(r); >+ lock_rsb(r); >+ >+ mstype = lkb->lkb_wait_type; >+ oc = is_overlap_cancel(lkb); >+ ou = is_overlap_unlock(lkb); >+ err = 0; > > log_debug(ls, "recover_waiters_post %x type %d flags %x %s", > lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); > >- switch (mstype) { >- >- case DLM_MSG_LOOKUP: >- hold_rsb(r); >- lock_rsb(r); >- _request_lock(r, lkb); >- if (is_master(r)) >- confirm_master(r, 0); >- unlock_rsb(r); >- put_rsb(r); >- break; >- >- case DLM_MSG_REQUEST: >- hold_rsb(r); >- lock_rsb(r); >- _request_lock(r, lkb); >- if (is_master(r)) >- confirm_master(r, 0); >- unlock_rsb(r); >- put_rsb(r); >- break; >- >- case DLM_MSG_CONVERT: >- hold_rsb(r); >- lock_rsb(r); >- _convert_lock(r, lkb); >- unlock_rsb(r); >- put_rsb(r); >- break; >- >- default: >- log_error(ls, "recover_waiters_post type %d", mstype); >+ /* At this point we assume that we won't get a reply to any >+ previous op or overlap op on this lock. First, do a big >+ remove_from_waiters() for all previous ops. 
*/ >+ >+ lkb->lkb_flags &= ~DLM_IFL_RESEND; >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; >+ lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; >+ lkb->lkb_wait_type = 0; >+ lkb->lkb_wait_count = 0; >+ mutex_lock(&ls->ls_waiters_mutex); >+ list_del_init(&lkb->lkb_wait_reply); >+ mutex_unlock(&ls->ls_waiters_mutex); >+ unhold_lkb(lkb); /* for waiters list */ >+ >+ if (oc || ou) { >+ /* do an unlock or cancel instead of resending */ >+ switch (mstype) { >+ case DLM_MSG_LOOKUP: >+ case DLM_MSG_REQUEST: >+ queue_cast(r, lkb, ou ? -DLM_EUNLOCK : >+ -DLM_ECANCEL); >+ unhold_lkb(lkb); /* undoes create_lkb() */ >+ break; >+ case DLM_MSG_CONVERT: >+ if (oc) { >+ queue_cast(r, lkb, -DLM_ECANCEL); >+ } else { >+ lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; >+ _unlock_lock(r, lkb); >+ } >+ break; >+ default: >+ err = 1; >+ } >+ } else { >+ switch (mstype) { >+ case DLM_MSG_LOOKUP: >+ case DLM_MSG_REQUEST: >+ _request_lock(r, lkb); >+ if (is_master(r)) >+ confirm_master(r, 0); >+ break; >+ case DLM_MSG_CONVERT: >+ _convert_lock(r, lkb); >+ break; >+ default: >+ err = 1; >+ } > } >+ >+ if (err) >+ log_error(ls, "recover_waiters_post %x %d %x %d %d", >+ lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou); >+ unlock_rsb(r); >+ put_rsb(r); >+ dlm_put_lkb(lkb); > } > > return error; >@@ -3684,7 +3990,7 @@ > > /* add this new lkb to the per-process list of locks */ > spin_lock(&ua->proc->locks_spin); >- kref_get(&lkb->lkb_ref); >+ hold_lkb(lkb); > list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); > spin_unlock(&ua->proc->locks_spin); > out: >@@ -3774,6 +4080,9 @@ > > if (error == -DLM_EUNLOCK) > error = 0; >+ /* from validate_unlock_args() */ >+ if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) >+ error = 0; > if (error) > goto out_put; > >@@ -3786,6 +4095,7 @@ > dlm_put_lkb(lkb); > out: > unlock_recovery(ls); >+ kfree(ua_tmp); > return error; > } > >@@ -3815,33 +4125,37 @@ > > if (error == -DLM_ECANCEL) > error = 0; >- if (error) >- goto out_put; >- >- /* this lkb was removed from the WAITING 
queue */ >- if (lkb->lkb_grmode == DLM_LOCK_IV) { >- spin_lock(&ua->proc->locks_spin); >- list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); >- spin_unlock(&ua->proc->locks_spin); >- } >+ /* from validate_unlock_args() */ >+ if (error == -EBUSY) >+ error = 0; > out_put: > dlm_put_lkb(lkb); > out: > unlock_recovery(ls); >+ kfree(ua_tmp); > return error; > } > >+/* lkb's that are removed from the waiters list by revert are just left on the >+ orphans list with the granted orphan locks, to be freed by purge */ >+ > static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) > { > struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; >+ struct dlm_args args; >+ int error; > >- if (ua->lksb.sb_lvbptr) >- kfree(ua->lksb.sb_lvbptr); >- kfree(ua); >- lkb->lkb_astparam = (long)NULL; >+ hold_lkb(lkb); >+ mutex_lock(&ls->ls_orphans_mutex); >+ list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); >+ mutex_unlock(&ls->ls_orphans_mutex); > >- /* TODO: propogate to master if needed */ >- return 0; >+ set_unlock_args(0, ua, &args); >+ >+ error = cancel_lock(ls, lkb, &args); >+ if (error == -DLM_ECANCEL) >+ error = 0; >+ return error; > } > > /* The force flag allows the unlock to go ahead even if the lkb isn't granted. >@@ -3853,10 +4167,6 @@ > struct dlm_args args; > int error; > >- /* FIXME: we need to handle the case where the lkb is in limbo >- while the rsb is being looked up, currently we assert in >- _unlock_lock/is_remote because rsb nodeid is -1. 
*/ >- > set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args); > > error = unlock_lock(ls, lkb, &args); >@@ -3865,6 +4175,31 @@ > return error; > } > >+/* We have to release clear_proc_locks mutex before calling unlock_proc_lock() >+ (which does lock_rsb) due to deadlock with receiving a message that does >+ lock_rsb followed by dlm_user_add_ast() */ >+ >+static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, >+ struct dlm_user_proc *proc) >+{ >+ struct dlm_lkb *lkb = NULL; >+ >+ mutex_lock(&ls->ls_clear_proc_locks); >+ if (list_empty(&proc->locks)) >+ goto out; >+ >+ lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue); >+ list_del_init(&lkb->lkb_ownqueue); >+ >+ if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) >+ lkb->lkb_flags |= DLM_IFL_ORPHAN; >+ else >+ lkb->lkb_flags |= DLM_IFL_DEAD; >+ out: >+ mutex_unlock(&ls->ls_clear_proc_locks); >+ return lkb; >+} >+ > /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which > 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, > which we clear here. 
*/ >@@ -3880,18 +4215,15 @@ > struct dlm_lkb *lkb, *safe; > > lock_recovery(ls); >- mutex_lock(&ls->ls_clear_proc_locks); > >- list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) { >- list_del_init(&lkb->lkb_ownqueue); >- >- if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) { >- lkb->lkb_flags |= DLM_IFL_ORPHAN; >+ while (1) { >+ lkb = del_proc_lock(ls, proc); >+ if (!lkb) >+ break; >+ if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) > orphan_proc_lock(ls, lkb); >- } else { >- lkb->lkb_flags |= DLM_IFL_DEAD; >+ else > unlock_proc_lock(ls, lkb); >- } > > /* this removes the reference for the proc->locks list > added by dlm_user_request, it may result in the lkb >@@ -3900,6 +4232,8 @@ > dlm_put_lkb(lkb); > } > >+ mutex_lock(&ls->ls_clear_proc_locks); >+ > /* in-progress unlocks */ > list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { > list_del_init(&lkb->lkb_ownqueue); >Index: linux-rhel51-quilt/fs/dlm/user.c >=================================================================== >--- linux-rhel51-quilt.orig/fs/dlm/user.c 2007-01-24 14:57:10.000000000 -0600 >+++ linux-rhel51-quilt/fs/dlm/user.c 2007-04-19 10:41:56.000000000 -0500 >@@ -1,5 +1,5 @@ > /* >- * Copyright (C) 2006 Red Hat, Inc. All rights reserved. >+ * Copyright (C) 2006-2007 Red Hat, Inc. All rights reserved. > * > * This copyrighted material is made available to anyone wishing to use, > * modify, copy, or redistribute it subject to the terms and conditions >@@ -127,35 +127,30 @@ > } > #endif > >+/* we could possibly check if the cancel of an orphan has resulted in the lkb >+ being removed and then remove that lkb from the orphans list and free it */ > > void dlm_user_add_ast(struct dlm_lkb *lkb, int type) > { > struct dlm_ls *ls; > struct dlm_user_args *ua; > struct dlm_user_proc *proc; >- int remove_ownqueue = 0; >+ int eol = 0, ast_type; > >- /* dlm_clear_proc_locks() sets ORPHAN/DEAD flag on each >- lkb before dealing with it. 
We need to check this >- flag before taking ls_clear_proc_locks mutex because if >- it's set, dlm_clear_proc_locks() holds the mutex. */ >- >- if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) { >- /* log_print("user_add_ast skip1 %x", lkb->lkb_flags); */ >+ if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) > return; >- } > > ls = lkb->lkb_resource->res_ls; > mutex_lock(&ls->ls_clear_proc_locks); > > /* If ORPHAN/DEAD flag is set, it means the process is dead so an ast > can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed >- lkb->ua so we can't try to use it. */ >+ lkb->ua so we can't try to use it. This second check is necessary >+ for cases where a completion ast is received for an operation that >+ began before clear_proc_locks did its cancel/unlock. */ > >- if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) { >- /* log_print("user_add_ast skip2 %x", lkb->lkb_flags); */ >+ if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) > goto out; >- } > > DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb);); > ua = (struct dlm_user_args *)lkb->lkb_astparam; >@@ -165,28 +160,42 @@ > goto out; > > spin_lock(&proc->asts_spin); >- if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) { >+ >+ ast_type = lkb->lkb_ast_type; >+ lkb->lkb_ast_type |= type; >+ >+ if (!ast_type) { > kref_get(&lkb->lkb_ref); > list_add_tail(&lkb->lkb_astqueue, &proc->asts); >- lkb->lkb_ast_type |= type; > wake_up_interruptible(&proc->wait); > } >- >- /* noqueue requests that fail may need to be removed from the >- proc's locks list, there should be a better way of detecting >- this situation than checking all these things... */ >- >- if (type == AST_COMP && lkb->lkb_grmode == DLM_LOCK_IV && >- ua->lksb.sb_status == -EAGAIN && !list_empty(&lkb->lkb_ownqueue)) >- remove_ownqueue = 1; >- >- /* unlocks or cancels of waiting requests need to be removed from the >- proc's unlocking list, again there must be a better way... 
*/ >- >- if (ua->lksb.sb_status == -DLM_EUNLOCK || >+ if (type == AST_COMP && (ast_type & AST_COMP)) >+ log_debug(ls, "ast overlap %x status %x %x", >+ lkb->lkb_id, ua->lksb.sb_status, lkb->lkb_flags); >+ >+ /* Figure out if this lock is at the end of its life and no longer >+ available for the application to use. The lkb still exists until >+ the final ast is read. A lock becomes EOL in three situations: >+ 1. a noqueue request fails with EAGAIN >+ 2. an unlock completes with EUNLOCK >+ 3. a cancel of a waiting request completes with ECANCEL >+ An EOL lock needs to be removed from the process's list of locks. >+ And we can't allow any new operation on an EOL lock. This is >+ not related to the lifetime of the lkb struct which is managed >+ entirely by refcount. */ >+ >+ if (type == AST_COMP && >+ lkb->lkb_grmode == DLM_LOCK_IV && >+ ua->lksb.sb_status == -EAGAIN) >+ eol = 1; >+ else if (ua->lksb.sb_status == -DLM_EUNLOCK || > (ua->lksb.sb_status == -DLM_ECANCEL && > lkb->lkb_grmode == DLM_LOCK_IV)) >- remove_ownqueue = 1; >+ eol = 1; >+ if (eol) { >+ lkb->lkb_ast_type &= ~AST_BAST; >+ lkb->lkb_flags |= DLM_IFL_ENDOFLIFE; >+ } > > /* We want to copy the lvb to userspace when the completion > ast is read if the status is 0, the lock has an lvb and >@@ -203,11 +212,13 @@ > > spin_unlock(&proc->asts_spin); > >- if (remove_ownqueue) { >+ if (eol) { > spin_lock(&ua->proc->locks_spin); >- list_del_init(&lkb->lkb_ownqueue); >+ if (!list_empty(&lkb->lkb_ownqueue)) { >+ list_del_init(&lkb->lkb_ownqueue); >+ dlm_put_lkb(lkb); >+ } > spin_unlock(&ua->proc->locks_spin); >- dlm_put_lkb(lkb); > } > out: > mutex_unlock(&ls->ls_clear_proc_locks); >Index: linux-rhel51-quilt/fs/dlm/dlm_internal.h >=================================================================== >--- linux-rhel51-quilt.orig/fs/dlm/dlm_internal.h 2007-01-24 15:02:10.000000000 -0600 >+++ linux-rhel51-quilt/fs/dlm/dlm_internal.h 2007-04-19 10:41:56.000000000 -0500 >@@ -2,7 +2,7 @@ > 
******************************************************************************* > ** > ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. >-** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. >+** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. > ** > ** This copyrighted material is made available to anyone wishing to use, > ** modify, copy, or redistribute it subject to the terms and conditions >@@ -210,6 +210,9 @@ > #define DLM_IFL_MSTCPY 0x00010000 > #define DLM_IFL_RESEND 0x00020000 > #define DLM_IFL_DEAD 0x00040000 >+#define DLM_IFL_OVERLAP_UNLOCK 0x00080000 >+#define DLM_IFL_OVERLAP_CANCEL 0x00100000 >+#define DLM_IFL_ENDOFLIFE 0x00200000 > #define DLM_IFL_USER 0x00000001 > #define DLM_IFL_ORPHAN 0x00000002 > >@@ -230,8 +233,8 @@ > int8_t lkb_grmode; /* granted lock mode */ > int8_t lkb_bastmode; /* requested mode */ > int8_t lkb_highbast; /* highest mode bast sent for */ >- > int8_t lkb_wait_type; /* type of reply waiting for */ >+ int8_t lkb_wait_count; > int8_t lkb_ast_type; /* type of ast queued for */ > > struct list_head lkb_idtbl_list; /* lockspace lkbtbl */ >@@ -440,6 +443,9 @@ > struct mutex ls_waiters_mutex; > struct list_head ls_waiters; /* lkbs needing a reply */ > >+ struct mutex ls_orphans_mutex; >+ struct list_head ls_orphans; >+ > struct list_head ls_nodes; /* current nodes in ls */ > struct list_head ls_nodes_gone; /* dead node list, recovery */ > int ls_num_nodes; /* number of nodes in ls */ >Index: linux-rhel51-quilt/fs/dlm/lockspace.c >=================================================================== >--- linux-rhel51-quilt.orig/fs/dlm/lockspace.c 2007-01-24 14:58:29.000000000 -0600 >+++ linux-rhel51-quilt/fs/dlm/lockspace.c 2007-04-19 10:41:56.000000000 -0500 >@@ -2,7 +2,7 @@ > ******************************************************************************* > ** > ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. >-** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 
>+** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. > ** > ** This copyrighted material is made available to anyone wishing to use, > ** modify, copy, or redistribute it subject to the terms and conditions >@@ -454,6 +454,8 @@ > > INIT_LIST_HEAD(&ls->ls_waiters); > mutex_init(&ls->ls_waiters_mutex); >+ INIT_LIST_HEAD(&ls->ls_orphans); >+ mutex_init(&ls->ls_orphans_mutex); > > INIT_LIST_HEAD(&ls->ls_nodes); > INIT_LIST_HEAD(&ls->ls_nodes_gone);
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Raw
Actions:
View
Attachments on
bug 216113
:
153030
| 153032