Sophie

Sophie

distrib > Scientific%20Linux > 5x > x86_64 > by-pkgid > 27922b4260f65d317aabda37e42bbbff > files > 1334

kernel-2.6.18-238.el5.src.rpm

From: Bob Peterson <rpeterso@redhat.com>
Date: Mon, 7 Jul 2008 09:58:21 -0500
Subject: [gfs2] lock_dlm: deliver callbacks in the right order
Message-id: 1215442701.2891.20.camel@technetium.msp.redhat.com
O-Subject: [RHEL5.3 Patch][GFS2] 447748: GFS2: lock_dlm is not always delivering callbacks in the right order
Bugzilla: 447748
RH-Acked-by: Steven Whitehouse <swhiteho@redhat.com>

Hi,

This patch fixes various GFS2 hangs and inter-node data corruption due
to two things:  (1) A race condition with pending lock demote requests.
(2) locks grants between nodes being processed in the wrong order.
The fix for the race condition (#1) is in glock.c.  The fix for the
lock ordering is basically to make the lock_dlm interface (to dlm) module
single-threaded so the lock replies are processed in the correct order.

These patches are in the -nmw git tree upstream and have been tested
on the "smoke" cluster with multiple runs of both the "dd_io" and
"coherency" tests that caught the problems.

Regards,

Bob Peterson
Red Hat GFS

Signed-off-by: Bob Peterson <rpeterso@redhat.com>
--

diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 6ffecc9..7ceadee 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -615,6 +615,7 @@ static void run_queue(struct gfs2_glock *gl, const int nonblock)
 		if (nonblock)
 			goto out_sched;
 		set_bit(GLF_DEMOTE_IN_PROGRESS, &gl->gl_flags);
+		GLOCK_BUG_ON(gl, gl->gl_demote_state == LM_ST_EXCLUSIVE);
 		gl->gl_target = gl->gl_demote_state;
 	} else {
 		if (test_bit(GLF_DEMOTE, &gl->gl_flags))
@@ -645,7 +646,9 @@ static void glock_work_func(void *data)
 	if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags))
 		finish_xmote(gl, gl->gl_reply);
 	spin_lock(&gl->gl_spin);
-	if (test_and_clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags)) {
+	if (test_and_clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
+	    gl->gl_state != LM_ST_UNLOCKED &&
+	    gl->gl_demote_state != LM_ST_EXCLUSIVE) {
 		unsigned long holdtime, now = jiffies;
 		holdtime = gl->gl_tchange + gl->gl_ops->go_min_hold_time;
 		if (time_before(now, holdtime))
diff --git a/fs/gfs2/locking/dlm/lock.c b/fs/gfs2/locking/dlm/lock.c
index 6474da0..6069e2b 100644
--- a/fs/gfs2/locking/dlm/lock.c
+++ b/fs/gfs2/locking/dlm/lock.c
@@ -11,46 +11,63 @@
 
 static char junk_lvb[GDLM_LVB_SIZE];
 
-static void queue_complete(struct gdlm_lock *lp)
+
+/* convert dlm lock-mode to gfs lock-state */
+
+static s16 gdlm_make_lmstate(s16 dlmmode)
 {
-	struct gdlm_ls *ls = lp->ls;
+	switch (dlmmode) {
+	case DLM_LOCK_IV:
+	case DLM_LOCK_NL:
+		return LM_ST_UNLOCKED;
+	case DLM_LOCK_EX:
+		return LM_ST_EXCLUSIVE;
+	case DLM_LOCK_CW:
+		return LM_ST_DEFERRED;
+	case DLM_LOCK_PR:
+		return LM_ST_SHARED;
+	}
+	gdlm_assert(0, "unknown DLM mode %d", dlmmode);
+	return -1;
+}
 
-	clear_bit(LFL_ACTIVE, &lp->flags);
+/* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
+   thread gets to it. */
+
+static void queue_submit(struct gdlm_lock *lp)
+{
+	struct gdlm_ls *ls = lp->ls;
 
 	spin_lock(&ls->async_lock);
-	list_add_tail(&lp->clist, &ls->complete);
+	list_add_tail(&lp->delay_list, &ls->submit);
 	spin_unlock(&ls->async_lock);
 	wake_up(&ls->thread_wait);
 }
 
-static inline void gdlm_ast(void *astarg)
+static void wake_up_ast(struct gdlm_lock *lp)
 {
-	queue_complete(astarg);
+	clear_bit(LFL_AST_WAIT, &lp->flags);
+	smp_mb__after_clear_bit();
+	wake_up_bit(&lp->flags, LFL_AST_WAIT);
 }
 
-static inline void gdlm_bast(void *astarg, int mode)
+static void gdlm_delete_lp(struct gdlm_lock *lp)
 {
-	struct gdlm_lock *lp = astarg;
 	struct gdlm_ls *ls = lp->ls;
 
-	if (!mode) {
-		printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
-			lp->lockname.ln_type,
-			(unsigned long long)lp->lockname.ln_number);
-		return;
-	}
-
 	spin_lock(&ls->async_lock);
-	if (!lp->bast_mode) {
-		list_add_tail(&lp->blist, &ls->blocking);
-		lp->bast_mode = mode;
-	} else if (lp->bast_mode < mode)
-		lp->bast_mode = mode;
+	if (!list_empty(&lp->delay_list))
+		list_del_init(&lp->delay_list);
+	gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
+		    (unsigned long long)lp->lockname.ln_number);
+	list_del_init(&lp->all_list);
+	ls->all_locks_count--;
 	spin_unlock(&ls->async_lock);
-	wake_up(&ls->thread_wait);
+
+	kfree(lp);
 }
 
