Sophie

Sophie

distrib > Scientific%20Linux > 5x > x86_64 > by-pkgid > 89877e42827f16fa5f86b1df0c2860b1 > files > 624

kernel-2.6.18-128.1.10.el5.src.rpm

From: Konrad Rzeszutek <konradr@redhat.com>
Date: Fri, 14 Dec 2007 20:37:41 -0500
Subject: [fs] nfs: byte-range locking support for cfs
Message-id: 20071215013741.GA18638@mars.boston.redhat.com
O-Subject: Re: [RHEL5 PATCH] REPOST: RHBZ 196318: NFS byte-range locking support for cluster file systems.
Bugzilla: 196318

Description:
------------
Byte-range lock requests over NFS do not propagate to the
underlying file-system with current Linux kernels.  This causes
them to work incorrectly with cluster file-systems like GPFS.
This feature adds the missing support. This feature
has a pre-requisite: a fix to BZ 244343, which work is currently
underway.

Here is a repost of the patch with some extra bits from BZ 244343,
and BZ 232649. A huge Thank You goes to the GFS team for finding
the culprits that were causing this feature to have a regression.

Test Status:
------------
Tested four different scenarios using the NFS Connectathon Test-suite:
a). as a NFS client against various OSes.
b). as a NFS server providing a ext3 file-system.
c). as a NFS server providing a GFS cluster file-system.
d). as a NFS server providing a GFS2 (dlm_lock and no_lock) cluster file-system.
e). running the NFS Connectathon locally on the a ext3 fs to check locking.

The a) test works successfully. Running the NFS Connectathon Test-Suite against
NFS shares exported from RHEL3, RHEL4, RHEL5, Sol9, Sol10, and NetApp machines works
with success.

The b) test works successfully. The NFS Connectathon TestSuite on RHEL3, RHEL4,
RHEL5, Sol9 and Sol10 client machines against a NFS share exported (with ext3 as the
under-laying file-system) from a machine running RHEL5 with the patched kernel worked
with success.

The c) and d) works successfully. The NFS Connectathon TestSuite on RHEL3, RHEL4,
RHEL5, Sol9 and Sol10 client machines against a NFS share exported in c)
with GFS exported file-system, and in d) with GFS2 exported file-system from a
machine running RHEL5 with the patched kernel works succesfully.

The e) test scenario works successfully. The NFS Connectathon Testate on RHEL5
against a local directory worked with success.

Acked-by: David Teigland <teigland@redhat.com>
Acked-by: Steve Dickson <SteveD@redhat.com>

