kernel-2.6.18-194.11.1.el5.src.rpm

From: Jeff Layton <jlayton@redhat.com>
Date: Thu, 5 Jun 2008 07:51:51 -0400
Subject: [nfs] sunrpc: fix a race in rpciod_down
Message-id: 1212666712-30755-2-git-send-email-jlayton@redhat.com
O-Subject: [RHEL5.3 PATCH 1/2] BZ#448754: SUNRPC: Fix a race in rpciod_down()
Bugzilla: 448754
RH-Acked-by: Steve Dickson <SteveD@redhat.com>

The patch that *did* get committed for bug 246642 can lead to a deadlock.
To ensure that an RPC client could always send an async RPC call, that
patch made each new RPC client take a reference to the rpciod workqueue.
Because of this, the last reference to the workqueue could end up being
put by work running on that workqueue itself, which then has to destroy
the very workqueue it is executing on.
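For illustration, here is a minimal sketch of that pattern, using the
2.6.18-era workqueue API (work functions take a void * argument). The
names are hypothetical; this is not the actual sunrpc code:

#include <linux/init.h>
#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *example_wq;

/* Runs on example_wq.  Suppose this work item ends up dropping the
 * last reference to the workqueue, as the old rpciod code could. */
static void example_work_fn(void *data)
{
	/*
	 * destroy_workqueue() flushes the queue and tears down its worker
	 * thread -- the very thread this function is running on.  Waiting
	 * for yourself to go away never completes, so this hangs.
	 */
	destroy_workqueue(example_wq);
}

static DECLARE_WORK(example_work, example_work_fn, NULL);

static int __init example_init(void)
{
	example_wq = create_workqueue("example");
	if (example_wq == NULL)
		return -ENOMEM;
	queue_work(example_wq, &example_work);
	return 0;
}
module_init(example_init);
MODULE_LICENSE("GPL");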

Original problem description from upstream is here:

--------------[snip]----------------
The commit 4ada539ed77c7a2bbcb75cafbbd7bd8d2b9bef7b led to the
unpleasant possibility of an asynchronous rpc_task being required to
call rpciod_down() when it is complete. This again means that the rpciod
workqueue may get to call destroy_workqueue on itself -> hang...

Change rpciod_up/rpciod_down to just get/put the module, and then
create/destroy the workqueues on module load/unload.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
--------------[snip]----------------

I've been able to reproduce this deadlock with a (rather contrived)
series of steps that I was using to reproduce a different problem. This
patch does fix it.

This patch does change rpciod behavior somewhat. Rather than starting
rpciod on the first rpciod_up() call, the workqueue is now created when
the module is loaded and destroyed when it is unloaded. This is probably
not something most users will notice or care about, but it's worth
noting.
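As a sketch of why the new scheme cannot deadlock (again with
hypothetical names, not the actual callers): the release path that runs
on the rpciod workqueue now only drops a module reference, which never
blocks, and the workqueue itself is destroyed only at module unload.

#include <linux/module.h>
#include <linux/sunrpc/sched.h>	/* rpciod_up()/rpciod_down() */

/* Runs on the rpciod workqueue when an async task completes. */
static void example_async_release(void *data)
{
	/* ... free the task and drop the client's last reference ... */

	/*
	 * After this patch rpciod_down() is just module_put(THIS_MODULE);
	 * it never destroys the workqueue this code is running on.  The
	 * workqueue is torn down only by rpciod_stop() at module unload,
	 * which cannot happen while any user still holds the reference
	 * taken by rpciod_up().
	 */
	rpciod_down();
}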

diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 8dde394..7e870ed 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -41,7 +41,6 @@ static mempool_t	*rpc_task_mempool __read_mostly;
 static mempool_t	*rpc_buffer_mempool __read_mostly;
 
 static void			__rpc_default_timer(struct rpc_task *task);
-static void			rpciod_killall(void);
 static void			rpc_async_schedule(void *);
 static void			 rpc_release_task(struct rpc_task *task);
 
@@ -64,8 +63,6 @@ static LIST_HEAD(all_tasks);
 /*
  * rpciod-related stuff
  */
-static DEFINE_MUTEX(rpciod_mutex);
-static unsigned int		rpciod_users;
 struct workqueue_struct *rpciod_workqueue;
 
 /*
@@ -1060,84 +1057,43 @@ void rpc_killall_tasks(struct rpc_clnt *clnt)
 	spin_unlock(&rpc_sched_lock);
 }
 
-static DECLARE_MUTEX_LOCKED(rpciod_running);
-
-static void rpciod_killall(void)
+int rpciod_up(void)
 {
-	unsigned long flags;
-
-	while (!list_empty(&all_tasks)) {
-		clear_thread_flag(TIF_SIGPENDING);
-		rpc_killall_tasks(NULL);
-		flush_workqueue(rpciod_workqueue);
-		if (!list_empty(&all_tasks)) {
-			dprintk("rpciod_killall: waiting for tasks to exit\n");
-			yield();
-		}
-	}
+	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
+}
 
-	spin_lock_irqsave(&current->sighand->siglock, flags);
-	recalc_sigpending();
-	spin_unlock_irqrestore(&current->sighand->siglock, flags);
+void rpciod_down(void)
+{
+	module_put(THIS_MODULE);
 }
 
 /*
- * Start up the rpciod process if it's not already running.
+ * Start up the rpciod workqueue.
  */
-int
-rpciod_up(void)
+static int rpciod_start(void)
 {
 	struct workqueue_struct *wq;
-	int error = 0;
 
-	mutex_lock(&rpciod_mutex);
-	dprintk("rpciod_up: users %d\n", rpciod_users);
-	rpciod_users++;
-	if (rpciod_workqueue)
-		goto out;
-	/*
-	 * If there's no pid, we should be the first user.
-	 */
-	if (rpciod_users > 1)
-		printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
 	/*
 	 * Create the rpciod thread and wait for it to start.
 	 */
-	error = -ENOMEM;
+	dprintk("RPC:       creating workqueue rpciod\n");
 	wq = create_workqueue("rpciod");
-	if (wq == NULL) {
-		printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
-		rpciod_users--;
-		goto out;
-	}
 	rpciod_workqueue = wq;
-	error = 0;
-out:
-	mutex_unlock(&rpciod_mutex);
-	return error;
+	return rpciod_workqueue != NULL;
 }
 
-void
-rpciod_down(void)
+static void rpciod_stop(void)
 {
-	mutex_lock(&rpciod_mutex);
-	dprintk("rpciod_down sema %d\n", rpciod_users);
-	if (rpciod_users) {
-		if (--rpciod_users)
-			goto out;
-	} else
-		printk(KERN_WARNING "rpciod_down: no users??\n");
+	struct workqueue_struct *wq = NULL;
 
-	if (!rpciod_workqueue) {
-		dprintk("rpciod_down: Nothing to do!\n");
-		goto out;
-	}
-	rpciod_killall();
+	if (rpciod_workqueue == NULL)
+		return;
+	dprintk("RPC:       destroying workqueue rpciod\n");
 
-	destroy_workqueue(rpciod_workqueue);
+	wq = rpciod_workqueue;
 	rpciod_workqueue = NULL;
- out:
-	mutex_unlock(&rpciod_mutex);
+	destroy_workqueue(wq);
 }
 
 #ifdef RPC_DEBUG
@@ -1176,6 +1132,7 @@ void rpc_show_tasks(void)
 void
 rpc_destroy_mempool(void)
 {
+	rpciod_stop();
 	if (rpc_buffer_mempool)
 		mempool_destroy(rpc_buffer_mempool);
 	if (rpc_task_mempool)
@@ -1209,6 +1166,8 @@ rpc_init_mempool(void)
 						      rpc_buffer_slabp);
 	if (!rpc_buffer_mempool)
 		goto err_nomem;
+	if (!rpciod_start())
+		goto err_nomem;
 	return 0;
 err_nomem:
 	rpc_destroy_mempool();