-void gdlm_queue_delayed(struct gdlm_lock *lp)
+static void gdlm_queue_delayed(struct gdlm_lock *lp)
 {
 	struct gdlm_ls *ls = lp->ls;
 
@@ -59,6 +76,247 @@ void gdlm_queue_delayed(struct gdlm_lock *lp)
 	spin_unlock(&ls->async_lock);
 }
 
+static void process_complete(struct gdlm_lock *lp)
+{
+	struct gdlm_ls *ls = lp->ls;
+	struct lm_async_cb acb;
+	s16 prev_mode = lp->cur;
+
+	memset(&acb, 0, sizeof(acb));
+
+	if (lp->lksb.sb_status == -DLM_ECANCEL) {
+		log_info("complete dlm cancel %x,%llx flags %lx",
+		 	 lp->lockname.ln_type,
+			 (unsigned long long)lp->lockname.ln_number,
+			 lp->flags);
+
+		lp->req = lp->cur;
+		acb.lc_ret |= LM_OUT_CANCELED;
+		if (lp->cur == DLM_LOCK_IV)
+			lp->lksb.sb_lkid = 0;
+		goto out;
+	}
+
+	if (test_and_clear_bit(LFL_DLM_UNLOCK, &lp->flags)) {
+		if (lp->lksb.sb_status != -DLM_EUNLOCK) {
+			log_info("unlock sb_status %d %x,%llx flags %lx",
+				 lp->lksb.sb_status, lp->lockname.ln_type,
+				 (unsigned long long)lp->lockname.ln_number,
+				 lp->flags);
+			return;
+		}
+
+		lp->cur = DLM_LOCK_IV;
+		lp->req = DLM_LOCK_IV;
+		lp->lksb.sb_lkid = 0;
+
+		if (test_and_clear_bit(LFL_UNLOCK_DELETE, &lp->flags)) {
+			gdlm_delete_lp(lp);
+			return;
+		}
+		goto out;
+	}
+
+	if (lp->lksb.sb_flags & DLM_SBF_VALNOTVALID)
+		memset(lp->lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
+
+	if (lp->lksb.sb_flags & DLM_SBF_ALTMODE) {
+		if (lp->req == DLM_LOCK_PR)
+			lp->req = DLM_LOCK_CW;
+		else if (lp->req == DLM_LOCK_CW)
+			lp->req = DLM_LOCK_PR;
+	}
+
+	/*
+	 * A canceled lock request.  The lock was just taken off the delayed
+	 * list and was never even submitted to dlm.
+	 */
+
+	if (test_and_clear_bit(LFL_CANCEL, &lp->flags)) {
+		log_info("complete internal cancel %x,%llx",
+		 	 lp->lockname.ln_type,
+			 (unsigned long long)lp->lockname.ln_number);
+		lp->req = lp->cur;
+		acb.lc_ret |= LM_OUT_CANCELED;
+		goto out;
+	}
+
+	/*
+	 * An error occured.
+	 */
+
+	if (lp->lksb.sb_status) {
+		/* a "normal" error */
+		if ((lp->lksb.sb_status == -EAGAIN) &&
+		    (lp->lkf & DLM_LKF_NOQUEUE)) {
+			lp->req = lp->cur;
+			if (lp->cur == DLM_LOCK_IV)
+				lp->lksb.sb_lkid = 0;
+			goto out;
+		}
+
+		/* this could only happen with cancels I think */
+		log_info("ast sb_status %d %x,%llx flags %lx",
+			 lp->lksb.sb_status, lp->lockname.ln_type,
+			 (unsigned long long)lp->lockname.ln_number,
+			 lp->flags);
+		if (lp->lksb.sb_status == -EDEADLOCK) {
+			lp->req = lp->cur;
+			if (lp->cur == DLM_LOCK_IV)
+				lp->lksb.sb_lkid = 0;
+			goto out;
+		} else
+			return;
+	}
+
+	/*
+	 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
+	 */
+
+	if (test_and_clear_bit(LFL_SYNC_LVB, &lp->flags)) {
+		wake_up_ast(lp);
+		return;
+	}
+
+	/*
+	 * A lock has been demoted to NL because it initially completed during
+	 * BLOCK_LOCKS.  Now it must be requested in the originally requested
+	 * mode.
+	 */
+
+	if (test_and_clear_bit(LFL_REREQUEST, &lp->flags)) {
+		gdlm_assert(lp->req == DLM_LOCK_NL, "%x,%llx",
+			    lp->lockname.ln_type,
+			    (unsigned long long)lp->lockname.ln_number);
+		gdlm_assert(lp->prev_req > DLM_LOCK_NL, "%x,%llx",
+			    lp->lockname.ln_type,
+			    (unsigned long long)lp->lockname.ln_number);
+
+		lp->cur = DLM_LOCK_NL;
+		lp->req = lp->prev_req;
+		lp->prev_req = DLM_LOCK_IV;
+		lp->lkf &= ~DLM_LKF_CONVDEADLK;
+
+		set_bit(LFL_NOCACHE, &lp->flags);
+
+		if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
+		    !test_bit(LFL_NOBLOCK, &lp->flags))
+			gdlm_queue_delayed(lp);
+		else
+			queue_submit(lp);
+		return;
+	}
+
+	/*
+	 * A request is granted during dlm recovery.  It may be granted
+	 * because the locks of a failed node were cleared.  In that case,
+	 * there may be inconsistent data beneath this lock and we must wait
+	 * for recovery to complete to use it.  When gfs recovery is done this
+	 * granted lock will be converted to NL and then reacquired in this
+	 * granted state.
+	 */
+
+	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
+	    !test_bit(LFL_NOBLOCK, &lp->flags) &&
+	    lp->req != DLM_LOCK_NL) {
+
+		lp->cur = lp->req;
+		lp->prev_req = lp->req;
+		lp->req = DLM_LOCK_NL;
+		lp->lkf |= DLM_LKF_CONVERT;
+		lp->lkf &= ~DLM_LKF_CONVDEADLK;
+
+		log_debug("rereq %x,%llx id %x %d,%d",
+			  lp->lockname.ln_type,
+			  (unsigned long long)lp->lockname.ln_number,
+			  lp->lksb.sb_lkid, lp->cur, lp->req);
+
+		set_bit(LFL_REREQUEST, &lp->flags);
+		queue_submit(lp);
+		return;
+	}
+
+	/*
+	 * DLM demoted the lock to NL before it was granted so GFS must be
+	 * told it cannot cache data for this lock.
+	 */
+
+	if (lp->lksb.sb_flags & DLM_SBF_DEMOTED)
+		set_bit(LFL_NOCACHE, &lp->flags);
+
+out:
+	/*
+	 * This is an internal lock_dlm lock
+	 */
+
+	if (test_bit(LFL_INLOCK, &lp->flags)) {
+		clear_bit(LFL_NOBLOCK, &lp->flags);
+		lp->cur = lp->req;
+		wake_up_ast(lp);
+		return;
+	}
+
+	/*
+	 * Normal completion of a lock request.  Tell GFS it now has the lock.
+	 */
+
+	clear_bit(LFL_NOBLOCK, &lp->flags);
+	lp->cur = lp->req;
+
+	acb.lc_name = lp->lockname;
+	acb.lc_ret |= gdlm_make_lmstate(lp->cur);
+
+	if (!test_and_clear_bit(LFL_NOCACHE, &lp->flags) &&
+	    (lp->cur > DLM_LOCK_NL) && (prev_mode > DLM_LOCK_NL))
+		acb.lc_ret |= LM_OUT_CACHEABLE;
+
+	ls->fscb(ls->sdp, LM_CB_ASYNC, &acb);
+}
+
+static void gdlm_ast(void *astarg)
+{
+	struct gdlm_lock *lp = astarg;
+	clear_bit(LFL_ACTIVE, &lp->flags);
+	process_complete(lp);
+}
+
+static void process_blocking(struct gdlm_lock *lp, int bast_mode)
+{
+	struct gdlm_ls *ls = lp->ls;
+	unsigned int cb = 0;
+
+	switch (gdlm_make_lmstate(bast_mode)) {
+	case LM_ST_EXCLUSIVE:
+		cb = LM_CB_NEED_E;
+		break;
+	case LM_ST_DEFERRED:
+		cb = LM_CB_NEED_D;
+		break;
+	case LM_ST_SHARED:
+		cb = LM_CB_NEED_S;
+		break;
+	default:
+		gdlm_assert(0, "unknown bast mode %u", bast_mode);
+	}
+
+	ls->fscb(ls->sdp, cb, &lp->lockname);
+}
+
+
+static void gdlm_bast(void *astarg, int mode)
+{
+	struct gdlm_lock *lp = astarg;
+
+	if (!mode) {
+		printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
+			lp->lockname.ln_type,
+			(unsigned long long)lp->lockname.ln_number);
+		return;
+	}
+
+	process_blocking(lp, mode);
+}
+
 /* convert gfs lock-state to dlm lock-mode */
 
 static s16 make_mode(s16 lmstate)