diff --git a/fs/fuse/file.c b/fs/fuse/file.c
index 4e174c8..51a6588 100644
--- a/fs/fuse/file.c
+++ b/fs/fuse/file.c
@@ -743,8 +743,7 @@ static int fuse_file_lock(struct file *file, int cmd, struct file_lock *fl)
 
 	if (cmd == F_GETLK) {
 		if (fc->no_lock) {
-			if (!posix_test_lock(file, fl, fl))
-				fl->fl_type = F_UNLCK;
+			posix_test_lock(file, fl, NULL);
 			err = 0;
 		} else
 			err = fuse_getlk(file, fl);
diff --git a/fs/gfs2/inode.c b/fs/gfs2/inode.c
index 470dc05..90a80ac 100644
--- a/fs/gfs2/inode.c
+++ b/fs/gfs2/inode.c
@@ -132,14 +132,21 @@ static struct inode *gfs2_iget_skip(struct super_block *sb,
 
 void gfs2_set_iop(struct inode *inode)
 {
+	struct gfs2_sbd *sdp = GFS2_SB(inode);
 	umode_t mode = inode->i_mode;
 
 	if (S_ISREG(mode)) {
 		inode->i_op = &gfs2_file_iops;
-		inode->i_fop = &gfs2_file_fops;
+		if (sdp->sd_args.ar_localflocks) 
+			inode->i_fop = &gfs2_file_fops_nolock;
+		else
+			inode->i_fop = &gfs2_file_fops;
 	} else if (S_ISDIR(mode)) {
 		inode->i_op = &gfs2_dir_iops;
-		inode->i_fop = &gfs2_dir_fops;
+		if (sdp->sd_args.ar_localflocks) 
+			inode->i_fop = &gfs2_dir_fops_nolock;
+		else
+			inode->i_fop = &gfs2_dir_fops;
 	} else if (S_ISLNK(mode)) {
 		inode->i_op = &gfs2_symlink_iops;
 	} else {
diff --git a/fs/gfs2/locking/dlm/plock.c b/fs/gfs2/locking/dlm/plock.c
index 84cafd5..7634bc9 100644
--- a/fs/gfs2/locking/dlm/plock.c
+++ b/fs/gfs2/locking/dlm/plock.c
@@ -24,6 +24,15 @@ struct plock_op {
 	struct gdlm_plock_info info;
 };
 
+struct plock_xop {
+	struct plock_op xop;
+	void *callback;
+	void *fl;
+	void *file;
+	struct file_lock flc;
+};
+
+
 static inline void set_version(struct gdlm_plock_info *info)
 {
 	info->version[0] = GDLM_PLOCK_VERSION_MAJOR;
@@ -63,12 +72,14 @@ int gdlm_plock(void *lockspace, struct lm_lockname *name,
 {
 	struct gdlm_ls *ls = lockspace;
 	struct plock_op *op;
+	struct plock_xop *xop;
 	int rv;
 
-	op = kzalloc(sizeof(*op), GFP_KERNEL);
-	if (!op)
+	xop = kzalloc(sizeof(*xop), GFP_KERNEL);
+	if (!xop)
 		return -ENOMEM;
 
+	op = &xop->xop;
 	op->info.optype		= GDLM_PLOCK_OP_LOCK;
 	op->info.pid		= fl->fl_pid;
 	op->info.ex		= (fl->fl_type == F_WRLCK);
@@ -77,10 +88,27 @@ int gdlm_plock(void *lockspace, struct lm_lockname *name,
 	op->info.number		= name->ln_number;
 	op->info.start		= fl->fl_start;
 	op->info.end		= fl->fl_end;
-	op->info.owner		= (__u64)(long) fl->fl_owner;
+	if ((fl->fl_flags & FL_GRANT) &&
+		fl->fl_lmops && fl->fl_lmops->fl_grant) {
+		/* fl_owner is lockd which doesn't distinguish
+		processes on the nfs client */
+		op->info.owner  = (__u64) fl->fl_pid;
+		xop->callback	= fl->fl_lmops->fl_grant;
+		locks_init_lock(&xop->flc);
+		locks_copy_lock(&xop->flc, fl);
+		xop->fl		= fl;
+		xop->file	= file;
+	} else {
+		xop->callback	= NULL;
+		op->info.owner  = (__u64)(long) fl->fl_owner;
+	}
 
 	send_op(op);
-	wait_event(recv_wq, (op->done != 0));
+
+	if (xop->callback == NULL)
+		wait_event(recv_wq, (op->done != 0));
+	else
+		return -EINPROGRESS;
 
 	spin_lock(&ops_lock);
 	if (!list_empty(&op->list)) {
@@ -98,7 +126,63 @@ int gdlm_plock(void *lockspace, struct lm_lockname *name,
 				  (unsigned long long)name->ln_number);
 	}
 
-	kfree(op);
+	kfree(xop);
+	return rv;
+}
+
+/* Returns failure iff a succesful lock operation should be canceled */
+static int gdlm_plock_callback(struct plock_op *op)
+{
+	struct file *file;
+	struct file_lock *fl;
+	struct file_lock *flc;
+	int (*notify)(void *, void *, int) = NULL;
+	struct plock_xop *xop = (struct plock_xop *)op;
+	int rv = 0;
+
+	spin_lock(&ops_lock);
+	if (!list_empty(&op->list)) {
+		printk(KERN_INFO "plock op on list\n");
+		list_del(&op->list);
+	}
+	spin_unlock(&ops_lock);
+
+	/* check if the following 2 are still valid or make a copy */
+	file = xop->file;
+	flc = &xop->flc;
+	fl = xop->fl;
+	notify = xop->callback;
+
+	if (op->info.rv) {
+		notify(flc, NULL, op->info.rv);
+		goto out;
+	}
+
+	/* got fs lock; bookkeep locally as well: */
+	flc->fl_flags &= ~FL_SLEEP;
+	if (posix_lock_file(file, flc)) {
+		/*
+		 * This can only happen in the case of kmalloc() failure.
+		 * The filesystem's own lock is the authoritative lock,
+		 * so a failure to get the lock locally is not a disaster.
+		 * As long as GFS cannot reliably cancel locks (especially
+		 * in a low-memory situation), we're better off ignoring
+		 * this failure than trying to recover.
+		 */
+		log_error("gdlm_plock: vfs lock error file %p fl %p",
+				file, fl);
+	}
+
+	rv = notify(flc, NULL, 0);
+	if (rv) {
+		/* XXX: We need to cancel the fs lock here: */
+		printk(KERN_ERR "gfs2 lock granted after lock request failed;"
+						" dangling lock!\n");
+		goto out;
+	}
+
+out:
+	kfree(xop);
 	return rv;
 }
 
@@ -123,7 +207,11 @@ int gdlm_punlock(void *lockspace, struct lm_lockname *name,
 	op->info.number		= name->ln_number;
 	op->info.start		= fl->fl_start;
 	op->info.end		= fl->fl_end;
-	op->info.owner		= (__u64)(long) fl->fl_owner;
+	if ((fl->fl_flags & FL_GRANT) &&
+		fl->fl_lmops && fl->fl_lmops->fl_grant)
+		op->info.owner  = (__u64) fl->fl_pid;
+	else
+		op->info.owner	= (__u64)(long) fl->fl_owner;
 
 	send_op(op);
 	wait_event(recv_wq, (op->done != 0));
@@ -162,7 +250,12 @@ int gdlm_plock_get(void *lockspace, struct lm_lockname *name,
 	op->info.number		= name->ln_number;
 	op->info.start		= fl->fl_start;
 	op->info.end		= fl->fl_end;
-	op->info.owner          = (__u64)(long) fl->fl_owner;
+	if ((fl->fl_flags & FL_GRANT) &&
+		fl->fl_lmops && fl->fl_lmops->fl_grant)
+		op->info.owner  = (__u64) fl->fl_pid;
+	else
+		op->info.owner	= (__u64)(long) fl->fl_owner;
+
 
 	send_op(op);
 	wait_event(recv_wq, (op->done != 0));
@@ -251,9 +344,14 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
 	}
 	spin_unlock(&ops_lock);
 
-	if (found)
-		wake_up(&recv_wq);
-	else
+	if (found) {
+		struct plock_xop *xop;
+		xop = (struct plock_xop *)op;
+		if (xop->callback)
+			count = gdlm_plock_callback(op);
+		else
+			wake_up(&recv_wq);
+	} else
 		printk(KERN_INFO "gdlm dev_write no op %x %llx\n", info.fsid,
 			(unsigned long long)info.number);
 	return count;
diff --git a/fs/gfs2/locking/nolock/main.c b/fs/gfs2/locking/nolock/main.c
index acfbc94..3892f6b 100644
--- a/fs/gfs2/locking/nolock/main.c
+++ b/fs/gfs2/locking/nolock/main.c
@@ -164,13 +164,7 @@ static void nolock_unhold_lvb(void *lock, char *lvb)
 static int nolock_plock_get(void *lockspace, struct lm_lockname *name,
 			    struct file *file, struct file_lock *fl)
 {
-	struct file_lock tmp;
-	int ret;
-
-	ret = posix_test_lock(file, fl, &tmp);
-	fl->fl_type = F_UNLCK;
-	if (ret)
-		memcpy(fl, &tmp, sizeof(struct file_lock));
+	posix_test_lock(file, fl, NULL);
 
 	return 0;
 }
diff --git a/fs/gfs2/ops_file.c b/fs/gfs2/ops_file.c
index 583a61b..9d9e2fd 100644
--- a/fs/gfs2/ops_file.c
+++ b/fs/gfs2/ops_file.c
@@ -530,18 +530,18 @@ static int gfs2_lock(struct file *file, int cmd, struct file_lock *fl)
 
 	if (sdp->sd_args.ar_localflocks) {
 		if (IS_GETLK(cmd)) {
-			struct file_lock tmp;
-			int ret;
-			ret = posix_test_lock(file, fl, &tmp);
-			fl->fl_type = F_UNLCK;
-			if (ret)
-				memcpy(fl, &tmp, sizeof(struct file_lock));
+			posix_test_lock(file, fl, NULL);
 			return 0;
 		} else {
 			return posix_lock_file_wait(file, fl);
 		}
 	}
 
+	if (cmd == F_CANCELLK) {
+		/* Hack: */
+		cmd = F_SETLK;
+		fl->fl_type = F_UNLCK;
+	}
 	if (IS_GETLK(cmd))
 		return gfs2_lm_plock_get(sdp, &name, file, fl);
 	else if (fl->fl_type == F_UNLCK)
@@ -622,16 +622,12 @@ static void do_unflock(struct file *file, struct file_lock *fl)
 static int gfs2_flock(struct file *file, int cmd, struct file_lock *fl)
 {
 	struct gfs2_inode *ip = GFS2_I(file->f_mapping->host);
-	struct gfs2_sbd *sdp = GFS2_SB(file->f_mapping->host);
 
 	if (!(fl->fl_flags & FL_FLOCK))
 		return -ENOLCK;
 	if ((ip->i_inode.i_mode & (S_ISGID | S_IXGRP)) == S_ISGID)
 		return -ENOLCK;
 
-	if (sdp->sd_args.ar_localflocks)
-		return flock_lock_file_wait(file, fl);
-
 	if (fl->fl_type == F_UNLCK) {
 		do_unflock(file, fl);
 		return 0;
@@ -669,3 +665,29 @@ const struct file_operations gfs2_dir_fops = {
 	.flock		= gfs2_flock,
 };
 
+const struct file_operations gfs2_file_fops_nolock = {
+	.llseek		= gfs2_llseek,
+	.read		= generic_file_read,
+	.readv		= generic_file_readv,
+	.aio_read	= generic_file_aio_read,
+	.write		= generic_file_write,
+	.writev		= generic_file_writev,
+	.aio_write	= generic_file_aio_write,
+	.unlocked_ioctl	= gfs2_ioctl,
+	.mmap		= gfs2_mmap,
+	.open		= gfs2_open,
+	.release	= gfs2_close,
+	.fsync		= gfs2_fsync,
+	.sendfile	= generic_file_sendfile,
+	.splice_read	= generic_file_splice_read,
+	.splice_write	= generic_file_splice_write,
+};
+
+const struct file_operations gfs2_dir_fops_nolock = {
+	.readdir	= gfs2_readdir,
+	.unlocked_ioctl	= gfs2_ioctl,
+	.open		= gfs2_open,
+	.release	= gfs2_close,
+	.fsync		= gfs2_fsync,
+};
+
diff --git a/fs/gfs2/ops_file.h b/fs/gfs2/ops_file.h
index 7e5d8ec..ff39b7a 100644
--- a/fs/gfs2/ops_file.h
+++ b/fs/gfs2/ops_file.h
@@ -20,5 +20,7 @@ extern int gfs2_internal_read(struct gfs2_inode *ip,
 extern void gfs2_set_inode_flags(struct inode *inode);
 extern const struct file_operations gfs2_file_fops;
 extern const struct file_operations gfs2_dir_fops;
+extern const struct file_operations gfs2_file_fops_nolock;
+extern const struct file_operations gfs2_dir_fops_nolock;
 
 #endif /* __OPS_FILE_DOT_H__ */
diff --git a/fs/lockd/svc4proc.c b/fs/lockd/svc4proc.c
index a2dd9cc..2eb0a61 100644
--- a/fs/lockd/svc4proc.c
+++ b/fs/lockd/svc4proc.c
@@ -53,6 +53,9 @@ nlm4svc_retrieve_args(struct svc_rqst *rqstp, struct nlm_args *argp,
 		lock->fl.fl_file  = file->f_file;
 		lock->fl.fl_owner = (fl_owner_t) host;
 		lock->fl.fl_lmops = &nlmsvc_lock_operations;
+		/* kABI toggle to let the caller know that f_lmops has
+		the grant callback. */
+		lock->fl.fl_flags |= FL_GRANT;
 	}
 
 	return 0;
@@ -96,10 +99,14 @@ nlm4svc_proc_test(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlm4svc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Now check for conflicting locks */
-	resp->status = nlmsvc_testlock(file, &argp->lock, &resp->lock);
+	resp->status = nlmsvc_testlock(rqstp, file, &argp->lock,
+				       &resp->lock, &resp->cookie);
+	if (resp->status == nlm_drop_reply)
+		return rpc_drop_reply;
 
 	dprintk("lockd: TEST4          status %d\n", ntohl(resp->status));
 	nlm_release_host(host);
@@ -126,7 +133,8 @@ nlm4svc_proc_lock(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlm4svc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 #if 0
 	/* If supplied state doesn't match current state, we assume it's
@@ -143,6 +151,8 @@ nlm4svc_proc_lock(struct svc_rqst *rqstp, struct nlm_args *argp,
 	/* Now try to lock the file */
 	resp->status = nlmsvc_lock(rqstp, file, &argp->lock,
 					argp->block, &argp->cookie);
+	if (resp->status == nlm_drop_reply)
+		return rpc_drop_reply;
 
 	dprintk("lockd: LOCK          status %d\n", ntohl(resp->status));
 	nlm_release_host(host);
@@ -169,7 +179,8 @@ nlm4svc_proc_cancel(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlm4svc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Try to cancel request. */
 	resp->status = nlmsvc_cancel_blocked(file, &argp->lock);
@@ -202,7 +213,8 @@ nlm4svc_proc_unlock(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlm4svc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Now try to remove the lock */
 	resp->status = nlmsvc_unlock(file, &argp->lock);
@@ -337,7 +349,8 @@ nlm4svc_proc_share(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlm4svc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Now try to create the share */
 	resp->status = nlmsvc_share_file(host, file, argp);
@@ -370,7 +383,8 @@ nlm4svc_proc_unshare(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlm4svc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Now try to lock the file */
 	resp->status = nlmsvc_unshare_file(host, file, argp);
diff --git a/fs/lockd/svclock.c b/fs/lockd/svclock.c
index 0377832..aa093bb 100644
--- a/fs/lockd/svclock.c
+++ b/fs/lockd/svclock.c
@@ -172,7 +172,7 @@ nlmsvc_find_block(struct nlm_cookie *cookie,  struct sockaddr_in *sin)
  */
 static inline struct nlm_block *
 nlmsvc_create_block(struct svc_rqst *rqstp, struct nlm_file *file,
-				struct nlm_lock *lock, struct nlm_cookie *cookie)
+		struct nlm_lock *lock, struct nlm_cookie *cookie, int conf)
 {
 	struct nlm_block	*block;
 	struct nlm_host		*host;
@@ -196,13 +196,19 @@ nlmsvc_create_block(struct svc_rqst *rqstp, struct nlm_file *file,
 	if (!nlmsvc_setgrantargs(call, lock))
 		goto failed_free;
 
-	/* Set notifier function for VFS, and init args */
-	call->a_args.lock.fl.fl_flags |= FL_SLEEP;
+	/* Set notifier function for VFS, kABI toggle to let the caller
+	 know that f_lmops has the grant callback, and init args */
+	call->a_args.lock.fl.fl_flags |= FL_SLEEP | FL_GRANT;
 	call->a_args.lock.fl.fl_lmops = &nlmsvc_lock_operations;
 	call->a_args.cookie = *cookie;	/* see above */
 
 	dprintk("lockd: created block %p...\n", block);
 
+	if (conf) {
+		block->b_fl = kzalloc(sizeof(struct file_lock), GFP_KERNEL);
+		if (!block->b_fl)
+			goto failed_free;
+	}
 	/* Create and initialize the block */
 	block->b_daemon = rqstp->rq_server;
 	block->b_host   = host;
@@ -265,6 +271,7 @@ static void nlmsvc_free_block(struct kref *kref)
 	nlmsvc_freegrantargs(block->b_call);
 	nlm_release_call(block->b_call);
 	nlm_release_file(block->b_file);
+	kfree(block->b_fl);
 	kfree(block);
 }
 
@@ -350,6 +357,31 @@ static void nlmsvc_freegrantargs(struct nlm_rqst *call)
 }
 
 /*
+ * Deferred lock request handling for non-blocking lock
+ */
+static u32
+nlmsvc_defer_lock_rqst(struct svc_rqst *rqstp, struct nlm_block *block)
+{
+	u32 status = nlm_lck_denied_nolocks;
+
+	block->b_flags |= B_QUEUED;
+
+	nlmsvc_insert_block(block, NLM_TIMEOUT);
+
+	block->b_cache_req = &rqstp->rq_chandle;
+	if (rqstp->rq_chandle.defer) {
+		block->b_deferred_req =
+			rqstp->rq_chandle.defer(block->b_cache_req);
+		if (block->b_deferred_req != NULL)
+			status = nlm_drop_reply;
+	}
+	dprintk("lockd: nlmsvc_defer_lock_rqst block %p flags %d status %d\n",
+		block, block->b_flags, status);
+
+	return status;
+}
+
+/*
  * Attempt to establish a lock, and if it can't be granted, block it
  * if required.
  */
@@ -357,7 +389,7 @@ u32
 nlmsvc_lock(struct svc_rqst *rqstp, struct nlm_file *file,
 			struct nlm_lock *lock, int wait, struct nlm_cookie *cookie)
 {
-	struct nlm_block	*block, *newblock = NULL;
+	struct nlm_block	*block = NULL;
 	int			error;
 	u32			ret;
 
@@ -370,29 +402,58 @@ nlmsvc_lock(struct svc_rqst *rqstp, struct nlm_file *file,
 				wait);
 
 
-	lock->fl.fl_flags &= ~FL_SLEEP;
-again:
 	/* Lock file against concurrent access */
 	down(&file->f_sema);
-	/* Get existing block (in case client is busy-waiting) */
+	/* Get existing block (in case client is busy-waiting)
+	 * or create new block
+	 */
 	block = nlmsvc_lookup_block(file, lock);
 	if (block == NULL) {
-		if (newblock != NULL)
-			lock = &newblock->b_call->a_args.lock;
-	} else
+		block = nlmsvc_create_block(rqstp, file, lock, cookie, 0);
+		ret = nlm_lck_denied_nolocks;
+		if (block == NULL)
+			goto out;
 		lock = &block->b_call->a_args.lock;
+	} else
+		lock->fl.fl_flags &= ~FL_SLEEP;
 
-	error = posix_lock_file(file->f_file, &lock->fl);
-	lock->fl.fl_flags &= ~FL_SLEEP;
+	if (block->b_flags & B_QUEUED) {
+		dprintk("lockd: nlmsvc_lock deferred block %p flags %d\n",
+							block, block->b_flags);
+		if (block->b_granted) {
+			nlmsvc_unlink_block(block);
+			ret = nlm_granted;
+			goto out;
+		}
+		if (block->b_flags & B_TIMED_OUT) {
+			nlmsvc_unlink_block(block);
+			ret = nlm_lck_denied;
+			goto out;
+		}
+		ret = nlm_drop_reply;
+		goto out;
+	}
 
-	dprintk("lockd: posix_lock_file returned %d\n", error);
+	if (!wait)
+		lock->fl.fl_flags &= ~FL_SLEEP;
+	error = vfs_lock_file(file->f_file, F_SETLK, &lock->fl, NULL);
+	lock->fl.fl_flags &= ~FL_SLEEP;
 
+	dprintk("lockd: vfs_lock_file returned %d\n", error);
 	switch(error) {
 		case 0:
 			ret = nlm_granted;
 			goto out;
 		case -EAGAIN:
+			ret = nlm_lck_denied;
 			break;
+		case -EINPROGRESS:
+			if (wait)
+				break;
+			/* Filesystem lock operation is in progress
+			   Add it to the queue waiting for callback */
+			ret = nlmsvc_defer_lock_rqst(rqstp, block);
+			goto out;
 		case -EDEADLK:
 			ret = nlm_deadlock;
 			goto out;
@@ -406,26 +467,11 @@ again:
 		goto out;
 
 	ret = nlm_lck_blocked;
-	if (block != NULL)
-		goto out;
-
-	/* If we don't have a block, create and initialize it. Then
-	 * retry because we may have slept in kmalloc. */
-	/* We have to release f_sema as nlmsvc_create_block may try to
-	 * to claim it while doing host garbage collection */
-	if (newblock == NULL) {
-		up(&file->f_sema);
-		dprintk("lockd: blocking on this lock (allocating).\n");
-		if (!(newblock = nlmsvc_create_block(rqstp, file, lock, cookie)))
-			return nlm_lck_denied_nolocks;
-		goto again;
-	}
 
 	/* Append to list of blocked */
-	nlmsvc_insert_block(newblock, NLM_NEVER);
+	nlmsvc_insert_block(block, NLM_NEVER);
 out:
 	up(&file->f_sema);
-	nlmsvc_release_block(newblock);
 	nlmsvc_release_block(block);
 	dprintk("lockd: nlmsvc_lock returned %u\n", ret);
 	return ret;
@@ -435,9 +481,14 @@ out:
  * Test for presence of a conflicting lock.
  */
 u32
-nlmsvc_testlock(struct nlm_file *file, struct nlm_lock *lock,
-				       struct nlm_lock *conflock)
+nlmsvc_testlock(struct svc_rqst *rqstp, struct nlm_file *file,
+		struct nlm_lock *lock, struct nlm_lock *conflock,
+		struct nlm_cookie *cookie)
 {
+	struct nlm_block 	*block = NULL;
+	int			error;
+	__be32			ret;
+
 	dprintk("lockd: nlmsvc_testlock(%s/%ld, ty=%d, %Ld-%Ld)\n",
 				file->f_file->f_dentry->d_inode->i_sb->s_id,
 				file->f_file->f_dentry->d_inode->i_ino,
@@ -445,18 +496,62 @@ nlmsvc_testlock(struct nlm_file *file, struct nlm_lock *lock,
 				(long long)lock->fl.fl_start,
 				(long long)lock->fl.fl_end);
 
-	if (posix_test_lock(file->f_file, &lock->fl, &conflock->fl)) {
-		dprintk("lockd: conflicting lock(ty=%d, %Ld-%Ld)\n",
-				conflock->fl.fl_type,
-				(long long)conflock->fl.fl_start,
-				(long long)conflock->fl.fl_end);
-		conflock->caller = "somehost";	/* FIXME */
-		conflock->oh.len = 0;		/* don't return OH info */
-		conflock->svid = conflock->fl.fl_pid;
-		return nlm_lck_denied;
+	/* Get existing block (in case client is busy-waiting) */
+	block = nlmsvc_lookup_block(file, lock);
+
+	if (block == NULL) {
+		block = nlmsvc_create_block(rqstp, file, lock, cookie, 1);
+		if (block == NULL)
+			return nlm_granted;
+	}
+	if (block->b_flags & B_QUEUED) {
+		dprintk("lockd: nlmsvc_testlock deferred block %p flags %d fl %p\n",
+			block, block->b_flags, block->b_fl);
+		if (block->b_flags & B_TIMED_OUT) {
+			nlmsvc_unlink_block(block);
+			return nlm_lck_denied;
+		}
+		if (block->b_flags & B_GOT_CALLBACK) {
+			if (block->b_fl != NULL
+					&& block->b_fl->fl_type != F_UNLCK) {
+				lock->fl = *block->b_fl;
+				goto conf_lock;
+			} else {
+				nlmsvc_unlink_block(block);
+				return nlm_granted;
+			}
+		}
+		return nlm_drop_reply;
+	}
+
+	error = vfs_test_lock(file->f_file, &lock->fl);
+	if (error == -EINPROGRESS)
+		return nlmsvc_defer_lock_rqst(rqstp, block);
+	if (error) {
+		ret = nlm_lck_denied_nolocks;
+		goto out;
+	}
+	if (lock->fl.fl_type == F_UNLCK) {
+		ret = nlm_granted;
+		goto out;
 	}
 
-	return nlm_granted;
+conf_lock:
+	dprintk("lockd: conflicting lock(ty=%d, %Ld-%Ld)\n",
+		lock->fl.fl_type, (long long)lock->fl.fl_start,
+		(long long)lock->fl.fl_end);
+	conflock->caller = "somehost";	/* FIXME */
+	conflock->len = strlen(conflock->caller);
+	conflock->oh.len = 0;		/* don't return OH info */
+	conflock->svid = lock->fl.fl_pid;
+	conflock->fl.fl_type = lock->fl.fl_type;
+	conflock->fl.fl_start = lock->fl.fl_start;
+	conflock->fl.fl_end = lock->fl.fl_end;
+	ret = nlm_lck_denied;
+out:
+	if (block)
+		nlmsvc_release_block(block);
+	return ret;
 }
 
 /*
@@ -482,7 +577,7 @@ nlmsvc_unlock(struct nlm_file *file, struct nlm_lock *lock)
 	nlmsvc_cancel_blocked(file, lock);
 
 	lock->fl.fl_type = F_UNLCK;
-	error = posix_lock_file(file->f_file, &lock->fl);
+	error = vfs_lock_file(file->f_file, F_SETLK, &lock->fl, NULL);
 
 	return (error < 0)? nlm_lck_denied_nolocks : nlm_granted;
 }
@@ -511,6 +606,8 @@ nlmsvc_cancel_blocked(struct nlm_file *file, struct nlm_lock *lock)
 	block = nlmsvc_lookup_block(file, lock);
 	up(&file->f_sema);
 	if (block != NULL) {
+		vfs_cancel_lock(block->b_file->f_file,
+			&block->b_call->a_args.lock.fl);
 		status = nlmsvc_unlink_block(block);
 		nlmsvc_release_block(block);
 	}
@@ -518,6 +615,64 @@ nlmsvc_cancel_blocked(struct nlm_file *file, struct nlm_lock *lock)
 }
 
 /*
+ * This is a callback from the filesystem for VFS file lock requests.
+ * It will be used if fl_grant is defined and the filesystem can not
+ * respond to the request immediately.
+ * For GETLK request it will copy the reply to the nlm_block.
+ * For SETLK or SETLKW request it will get the local posix lock.
+ * In all cases it will move the block to the head of nlm_blocked q where
+ * nlmsvc_retry_blocked() can send back a reply for SETLKW or revisit the
+ * deferred rpc for GETLK and SETLK.
+ */
+static void
+nlmsvc_update_deferred_block(struct nlm_block *block, struct file_lock *conf,
+			     int result)
+{
+	block->b_flags |= B_GOT_CALLBACK;
+	if (result == 0)
+		block->b_granted = 1;
+	else
+		block->b_flags |= B_TIMED_OUT;
+	if (conf) {
+		if (block->b_fl)
+			locks_copy_lock(block->b_fl, conf);
+	}
+}
+
+static int nlmsvc_grant_deferred(struct file_lock *fl, struct file_lock *conf,
+					int result)
+{
+	struct nlm_block *block;
+	int rc = -ENOENT;
+
+	lock_kernel();
+	for (block = nlm_blocked; block; block = block->b_next) {
+		if (nlm_compare_locks(&block->b_call->a_args.lock.fl, fl)) {
+			dprintk("lockd: nlmsvc_notify_blocked block %p flags %d\n",
+							block, block->b_flags);
+			if (block->b_flags & B_QUEUED) {
+				if (block->b_flags & B_TIMED_OUT) {
+					rc = -ENOLCK;
+					break;
+				}
+				nlmsvc_update_deferred_block(block, conf,
+							     result);
+			} else if (result == 0)
+				block->b_granted = 1;
+
+			nlmsvc_insert_block(block, 0);
+			svc_wake_up(block->b_daemon);
+			rc = 0;
+			break;
+		}
+	}
+	unlock_kernel();
+	if (rc == -ENOENT)
+		printk(KERN_WARNING "lockd: grant for unknown block\n");
+	return rc;
+}
+
+/*
  * Unblock a blocked lock request. This is a callback invoked from the
  * VFS layer when a lock on which we blocked is removed.
  *
@@ -549,6 +704,7 @@ static int nlmsvc_same_owner(struct file_lock *fl1, struct file_lock *fl2)
 struct lock_manager_operations nlmsvc_lock_operations = {
 	.fl_compare_owner = nlmsvc_same_owner,
 	.fl_notify = nlmsvc_notify_blocked,
+	.fl_grant = nlmsvc_grant_deferred,
 };
 
 /*
@@ -571,6 +727,8 @@ nlmsvc_grant_blocked(struct nlm_block *block)
 
 	dprintk("lockd: grant blocked lock %p\n", block);
 
+	kref_get(&block->b_count);
+
 	/* Unlink block request from list */
 	nlmsvc_unlink_block(block);
 
@@ -584,20 +742,23 @@ nlmsvc_grant_blocked(struct nlm_block *block)
 
 	/* Try the lock operation again */
 	lock->fl.fl_flags |= FL_SLEEP;
-	error = posix_lock_file(file->f_file, &lock->fl);
+	error = vfs_lock_file(file->f_file, F_SETLK, &lock->fl, NULL);
 	lock->fl.fl_flags &= ~FL_SLEEP;
 
 	switch (error) {
 	case 0:
 		break;
 	case -EAGAIN:
-		dprintk("lockd: lock still blocked\n");
+	case -EINPROGRESS:
+		dprintk("lockd: lock still blocked error %d\n", error);
 		nlmsvc_insert_block(block, NLM_NEVER);
+		nlmsvc_release_block(block);
 		return;
 	default:
 		printk(KERN_WARNING "lockd: unexpected error %d in %s!\n",
 				-error, __FUNCTION__);
 		nlmsvc_insert_block(block, 10 * HZ);
+		nlmsvc_release_block(block);
 		return;
 	}
 
@@ -686,6 +847,23 @@ nlmsvc_grant_reply(struct svc_rqst *rqstp, struct nlm_cookie *cookie, u32 status
 	nlmsvc_release_block(block);
 }
 
+/* Helper function to handle retry of a deferred block.
+ * If it is a blocking lock, call grant_blocked.
+ * For a non-blocking lock or test lock, revisit the request.
+ */
+static void
+retry_deferred_block(struct nlm_block *block)
+{
+	if (!(block->b_flags & B_GOT_CALLBACK))
+		block->b_flags |= B_TIMED_OUT;
+	nlmsvc_insert_block(block, NLM_TIMEOUT);
+	dprintk("revisit block %p flags %d\n",	block, block->b_flags);
+	if (block->b_deferred_req) {
+		block->b_deferred_req->revisit(block->b_deferred_req, 0);
+		block->b_deferred_req = NULL;
+	}
+}
+
 /*
  * Retry all blocked locks that have been notified. This is where lockd
  * picks up locks that can be granted, or grant notifications that must
@@ -706,9 +884,12 @@ nlmsvc_retry_blocked(void)
 			break;
 		dprintk("nlmsvc_retry_blocked(%p, when=%ld)\n",
 			block, block->b_when);
-		kref_get(&block->b_count);
-		nlmsvc_grant_blocked(block);
-		nlmsvc_release_block(block);
+		if (block->b_flags & B_QUEUED) {
+			dprintk("nlmsvc_retry_blocked delete block (%p, granted=%d, flags=%d)\n",
+				block, block->b_granted, block->b_flags);
+			retry_deferred_block(block);
+		} else
+			nlmsvc_grant_blocked(block);
 	}
 
 	if ((block = nlm_blocked) && block->b_when != NLM_NEVER)
diff --git a/fs/lockd/svcproc.c b/fs/lockd/svcproc.c
index dbb66a3..dc6e5f7 100644
--- a/fs/lockd/svcproc.c
+++ b/fs/lockd/svcproc.c
@@ -33,6 +33,7 @@ cast_to_nlm(u32 status, u32 vers)
 		case nlm_lck_denied_nolocks:
 		case nlm_lck_blocked:
 		case nlm_lck_denied_grace_period:
+		case nlm_drop_reply:
 			break;
 		case nlm4_deadlock:
 			status = nlm_lck_denied;
@@ -59,7 +60,7 @@ nlmsvc_retrieve_args(struct svc_rqst *rqstp, struct nlm_args *argp,
 	struct nlm_host		*host = NULL;
 	struct nlm_file		*file = NULL;
 	struct nlm_lock		*lock = &argp->lock;
-	u32			error;
+	u32			error = 0;
 
 	/* nfsd callbacks must have been installed for this procedure */
 	if (!nlmsvc_ops)
@@ -81,6 +82,9 @@ nlmsvc_retrieve_args(struct svc_rqst *rqstp, struct nlm_args *argp,
 		lock->fl.fl_file  = file->f_file;
 		lock->fl.fl_owner = (fl_owner_t) host;
 		lock->fl.fl_lmops = &nlmsvc_lock_operations;
+		/* kABI toggle to let the caller know that f_lmops has
+		the grant callback. */
+		lock->fl.fl_flags |= FL_GRANT;
 	}
 
 	return 0;
@@ -88,6 +92,8 @@ nlmsvc_retrieve_args(struct svc_rqst *rqstp, struct nlm_args *argp,
 no_locks:
 	if (host)
 		nlm_release_host(host);
+	if (error)
+		return error;
 	return nlm_lck_denied_nolocks;
 }
 
@@ -122,10 +128,14 @@ nlmsvc_proc_test(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlmsvc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Now check for conflicting locks */
-	resp->status = cast_status(nlmsvc_testlock(file, &argp->lock, &resp->lock));
+	resp->status = cast_status(nlmsvc_testlock(rqstp, file, &argp->lock,
+				   &resp->lock, &resp->cookie));
+	if (resp->status == nlm_drop_reply)
+		return rpc_drop_reply;
 
 	dprintk("lockd: TEST          status %d vers %d\n",
 		ntohl(resp->status), rqstp->rq_vers);
@@ -153,7 +163,8 @@ nlmsvc_proc_lock(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlmsvc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 #if 0
 	/* If supplied state doesn't match current state, we assume it's
@@ -170,6 +181,8 @@ nlmsvc_proc_lock(struct svc_rqst *rqstp, struct nlm_args *argp,
 	/* Now try to lock the file */
 	resp->status = cast_status(nlmsvc_lock(rqstp, file, &argp->lock,
 					       argp->block, &argp->cookie));
+	if (resp->status == nlm_drop_reply)
+		return rpc_drop_reply;
 
 	dprintk("lockd: LOCK          status %d\n", ntohl(resp->status));
 	nlm_release_host(host);
@@ -196,7 +209,8 @@ nlmsvc_proc_cancel(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlmsvc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Try to cancel request. */
 	resp->status = cast_status(nlmsvc_cancel_blocked(file, &argp->lock));
@@ -229,7 +243,8 @@ nlmsvc_proc_unlock(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlmsvc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Now try to remove the lock */
 	resp->status = cast_status(nlmsvc_unlock(file, &argp->lock));
@@ -366,7 +381,8 @@ nlmsvc_proc_share(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlmsvc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Now try to create the share */
 	resp->status = cast_status(nlmsvc_share_file(host, file, argp));
@@ -399,7 +415,8 @@ nlmsvc_proc_unshare(struct svc_rqst *rqstp, struct nlm_args *argp,
 
 	/* Obtain client and file */
 	if ((resp->status = nlmsvc_retrieve_args(rqstp, argp, &host, &file)))
-		return rpc_success;
+		return resp->status == nlm_drop_reply ?
+			rpc_drop_reply :rpc_success;
 
 	/* Now try to unshare the file */
 	resp->status = cast_status(nlmsvc_unshare_file(host, file, argp));
diff --git a/fs/lockd/svcsubs.c b/fs/lockd/svcsubs.c
index 01b4db9..54ff71d 100644
--- a/fs/lockd/svcsubs.c
+++ b/fs/lockd/svcsubs.c
@@ -135,12 +135,6 @@ out_unlock:
 
 out_free:
 	kfree(file);
-#ifdef CONFIG_LOCKD_V4
-	if (nfserr == 1)
-		nfserr = nlm4_stale_fh;
-	else
-#endif
-	nfserr = nlm_lck_denied;
 	goto out_unlock;
 }
 
@@ -201,7 +195,8 @@ again:
 			lock.fl_type  = F_UNLCK;
 			lock.fl_start = 0;
 			lock.fl_end   = OFFSET_MAX;
-			if (posix_lock_file(file->f_file, &lock) < 0) {
+			if (vfs_lock_file(file->f_file, F_SETLK,
+					  &lock, NULL) < 0) {
 				printk("lockd: unlock failure in %s:%d\n",
 						__FILE__, __LINE__);
 				return 1;
diff --git a/fs/locks.c b/fs/locks.c
index 4cb1ab8..ee06d53 100644
--- a/fs/locks.c
+++ b/fs/locks.c
@@ -161,6 +161,9 @@ static void locks_release_private(struct file_lock *fl)
 		if (fl->fl_lmops->fl_release_private)
 			fl->fl_lmops->fl_release_private(fl);
 		fl->fl_lmops = NULL;
+		/* Clear kABI toggle which lets the caller know if
+		fl_lmops has the grant callback. */
+		fl->fl_flags &= ~FL_GRANT;
 	}
 
 }
@@ -221,6 +224,9 @@ static void locks_copy_private(struct file_lock *new, struct file_lock *fl)
 		if (fl->fl_lmops->fl_copy_lock)
 			fl->fl_lmops->fl_copy_lock(new, fl);
 		new->fl_lmops = fl->fl_lmops;
+		/* Toggle kABI switch to let the caller
+		know that fl_lmops has the grant callback.*/
+		new->fl_flags |= (fl->fl_flags & FL_GRANT);
 	}
 }
 
@@ -232,7 +238,8 @@ static void __locks_copy_lock(struct file_lock *new, const struct file_lock *fl)
 	new->fl_owner = fl->fl_owner;
 	new->fl_pid = fl->fl_pid;
 	new->fl_file = NULL;
-	new->fl_flags = fl->fl_flags;
+	/* Clear the kABI toggle switch since the fl_lmops is cleared. */
+	new->fl_flags = fl->fl_flags & ~FL_GRANT;
 	new->fl_type = fl->fl_type;
 	new->fl_start = fl->fl_start;
 	new->fl_end = fl->fl_end;
@@ -679,10 +686,14 @@ posix_test_lock(struct file *filp, struct file_lock *fl,
 			break;
 	}
 	if (cfl) {
-		__locks_copy_lock(conflock, cfl);
+		if (conflock)
+			__locks_copy_lock(conflock, cfl);
+		else
+			__locks_copy_lock(fl, cfl);
 		unlock_kernel();
 		return 1;
-	}
+	} else
+		fl->fl_type = F_UNLCK;
 	unlock_kernel();
 	return 0;
 }
@@ -1614,12 +1625,64 @@ asmlinkage long sys_flock(unsigned int fd, unsigned int cmd)
 	return error;
 }
 
+/**
+ * vfs_test_lock - test file byte range lock
+ * @filp: The file to test lock for
+ * @fl: The lock to test
+ * @conf: Place to return a copy of the conflicting lock, if found
+ *
+ * Returns -ERRNO on failure.  Indicates presence of conflicting lock by
+ * setting conf->fl_type to something other than F_UNLCK.
+ */
+int vfs_test_lock(struct file *filp, struct file_lock *fl)
+{
+	fl->fl_type = F_UNLCK;
+	if (filp->f_op && filp->f_op->lock)
+		return filp->f_op->lock(filp, F_GETLK, fl);
+	posix_test_lock(filp, fl, NULL);
+	return 0;
+}
+EXPORT_SYMBOL_GPL(vfs_test_lock);
+
+static int posix_lock_to_flock(struct flock *flock, struct file_lock *fl)
+{
+	flock->l_pid = fl->fl_pid;
+#if BITS_PER_LONG == 32
+	/*
+	 * Make sure we can represent the posix lock via
+	 * legacy 32bit flock.
+	 */
+	if (fl->fl_start > OFFT_OFFSET_MAX)
+		return -EOVERFLOW;
+	if (fl->fl_end != OFFSET_MAX && fl->fl_end > OFFT_OFFSET_MAX)
+		return -EOVERFLOW;
+#endif
+	flock->l_start = fl->fl_start;
+	flock->l_len = fl->fl_end == OFFSET_MAX ? 0 :
+		fl->fl_end - fl->fl_start + 1;
+	flock->l_whence = 0;
+	flock->l_type = fl->fl_type;
+	return 0;
+}
+
+#if BITS_PER_LONG == 32
+static void posix_lock_to_flock64(struct flock64 *flock, struct file_lock *fl)
+{
+	flock->l_pid = fl->fl_pid;
+	flock->l_start = fl->fl_start;
+	flock->l_len = fl->fl_end == OFFSET_MAX ? 0 :
+		fl->fl_end - fl->fl_start + 1;
+	flock->l_whence = 0;
+	flock->l_type = fl->fl_type;
+}
+#endif
+
 /* Report the first existing lock that would conflict with l.
  * This implements the F_GETLK command of fcntl().
  */
 int fcntl_getlk(struct file *filp, struct flock __user *l)
 {
-	struct file_lock *fl, cfl, file_lock;
+	struct file_lock file_lock;
 	struct flock flock;
 	int error;
 
@@ -1634,38 +1697,15 @@ int fcntl_getlk(struct file *filp, struct flock __user *l)
 	if (error)
 		goto out;
 
-	if (filp->f_op && filp->f_op->lock) {
-		error = filp->f_op->lock(filp, F_GETLK, &file_lock);
-		if (file_lock.fl_ops && file_lock.fl_ops->fl_release_private)
-			file_lock.fl_ops->fl_release_private(&file_lock);
-		if (error < 0)
-			goto out;
-		else
-		  fl = (file_lock.fl_type == F_UNLCK ? NULL : &file_lock);
-	} else {
-		fl = (posix_test_lock(filp, &file_lock, &cfl) ? &cfl : NULL);
-	}
+	error = vfs_test_lock(filp, &file_lock);
+	if (error)
+		goto out;
  
-	flock.l_type = F_UNLCK;
-	if (fl != NULL) {
-		flock.l_pid = fl->fl_pid;
-#if BITS_PER_LONG == 32
-		/*
-		 * Make sure we can represent the posix lock via
-		 * legacy 32bit flock.
-		 */
-		error = -EOVERFLOW;
-		if (fl->fl_start > OFFT_OFFSET_MAX)
-			goto out;
-		if ((fl->fl_end != OFFSET_MAX)
-		    && (fl->fl_end > OFFT_OFFSET_MAX))
+	flock.l_type = file_lock.fl_type;
+	if (file_lock.fl_type != F_UNLCK) {
+		error = posix_lock_to_flock(&flock, &file_lock);
+		if (error)
 			goto out;
-#endif
-		flock.l_start = fl->fl_start;
-		flock.l_len = fl->fl_end == OFFSET_MAX ? 0 :
-			fl->fl_end - fl->fl_start + 1;
-		flock.l_whence = 0;
-		flock.l_type = fl->fl_type;
 	}
 	error = -EFAULT;
 	if (!copy_to_user(l, &flock, sizeof(flock)))
@@ -1674,6 +1714,42 @@ out:
 	return error;
 }
 
+/**
+ * vfs_lock_file - file byte range lock
+ * @filp: The file to apply the lock to
+ * @cmd: type of locking operation (F_SETLK, F_GETLK, etc.)
+ * @fl: The lock to be applied
+ * @conf: Place to return a copy of the conflicting lock, if found.
+ *
+ * To avoid blocking kernel daemons, such as lockd, that need to acquire POSIX
+ * locks, the ->lock() interface may return asynchronously, before the lock has
+ * been granted or denied by the underlying filesystem, if (and only if)
+ * fl_grant is set. Callers expecting ->lock() to return asynchronously
+ * will only use F_SETLK, not F_SETLKW; they will set FL_SLEEP if (and only if)
+ * the request is for a blocking lock. When ->lock() does return asynchronously,
+ * it must return -EINPROGRESS, and call ->fl_grant() when the lock
+ * request completes.
+ * If the request is for non-blocking lock the file system should return
+ * -EINPROGRESS then try to get the lock and call the callback routine with
+ * the result. If the request timed out the callback routine will return a
+ * nonzero return code and the file system should release the lock. The file
+ * system is also responsible to keep a corresponding posix lock when it
+ * grants a lock so the VFS can find out which locks are locally held and do
+ * the correct lock cleanup when required.
+ * The underlying filesystem must not drop the kernel lock or call
+ * ->fl_grant() before returning to the caller with a -EINPROGRESS
+ * return code.
+ */
+int vfs_lock_file(struct file *filp, unsigned int cmd, struct file_lock *fl,
+		  struct file_lock *conf)
+{
+	if (filp->f_op && filp->f_op->lock)
+		return filp->f_op->lock(filp, cmd, fl);
+	else
+		return posix_lock_file_conf(filp, fl, conf);
+}
+EXPORT_SYMBOL_GPL(vfs_lock_file);
+
 /* Apply the lock described by l to an open file descriptor.
  * This implements both the F_SETLK and F_SETLKW commands of fcntl().
  */
@@ -1736,21 +1812,17 @@ again:
 	if (error)
 		goto out;
 
-	if (filp->f_op && filp->f_op->lock != NULL)
-		error = filp->f_op->lock(filp, cmd, file_lock);
-	else {
-		for (;;) {
-			error = posix_lock_file(filp, file_lock);
-			if ((error != -EAGAIN) || (cmd == F_SETLK))
-				break;
-			error = wait_event_interruptible(file_lock->fl_wait,
-					!file_lock->fl_next);
-			if (!error)
-				continue;
-
-			locks_delete_block(file_lock);
+	for (;;) {
+		error = vfs_lock_file(filp, cmd, file_lock, NULL);
+		if (error != -EAGAIN || cmd == F_SETLK)
 			break;
-		}
+		error = wait_event_interruptible(file_lock->fl_wait,
+				!file_lock->fl_next);
+		if (!error)
+			continue;
+
+		locks_delete_block(file_lock);
+		break;
 	}
 
 	/*
@@ -1773,7 +1845,7 @@ out:
  */
 int fcntl_getlk64(struct file *filp, struct flock64 __user *l)
 {
-	struct file_lock *fl, cfl, file_lock;
+	struct file_lock file_lock;
 	struct flock64 flock;
 	int error;
 
@@ -1788,27 +1860,14 @@ int fcntl_getlk64(struct file *filp, struct flock64 __user *l)
 	if (error)
 		goto out;
 
-	if (filp->f_op && filp->f_op->lock) {
-		error = filp->f_op->lock(filp, F_GETLK, &file_lock);
-		if (file_lock.fl_ops && file_lock.fl_ops->fl_release_private)
-			file_lock.fl_ops->fl_release_private(&file_lock);
-		if (error < 0)
-			goto out;
-		else
-		  fl = (file_lock.fl_type == F_UNLCK ? NULL : &file_lock);
-	} else {
-		fl = (posix_test_lock(filp, &file_lock, &cfl) ? &cfl : NULL);
-	}
- 
-	flock.l_type = F_UNLCK;
-	if (fl != NULL) {
-		flock.l_pid = fl->fl_pid;
-		flock.l_start = fl->fl_start;
-		flock.l_len = fl->fl_end == OFFSET_MAX ? 0 :
-			fl->fl_end - fl->fl_start + 1;
-		flock.l_whence = 0;
-		flock.l_type = fl->fl_type;
-	}
+	error = vfs_test_lock(filp, &file_lock);
+	if (error)
+		goto out;
+
+	flock.l_type = file_lock.fl_type;
+	if (file_lock.fl_type != F_UNLCK)
+		posix_lock_to_flock64(&flock, &file_lock);
+
 	error = -EFAULT;
 	if (!copy_to_user(l, &flock, sizeof(flock)))
 		error = 0;
@@ -1879,21 +1938,17 @@ again:
 	if (error)
 		goto out;
 
-	if (filp->f_op && filp->f_op->lock != NULL)
-		error = filp->f_op->lock(filp, cmd, file_lock);
-	else {
-		for (;;) {
-			error = posix_lock_file(filp, file_lock);
-			if ((error != -EAGAIN) || (cmd == F_SETLK64))
-				break;
-			error = wait_event_interruptible(file_lock->fl_wait,
-					!file_lock->fl_next);
-			if (!error)
-				continue;
-
-			locks_delete_block(file_lock);
+	for (;;) {
+		error = vfs_lock_file(filp, cmd, file_lock, NULL);
+		if (error != -EAGAIN || cmd == F_SETLK64)
 			break;
-		}
+		error = wait_event_interruptible(file_lock->fl_wait,
+				!file_lock->fl_next);
+		if (!error)
+			continue;
+
+		locks_delete_block(file_lock);
+		break;
 	}
 
 	/*
@@ -1938,10 +1993,7 @@ void locks_remove_posix(struct file *filp, fl_owner_t owner)
 	lock.fl_ops = NULL;
 	lock.fl_lmops = NULL;
 
-	if (filp->f_op && filp->f_op->lock != NULL)
-		filp->f_op->lock(filp, F_SETLK, &lock);
-	else
-		posix_lock_file(filp, &lock);
+	vfs_lock_file(filp, F_SETLK, &lock, NULL);
 
 	if (lock.fl_ops && lock.fl_ops->fl_release_private)
 		lock.fl_ops->fl_release_private(&lock);
@@ -2018,6 +2070,21 @@ posix_unblock_lock(struct file *filp, struct file_lock *waiter)
 
 EXPORT_SYMBOL(posix_unblock_lock);
 
+/**
+ * vfs_cancel_lock - file byte range unblock lock
+ * @filp: The file to apply the unblock to
+ * @fl: The lock to be unblocked
+ *
+ * Used by lock managers to cancel blocked requests
+ */
+int vfs_cancel_lock(struct file *filp, struct file_lock *fl)
+{
+	if (filp->f_op && filp->f_op->lock)
+		return filp->f_op->lock(filp, F_CANCELLK, fl);
+	return 0;
+}
+EXPORT_SYMBOL_GPL(vfs_cancel_lock);
+
 static void lock_get_status(char* out, struct file_lock *fl, int id, char *pfx)
 {
 	struct inode *inode = NULL;
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 81a73f7..d949d4a 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -421,19 +421,13 @@ out_swapfile:
 
 static int do_getlk(struct file *filp, int cmd, struct file_lock *fl)
 {
-	struct file_lock cfl;
 	struct inode *inode = filp->f_mapping->host;
 	int status = 0;
 
 	lock_kernel();
 	/* Try local locking first */
-	if (posix_test_lock(filp, fl, &cfl)) {
-		fl->fl_start = cfl.fl_start;
-		fl->fl_end = cfl.fl_end;
-		fl->fl_type = cfl.fl_type;
-		fl->fl_pid = cfl.fl_pid;
+	if (posix_test_lock(filp, fl, NULL))
 		goto out;
-	}
 
 	if (nfs_have_delegation(inode, FMODE_READ))
 		goto out_noconflict;
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 577f68f..3d47bc6 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -3169,6 +3169,8 @@ static int _nfs4_proc_getlk(struct nfs4_state *state, int cmd, struct file_lock
 			status = 0;
 	}
 out:
+	if (request->fl_ops)
+		request->fl_ops->fl_release_private(request);
 	up_read(&clp->cl_sem);
 	return status;
 }
diff --git a/fs/nfsd/lockd.c b/fs/nfsd/lockd.c
index 7b889ff..d160080 100644
--- a/fs/nfsd/lockd.c
+++ b/fs/nfsd/lockd.c
@@ -39,18 +39,19 @@ nlm_fopen(struct svc_rqst *rqstp, struct nfs_fh *f, struct file **filp)
 	fh_put(&fh);
 	rqstp->rq_client = NULL;
 	exp_readunlock();
- 	/* nlm and nfsd don't share error codes.
-	 * we invent: 0 = no error
-	 *            1 = stale file handle
-	 *	      2 = other error
-	 */
+	/* We return nlm error codes as nlm doesn't know
+	 * about nfsd, but nfsd does know about nlm.. */
 	switch (nfserr) {
 	case nfs_ok:
 		return 0;
+	case nfserr_dropit:
+		return nlm_drop_reply;
+#ifdef CONFIG_LOCKD_V4
 	case nfserr_stale:
-		return 1;
+		return nlm4_stale_fh;
+#endif
 	default:
-		return 2;
+		return nlm_lck_denied;
 	}
 }
 
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index e280ad4..933f153 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -50,6 +50,7 @@
 #include <linux/nfsd/xdr4.h>
 #include <linux/namei.h>
 #include <linux/mutex.h>
+#include <linux/lockd/bind.h>
 
 #define NFSDDBG_FACILITY                NFSDDBG_PROC
 
@@ -2650,6 +2651,7 @@ nfsd4_lock(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_lock
 	struct file_lock conflock;
 	int status = 0;
 	unsigned int strhashval;
+	unsigned int cmd;
 
 	dprintk("NFSD: nfsd4_lock: start=%Ld length=%Ld\n",
 		(long long) lock->lk_offset,
@@ -2730,10 +2732,12 @@ nfsd4_lock(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_lock
 		case NFS4_READ_LT:
 		case NFS4_READW_LT:
 			file_lock.fl_type = F_RDLCK;
+			cmd = F_SETLK;
 		break;
 		case NFS4_WRITE_LT:
 		case NFS4_WRITEW_LT:
 			file_lock.fl_type = F_WRLCK;
+			cmd = F_SETLK;
 		break;
 		default:
 			status = nfserr_inval;
@@ -2760,10 +2764,8 @@ nfsd4_lock(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_lock
 
 	/* XXX?: Just to divert the locks_release_private at the start of
 	 * locks_copy_lock: */
-	conflock.fl_ops = NULL;
-	conflock.fl_lmops = NULL;
-	status = posix_lock_file_conf(filp, &file_lock, &conflock);
-	dprintk("NFSD: nfsd4_lock: posix_lock_file_conf status %d\n",status);
+	locks_init_lock(&conflock);
+	status = vfs_lock_file(filp, cmd, &file_lock, &conflock);
 	switch (-status) {
 	case 0: /* success! */
 		update_stateid(&lock_stp->st_stateid);
@@ -2779,7 +2781,7 @@ nfsd4_lock(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_lock
 		status = nfserr_deadlock;
 		break;
 	default:        
-		dprintk("NFSD: nfsd4_lock: posix_lock_file_conf() failed! status %d\n",status);
+		dprintk("NFSD: nfsd4_lock: vfs_lock_file() failed! status %d\n",status);
 		status = nfserr_resource;
 		break;
 	}
@@ -2803,7 +2805,7 @@ nfsd4_lockt(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_lock
 	struct inode *inode;
 	struct file file;
 	struct file_lock file_lock;
-	struct file_lock conflock;
+	int error;
 	int status;
 
 	if (nfs4_in_grace())
@@ -2859,18 +2861,23 @@ nfsd4_lockt(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_lock
 
 	nfs4_transform_lock_offset(&file_lock);
 
-	/* posix_test_lock uses the struct file _only_ to resolve the inode.
+	/* vfs_test_lock uses the struct file _only_ to resolve the inode.
 	 * since LOCKT doesn't require an OPEN, and therefore a struct
-	 * file may not exist, pass posix_test_lock a struct file with
+	 * file may not exist, pass vfs_test_lock a struct file with
 	 * only the dentry:inode set.
 	 */
 	memset(&file, 0, sizeof (struct file));
 	file.f_dentry = current_fh->fh_dentry;
 
 	status = nfs_ok;
-	if (posix_test_lock(&file, &file_lock, &conflock)) {
+	error = vfs_test_lock(&file, &file_lock);
+	if (error) {
+		status = nfserrno(error);
+		goto out;
+	}
+	if (file_lock.fl_type != F_UNLCK) {
 		status = nfserr_denied;
-		nfs4_set_lock_denied(&conflock, &lockt->lt_denied);
+		nfs4_set_lock_denied(&file_lock, &lockt->lt_denied);
 	}
 out:
 	nfs4_unlock_state();
@@ -2921,9 +2928,9 @@ nfsd4_locku(struct svc_rqst *rqstp, struct svc_fh *current_fh, struct nfsd4_lock
 	/*
 	*  Try to unlock the file in the VFS.
 	*/
-	status = posix_lock_file(filp, &file_lock); 
+	status = vfs_lock_file(filp, F_SETLK, &file_lock, NULL);
 	if (status) {
-		dprintk("NFSD: nfs4_locku: posix_lock_file failed!\n");
+		dprintk("NFSD: nfs4_locku: vfs_lock_file failed!\n");
 		goto out_nfserr;
 	}
 	/*
diff --git a/include/linux/fcntl.h b/include/linux/fcntl.h
index 996f561..40b9326 100644
--- a/include/linux/fcntl.h
+++ b/include/linux/fcntl.h
@@ -3,6 +3,10 @@
 
 #include <asm/fcntl.h>
 
+/* Cancel a blocking posix lock; internal use only until we expose an
+ * asynchronous lock api to userspace: */
+#define F_CANCELLK	(F_LINUX_SPECIFIC_BASE+5)
+
 #define F_SETLEASE	(F_LINUX_SPECIFIC_BASE+0)
 #define F_GETLEASE	(F_LINUX_SPECIFIC_BASE+1)
 
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 0a92925..7952faf 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -749,6 +749,8 @@ extern spinlock_t files_lock;
 
 #define FL_POSIX	1
 #define FL_FLOCK	2
+#define FL_GRANT	4	/* kABI toggle to let the caller know
+				that fl_lmops has the grant callback */
 #define FL_ACCESS	8	/* not trying to lock, just looking */
 #define FL_EXISTS	16	/* when unlocking, test for existence */
 #define FL_LEASE	32	/* lease held on this file */
@@ -779,6 +781,9 @@ struct lock_manager_operations {
 	void (*fl_break)(struct file_lock *);
 	int (*fl_mylease)(struct file_lock *, struct file_lock *);
 	int (*fl_change)(struct file_lock **, int);
+#ifndef __GENKSYMS__
+	int (*fl_grant)(struct file_lock *, struct file_lock *, int);
+#endif
 };
 
 /* that will die - we need it for nfs_lock_info */
@@ -845,6 +850,10 @@ extern int posix_lock_file_conf(struct file *, struct file_lock *, struct file_l
 extern int posix_lock_file(struct file *, struct file_lock *);
 extern int posix_lock_file_wait(struct file *, struct file_lock *);
 extern int posix_unblock_lock(struct file *, struct file_lock *);
+extern int vfs_test_lock(struct file *, struct file_lock *);
+extern int vfs_lock_file(struct file *, unsigned int, struct file_lock *,
+			 struct file_lock *);
+extern int vfs_cancel_lock(struct file *filp, struct file_lock *fl);
 extern int flock_lock_file_wait(struct file *filp, struct file_lock *fl);
 extern int __break_lease(struct inode *inode, unsigned int flags);
 extern void lease_get_mtime(struct inode *, struct timespec *time);
diff --git a/include/linux/lockd/bind.h b/include/linux/lockd/bind.h
index 71d85b6..10a8322 100644
--- a/include/linux/lockd/bind.h
+++ b/include/linux/lockd/bind.h
@@ -10,6 +10,11 @@
 #define LINUX_LOCKD_BIND_H
 
 #include <linux/lockd/nlm.h>
+/* need xdr-encoded error codes too, so... */
+#include <linux/lockd/xdr.h>
+#ifdef CONFIG_LOCKD_V4
+#include <linux/lockd/xdr4.h>
+#endif
 
 /* Dummy declarations */
 struct svc_rqst;
diff --git a/include/linux/lockd/lockd.h b/include/linux/lockd/lockd.h
index 0d92c46..0f92d53 100644
--- a/include/linux/lockd/lockd.h
+++ b/include/linux/lockd/lockd.h
@@ -112,6 +112,9 @@ struct nlm_file {
  * couldn't be granted because of a conflicting lock).
  */
 #define NLM_NEVER		(~(unsigned long) 0)
+/* timeout on non-blocking call: */
+#define NLM_TIMEOUT		(7 * HZ)
+
 struct nlm_block {
 	struct kref		b_count;	/* Reference count */
 	struct nlm_block *	b_next;		/* linked list (all blocks) */
@@ -124,6 +127,13 @@ struct nlm_block {
 	unsigned char		b_queued;	/* re-queued */
 	unsigned char		b_granted;	/* VFS granted lock */
 	struct nlm_file *	b_file;		/* file in question */
+	struct cache_req *	b_cache_req;	/* deferred request handling */
+	struct file_lock *	b_fl;		/* set for GETLK */
+	struct cache_deferred_req * b_deferred_req;
+	unsigned int		b_flags;	/* block flags */
+#define B_QUEUED		1	/* lock queued */
+#define B_GOT_CALLBACK		2	/* got lock or conflicting lock */
+#define B_TIMED_OUT		4	/* too late for non-blocking lock */
 };
 
 /*
@@ -178,8 +188,9 @@ extern struct nlm_host *nlm_find_client(void);
 u32		  nlmsvc_lock(struct svc_rqst *, struct nlm_file *,
 					struct nlm_lock *, int, struct nlm_cookie *);
 u32		  nlmsvc_unlock(struct nlm_file *, struct nlm_lock *);
-u32		  nlmsvc_testlock(struct nlm_file *, struct nlm_lock *,
-					struct nlm_lock *);
+u32		  nlmsvc_testlock(struct svc_rqst *, struct nlm_file *,
+				  struct nlm_lock *, struct nlm_lock *,
+				  struct nlm_cookie *);
 u32		  nlmsvc_cancel_blocked(struct nlm_file *, struct nlm_lock *);
 unsigned long	  nlmsvc_retry_blocked(void);
 void		  nlmsvc_traverse_blocks(struct nlm_host *, struct nlm_file *,
diff --git a/include/linux/lockd/xdr.h b/include/linux/lockd/xdr.h
index bb0a0f1..66fdae3 100644
--- a/include/linux/lockd/xdr.h
+++ b/include/linux/lockd/xdr.h
@@ -13,6 +13,8 @@
 #include <linux/nfs.h>
 #include <linux/sunrpc/xdr.h>
 
+struct svc_rqst;
+
 #define NLM_MAXCOOKIELEN    	32
 #define NLM_MAXSTRLEN		1024
 
@@ -22,6 +24,8 @@
 #define	nlm_lck_blocked		__constant_htonl(NLM_LCK_BLOCKED)
 #define	nlm_lck_denied_grace_period	__constant_htonl(NLM_LCK_DENIED_GRACE_PERIOD)
 
+#define nlm_drop_reply		__constant_htonl(30000)
+
 /* Lock info passed via NLM */
 struct nlm_lock {
 	char *			caller;
diff --git a/include/linux/sunrpc/msg_prot.h b/include/linux/sunrpc/msg_prot.h
index a519176..412cfde 100644
--- a/include/linux/sunrpc/msg_prot.h
+++ b/include/linux/sunrpc/msg_prot.h
@@ -53,7 +53,9 @@ enum rpc_accept_stat {
 	RPC_PROG_MISMATCH = 2,
 	RPC_PROC_UNAVAIL = 3,
 	RPC_GARBAGE_ARGS = 4,
-	RPC_SYSTEM_ERR = 5
+	RPC_SYSTEM_ERR = 5,
+	/* internal use only */
+	RPC_DROP_REPLY = 60000
 };
 
 enum rpc_reject_stat {
diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
index e6d3d34..8fd1aa1 100644
--- a/include/linux/sunrpc/xdr.h
+++ b/include/linux/sunrpc/xdr.h
@@ -74,6 +74,7 @@ struct xdr_buf {
 #define	rpc_proc_unavail	__constant_htonl(RPC_PROC_UNAVAIL)
 #define	rpc_garbage_args	__constant_htonl(RPC_GARBAGE_ARGS)
 #define	rpc_system_err		__constant_htonl(RPC_SYSTEM_ERR)
+#define	rpc_drop_reply		__constant_htonl(RPC_DROP_REPLY)
 
 #define	rpc_auth_ok		__constant_htonl(RPC_AUTH_OK)
 #define	rpc_autherr_badcred	__constant_htonl(RPC_AUTH_BADCRED)
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index ba2fe53..3153395 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -389,6 +389,11 @@ svc_process(struct svc_serv *serv, struct svc_rqst *rqstp)
 		*statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
 
 		/* Encode reply */
+		if (*statp == rpc_drop_reply) {
+			if (procp->pc_release)
+				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
+				goto dropit;
+		}
 		if (*statp == rpc_success && (xdr = procp->pc_encode)
 		 && !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
 			dprintk("svc: failed to encode reply\n");