@@ -77,24 +335,6 @@ static s16 make_mode(s16 lmstate)
 	return -1;
 }
 
-/* convert dlm lock-mode to gfs lock-state */
-
-s16 gdlm_make_lmstate(s16 dlmmode)
-{
-	switch (dlmmode) {
-	case DLM_LOCK_IV:
-	case DLM_LOCK_NL:
-		return LM_ST_UNLOCKED;
-	case DLM_LOCK_EX:
-		return LM_ST_EXCLUSIVE;
-	case DLM_LOCK_CW:
-		return LM_ST_DEFERRED;
-	case DLM_LOCK_PR:
-		return LM_ST_SHARED;
-	}
-	gdlm_assert(0, "unknown DLM mode %d", dlmmode);
-	return -1;
-}
 
 /* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
    DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */
@@ -164,7 +404,7 @@ static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
 {
 	struct gdlm_lock *lp;
 
-	lp = kzalloc(sizeof(struct gdlm_lock), GFP_KERNEL);
+	lp = kzalloc(sizeof(struct gdlm_lock), GFP_NOFS);
 	if (!lp)
 		return -ENOMEM;
 
@@ -172,10 +412,6 @@ static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
 	make_strname(name, &lp->strname);
 	lp->ls = ls;
 	lp->cur = DLM_LOCK_IV;
-	lp->lvb = NULL;
-	lp->hold_null = NULL;
-	INIT_LIST_HEAD(&lp->clist);
-	INIT_LIST_HEAD(&lp->blist);
 	INIT_LIST_HEAD(&lp->delay_list);
 
 	spin_lock(&ls->async_lock);
@@ -187,26 +423,6 @@ static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
 	return 0;
 }
 
-void gdlm_delete_lp(struct gdlm_lock *lp)
-{
-	struct gdlm_ls *ls = lp->ls;
-
-	spin_lock(&ls->async_lock);
-	if (!list_empty(&lp->clist))
-		list_del_init(&lp->clist);
-	if (!list_empty(&lp->blist))
-		list_del_init(&lp->blist);
-	if (!list_empty(&lp->delay_list))
-		list_del_init(&lp->delay_list);
-	gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
-		    (unsigned long long)lp->lockname.ln_number);
-	list_del_init(&lp->all_list);
-	ls->all_locks_count--;
-	spin_unlock(&ls->async_lock);
-
-	kfree(lp);
-}
-
 int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
 		  void **lockp)
 {
@@ -260,7 +476,7 @@ unsigned int gdlm_do_lock(struct gdlm_lock *lp)
 
 	if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
 		lp->lksb.sb_status = -EAGAIN;
-		queue_complete(lp);
+		gdlm_ast(lp);
 		error = 0;
 	}
 
@@ -353,7 +569,7 @@ void gdlm_cancel(void *lock)
 	if (delay_list) {
 		set_bit(LFL_CANCEL, &lp->flags);
 		set_bit(LFL_ACTIVE, &lp->flags);
-		queue_complete(lp);
+		gdlm_ast(lp);
 		return;
 	}
 
@@ -385,7 +601,7 @@ static int gdlm_add_lvb(struct gdlm_lock *lp)
 {
 	char *lvb;
 
-	lvb = kzalloc(GDLM_LVB_SIZE, GFP_KERNEL);
+	lvb = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
 	if (!lvb)
 		return -ENOMEM;
 
diff --git a/fs/gfs2/locking/dlm/lock_dlm.h b/fs/gfs2/locking/dlm/lock_dlm.h
index e6e8ca2..ad944c6 100644
--- a/fs/gfs2/locking/dlm/lock_dlm.h
+++ b/fs/gfs2/locking/dlm/lock_dlm.h
@@ -13,7 +13,6 @@
 #include <linux/module.h>
 #include <linux/slab.h>
 #include <linux/spinlock.h>
-#include <linux/module.h>
 #include <linux/types.h>
 #include <linux/string.h>
 #include <linux/list.h>
@@ -73,15 +72,12 @@ struct gdlm_ls {
 	int			recover_jid_done;
 	int			recover_jid_status;
 	spinlock_t		async_lock;
-	struct list_head	complete;
-	struct list_head	blocking;
 	struct list_head	delayed;
 	struct list_head	submit;
 	struct list_head	all_locks;
 	u32		all_locks_count;
 	wait_queue_head_t	wait_control;
-	struct task_struct	*thread1;
-	struct task_struct	*thread2;
+	struct task_struct	*thread;
 	wait_queue_head_t	thread_wait;
 	unsigned long		drop_time;
 	int			drop_locks_count;
@@ -118,10 +114,6 @@ struct gdlm_lock {
 	u32			lkf;		/* dlm flags DLM_LKF_ */
 	unsigned long		flags;		/* lock_dlm flags LFL_ */
 
-	int			bast_mode;	/* protected by async_lock */
-
-	struct list_head	clist;		/* complete */
-	struct list_head	blist;		/* blocking */
 	struct list_head	delay_list;	/* delayed */
 	struct list_head	all_list;	/* all locks for the fs */
 	struct gdlm_lock	*hold_null;	/* NL lock for hold_lvb */
@@ -160,11 +152,8 @@ void gdlm_release_threads(struct gdlm_ls *);
 
 /* lock.c */
 
-s16 gdlm_make_lmstate(s16);
-void gdlm_queue_delayed(struct gdlm_lock *);
 void gdlm_submit_delayed(struct gdlm_ls *);
 int gdlm_release_all_locks(struct gdlm_ls *);
-void gdlm_delete_lp(struct gdlm_lock *);
 unsigned int gdlm_do_lock(struct gdlm_lock *);
 
 int gdlm_get_lock(void *, struct lm_lockname *, void **);
@@ -175,5 +164,9 @@ void gdlm_cancel(void *);
 int gdlm_hold_lvb(void *, char **);
 void gdlm_unhold_lvb(void *, char *);
 
+/* mount.c */
+
+extern const struct lm_lockops gdlm_ops;
+
 #endif
 
diff --git a/fs/gfs2/locking/dlm/main.c b/fs/gfs2/locking/dlm/main.c
index 867763c..b9a03a7 100644
--- a/fs/gfs2/locking/dlm/main.c
+++ b/fs/gfs2/locking/dlm/main.c
@@ -11,8 +11,6 @@
 
 #include "lock_dlm.h"
 
-extern struct lm_lockops gdlm_ops;
-
 static int __init init_lock_dlm(void)
 {
 	int error;
diff --git a/fs/gfs2/locking/dlm/mount.c b/fs/gfs2/locking/dlm/mount.c
index d428b47..de0b7be 100644
--- a/fs/gfs2/locking/dlm/mount.c
+++ b/fs/gfs2/locking/dlm/mount.c
@@ -28,15 +28,11 @@ static struct gdlm_ls *init_gdlm(lm_callback_t cb, struct gfs2_sbd *sdp,
 	ls->sdp = sdp;
 	ls->fsflags = flags;
 	spin_lock_init(&ls->async_lock);
-	INIT_LIST_HEAD(&ls->complete);
-	INIT_LIST_HEAD(&ls->blocking);
 	INIT_LIST_HEAD(&ls->delayed);
 	INIT_LIST_HEAD(&ls->submit);
 	INIT_LIST_HEAD(&ls->all_locks);
 	init_waitqueue_head(&ls->thread_wait);
 	init_waitqueue_head(&ls->wait_control);
-	ls->thread1 = NULL;
-	ls->thread2 = NULL;
 	ls->drop_time = jiffies;
 	ls->jid = -1;
 
@@ -67,6 +63,11 @@ static int make_args(struct gdlm_ls *ls, char *data_arg, int *nodir)
 	memset(data, 0, 256);
 	strncpy(data, data_arg, 255);
 
+	if (!strlen(data)) {
+		log_error("no mount options, (u)mount helpers not installed");
+		return -EINVAL;
+	}
+
 	for (options = data; (x = strsep(&options, ":")); ) {
 		if (!*x)
 			continue;
diff --git a/fs/gfs2/locking/dlm/sysfs.c b/fs/gfs2/locking/dlm/sysfs.c
index 4746b88..1373aec 100644
--- a/fs/gfs2/locking/dlm/sysfs.c
+++ b/fs/gfs2/locking/dlm/sysfs.c
@@ -12,8 +12,6 @@
 
 #include "lock_dlm.h"
 
-extern struct lm_lockops gdlm_ops;
-
 static ssize_t proto_name_show(struct gdlm_ls *ls, char *buf)
 {
 	return sprintf(buf, "%s\n", gdlm_ops.lm_proto_name);
diff --git a/fs/gfs2/locking/dlm/thread.c b/fs/gfs2/locking/dlm/thread.c
index 521694f..f30350a 100644
--- a/fs/gfs2/locking/dlm/thread.c
+++ b/fs/gfs2/locking/dlm/thread.c
@@ -9,247 +9,12 @@
 
 #include "lock_dlm.h"
 
-/* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
-   thread gets to it. */
-
-static void queue_submit(struct gdlm_lock *lp)
-{
-	struct gdlm_ls *ls = lp->ls;
-
-	spin_lock(&ls->async_lock);
-	list_add_tail(&lp->delay_list, &ls->submit);
-	spin_unlock(&ls->async_lock);
-	wake_up(&ls->thread_wait);
-}
-
-static void process_blocking(struct gdlm_lock *lp, int bast_mode)
-{
-	struct gdlm_ls *ls = lp->ls;
-	unsigned int cb = 0;
-
-	switch (gdlm_make_lmstate(bast_mode)) {
-	case LM_ST_EXCLUSIVE:
-		cb = LM_CB_NEED_E;
-		break;
-	case LM_ST_DEFERRED:
-		cb = LM_CB_NEED_D;
-		break;
-	case LM_ST_SHARED:
-		cb = LM_CB_NEED_S;
-		break;
-	default:
-		gdlm_assert(0, "unknown bast mode %u", lp->bast_mode);
-	}
-
-	ls->fscb(ls->sdp, cb, &lp->lockname);
-}
-
-static void wake_up_ast(struct gdlm_lock *lp)
-{
-	clear_bit(LFL_AST_WAIT, &lp->flags);
-	smp_mb__after_clear_bit();
-	wake_up_bit(&lp->flags, LFL_AST_WAIT);
-}
-
-static void process_complete(struct gdlm_lock *lp)
-{
-	struct gdlm_ls *ls = lp->ls;
-	struct lm_async_cb acb;
-	s16 prev_mode = lp->cur;
-
-	memset(&acb, 0, sizeof(acb));
-
-	if (lp->lksb.sb_status == -DLM_ECANCEL) {
-		log_info("complete dlm cancel %x,%llx flags %lx",
-		 	 lp->lockname.ln_type,
-			 (unsigned long long)lp->lockname.ln_number,
-			 lp->flags);
-
-		lp->req = lp->cur;
-		acb.lc_ret |= LM_OUT_CANCELED;
-		if (lp->cur == DLM_LOCK_IV)
-			lp->lksb.sb_lkid = 0;
-		goto out;
-	}
-
-	if (test_and_clear_bit(LFL_DLM_UNLOCK, &lp->flags)) {
-		if (lp->lksb.sb_status != -DLM_EUNLOCK) {
-			log_info("unlock sb_status %d %x,%llx flags %lx",
-				 lp->lksb.sb_status, lp->lockname.ln_type,
-				 (unsigned long long)lp->lockname.ln_number,
-				 lp->flags);
-			return;
-		}
-
-		lp->cur = DLM_LOCK_IV;
-		lp->req = DLM_LOCK_IV;
-		lp->lksb.sb_lkid = 0;
-
-		if (test_and_clear_bit(LFL_UNLOCK_DELETE, &lp->flags)) {
-			gdlm_delete_lp(lp);
-			return;
-		}
-		goto out;
-	}
-
-	if (lp->lksb.sb_flags & DLM_SBF_VALNOTVALID)
-		memset(lp->lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
-
-	if (lp->lksb.sb_flags & DLM_SBF_ALTMODE) {
-		if (lp->req == DLM_LOCK_PR)
-			lp->req = DLM_LOCK_CW;
-		else if (lp->req == DLM_LOCK_CW)
-			lp->req = DLM_LOCK_PR;
-	}
-
-	/*
-	 * A canceled lock request.  The lock was just taken off the delayed
-	 * list and was never even submitted to dlm.
-	 */
-
-	if (test_and_clear_bit(LFL_CANCEL, &lp->flags)) {
-		log_info("complete internal cancel %x,%llx",
-		 	 lp->lockname.ln_type,
-			 (unsigned long long)lp->lockname.ln_number);
-		lp->req = lp->cur;
-		acb.lc_ret |= LM_OUT_CANCELED;
-		goto out;
-	}
-
-	/*
-	 * An error occured.
-	 */
-
-	if (lp->lksb.sb_status) {
-		/* a "normal" error */
-		if ((lp->lksb.sb_status == -EAGAIN) &&
-		    (lp->lkf & DLM_LKF_NOQUEUE)) {
-			lp->req = lp->cur;
-			if (lp->cur == DLM_LOCK_IV)
-				lp->lksb.sb_lkid = 0;
-			goto out;
-		}
-
-		/* this could only happen with cancels I think */
-		log_info("ast sb_status %d %x,%llx flags %lx",
-			 lp->lksb.sb_status, lp->lockname.ln_type,
-			 (unsigned long long)lp->lockname.ln_number,
-			 lp->flags);
-		return;
-	}
-
-	/*
-	 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
-	 */
-
-	if (test_and_clear_bit(LFL_SYNC_LVB, &lp->flags)) {
-		wake_up_ast(lp);
-		return;
-	}
-
-	/*
-	 * A lock has been demoted to NL because it initially completed during
-	 * BLOCK_LOCKS.  Now it must be requested in the originally requested
-	 * mode.
-	 */
-
-	if (test_and_clear_bit(LFL_REREQUEST, &lp->flags)) {
-		gdlm_assert(lp->req == DLM_LOCK_NL, "%x,%llx",
-			    lp->lockname.ln_type,
-			    (unsigned long long)lp->lockname.ln_number);
-		gdlm_assert(lp->prev_req > DLM_LOCK_NL, "%x,%llx",
-			    lp->lockname.ln_type,
-			    (unsigned long long)lp->lockname.ln_number);
-
-		lp->cur = DLM_LOCK_NL;
-		lp->req = lp->prev_req;
-		lp->prev_req = DLM_LOCK_IV;
-		lp->lkf &= ~DLM_LKF_CONVDEADLK;
-
-		set_bit(LFL_NOCACHE, &lp->flags);
-
-		if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
-		    !test_bit(LFL_NOBLOCK, &lp->flags))
-			gdlm_queue_delayed(lp);
-		else
-			queue_submit(lp);
-		return;
-	}
-
-	/*
-	 * A request is granted during dlm recovery.  It may be granted
-	 * because the locks of a failed node were cleared.  In that case,
-	 * there may be inconsistent data beneath this lock and we must wait
-	 * for recovery to complete to use it.  When gfs recovery is done this
-	 * granted lock will be converted to NL and then reacquired in this
-	 * granted state.
-	 */
-
-	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
-	    !test_bit(LFL_NOBLOCK, &lp->flags) &&
-	    lp->req != DLM_LOCK_NL) {
-
-		lp->cur = lp->req;
-		lp->prev_req = lp->req;
-		lp->req = DLM_LOCK_NL;
-		lp->lkf |= DLM_LKF_CONVERT;
-		lp->lkf &= ~DLM_LKF_CONVDEADLK;
-
-		log_debug("rereq %x,%llx id %x %d,%d",
-			  lp->lockname.ln_type,
-			  (unsigned long long)lp->lockname.ln_number,
-			  lp->lksb.sb_lkid, lp->cur, lp->req);
-
-		set_bit(LFL_REREQUEST, &lp->flags);
-		queue_submit(lp);
-		return;
-	}
-
-	/*
-	 * DLM demoted the lock to NL before it was granted so GFS must be
-	 * told it cannot cache data for this lock.
-	 */
-
-	if (lp->lksb.sb_flags & DLM_SBF_DEMOTED)
-		set_bit(LFL_NOCACHE, &lp->flags);
-
-out:
-	/*
-	 * This is an internal lock_dlm lock
-	 */
-
-	if (test_bit(LFL_INLOCK, &lp->flags)) {
-		clear_bit(LFL_NOBLOCK, &lp->flags);
-		lp->cur = lp->req;
-		wake_up_ast(lp);
-		return;
-	}
-
-	/*
-	 * Normal completion of a lock request.  Tell GFS it now has the lock.
-	 */
-
-	clear_bit(LFL_NOBLOCK, &lp->flags);
-	lp->cur = lp->req;
-
-	acb.lc_name = lp->lockname;
-	acb.lc_ret |= gdlm_make_lmstate(lp->cur);
-
-	if (!test_and_clear_bit(LFL_NOCACHE, &lp->flags) &&
-	    (lp->cur > DLM_LOCK_NL) && (prev_mode > DLM_LOCK_NL))
-		acb.lc_ret |= LM_OUT_CACHEABLE;
-
-	ls->fscb(ls->sdp, LM_CB_ASYNC, &acb);
-}
-
-static inline int no_work(struct gdlm_ls *ls, int blocking)
+static inline int no_work(struct gdlm_ls *ls)
 {
 	int ret;
 
 	spin_lock(&ls->async_lock);
-	ret = list_empty(&ls->complete) && list_empty(&ls->submit);
-	if (ret && blocking)
-		ret = list_empty(&ls->blocking);
+	ret = list_empty(&ls->submit);
 	spin_unlock(&ls->async_lock);
 
 	return ret;
@@ -268,100 +33,55 @@ static inline int check_drop(struct gdlm_ls *ls)
 	return 0;
 }
 
-static int gdlm_thread(void *data, int blist)
+static int gdlm_thread(void *data)
 {
 	struct gdlm_ls *ls = (struct gdlm_ls *) data;
 	struct gdlm_lock *lp = NULL;
-	uint8_t complete, blocking, submit, drop;
-
-	/* Only thread1 is allowed to do blocking callbacks since gfs
-	   may wait for a completion callback within a blocking cb. */
 
 	while (!kthread_should_stop()) {
 		wait_event_interruptible(ls->thread_wait,
-				!no_work(ls, blist) || kthread_should_stop());
-
-		complete = blocking = submit = drop = 0;
+				!no_work(ls) || kthread_should_stop());
 
 		spin_lock(&ls->async_lock);
 
-		if (blist && !list_empty(&ls->blocking)) {
-			lp = list_entry(ls->blocking.next, struct gdlm_lock,
-					blist);
-			list_del_init(&lp->blist);
-			blocking = lp->bast_mode;
-			lp->bast_mode = 0;
-		} else if (!list_empty(&ls->complete)) {
-			lp = list_entry(ls->complete.next, struct gdlm_lock,
-					clist);
-			list_del_init(&lp->clist);
-			complete = 1;
-		} else if (!list_empty(&ls->submit)) {
+		if (!list_empty(&ls->submit)) {
 			lp = list_entry(ls->submit.next, struct gdlm_lock,
 					delay_list);
 			list_del_init(&lp->delay_list);
-			submit = 1;
-		}
-
-		drop = check_drop(ls);
-		spin_unlock(&ls->async_lock);
-
-		if (complete)
-			process_complete(lp);
-
-		else if (blocking)
-			process_blocking(lp, blocking);
-
-		else if (submit)
+			spin_unlock(&ls->async_lock);
 			gdlm_do_lock(lp);
-
-		if (drop)
+			spin_lock(&ls->async_lock);
+		}
+		/* Does this ever happen these days? I hope not anyway */
+		if (check_drop(ls)) {
+			spin_unlock(&ls->async_lock);
 			ls->fscb(ls->sdp, LM_CB_DROPLOCKS, NULL);
-
-		schedule();
+			spin_lock(&ls->async_lock);
+		}
+		spin_unlock(&ls->async_lock);
 	}
 
 	return 0;
 }
 
-static int gdlm_thread1(void *data)
-{
-	return gdlm_thread(data, 1);
-}
-
-static int gdlm_thread2(void *data)
-{
-	return gdlm_thread(data, 0);
-}
-
 int gdlm_init_threads(struct gdlm_ls *ls)
 {
 	struct task_struct *p;
 	int error;
 
-	p = kthread_run(gdlm_thread1, ls, "lock_dlm1");
-	error = IS_ERR(p);
-	if (error) {
-		log_error("can't start lock_dlm1 thread %d", error);
-		return error;
-	}
-	ls->thread1 = p;
-
-	p = kthread_run(gdlm_thread2, ls, "lock_dlm2");
+	p = kthread_run(gdlm_thread, ls, "lock_dlm");
 	error = IS_ERR(p);
 	if (error) {
-		log_error("can't start lock_dlm2 thread %d", error);
-		kthread_stop(ls->thread1);
+		log_error("can't start lock_dlm thread %d", error);
 		return error;
 	}
-	ls->thread2 = p;
+	ls->thread = p;
 
 	return 0;
 }
 
 void gdlm_release_threads(struct gdlm_ls *ls)
 {
-	kthread_stop(ls->thread1);
-	kthread_stop(ls->thread2);
+	kthread_stop(ls->thread);
 }