From: Ian Kent <ikent@redhat.com> Date: Tue, 1 Dec 2009 17:07:27 -0500 Subject: [nfs] sunrpc: allow rpc_release() CB run on another workq Message-id: <20091201170727.12228.41509.stgit@zeus.themaw.net> Patchwork-id: 21573 O-Subject: [RHEL 5.4 PATCH 3/5] SUNRPC: Allow the rpc_release() callback to be run on another workqueue (bz489931) Bugzilla: 489931 RH-Acked-by: Jeff Layton <jlayton@redhat.com> From: Trond Myklebust <Trond.Myklebust@netapp.com> A lot of the work done by the rpc_release() callback is inappropriate for rpciod as it will often involve things like starting a new rpc call in order to clean up state after an interrupted NFSv4 open() call, or calls to mntput(), etc. This patch allows the caller of rpc_run_task() to specify that the rpc_release callback should run on a different workqueue than the default rpciod_workqueue. Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com> diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h index 5fc94b6..3f34e8f 100644 --- a/include/linux/sunrpc/sched.h +++ b/include/linux/sunrpc/sched.h @@ -244,11 +244,20 @@ struct rpc_wait_queue { /* * Function prototypes */ +struct rpc_task *rpc_new_task_wq(struct rpc_clnt *, int flags, + const struct rpc_call_ops *ops, void *data, + struct workqueue_struct *workqueue); struct rpc_task *rpc_new_task(struct rpc_clnt *, int flags, const struct rpc_call_ops *ops, void *data); +struct rpc_task *rpc_run_task_wq(struct rpc_clnt *clnt, int flags, + const struct rpc_call_ops *ops, void *data, + struct workqueue_struct *workqueue); struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *ops, void *data); struct rpc_task *rpc_new_child(struct rpc_clnt *, struct rpc_task *parent); +void rpc_init_task_wq(struct rpc_task *task, struct rpc_clnt *clnt, + int flags, const struct rpc_call_ops *ops, + void *data, struct workqueue_struct *workqueue); void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *ops, void *data); diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 68f7e36..1685402 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -329,7 +329,7 @@ static void rpc_make_runnable(struct rpc_task *task) int status; INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task); - status = queue_work(task->tk_workqueue, &task->u.tk_work); + status = queue_work(rpciod_workqueue, &task->u.tk_work); if (status < 0) { printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); task->tk_status = status; @@ -813,7 +813,8 @@ void rpc_free(struct rpc_task *task) /* * Creation and deletion of RPC task structures */ -void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) +void rpc_init_task_wq(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, + void *calldata, struct workqueue_struct *workqueue) { memset(task, 0, sizeof(*task)); init_timer(&task->tk_timer); @@ -835,7 +836,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons task->tk_cookie = (unsigned long)current; /* Initialize workqueue for async tasks */ - task->tk_workqueue = rpciod_workqueue; + task->tk_workqueue = workqueue; if (clnt) { atomic_inc(&clnt->cl_users); @@ -854,13 +855,19 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons current->pid); } +void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, + void *calldata) +{ + rpc_init_task_wq(task, clnt, flags, tk_ops, calldata, NULL); +} + static struct rpc_task * rpc_alloc_task(void) { return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); } -static void rpc_free_task(struct rcu_head *rcu) +static void rpc_free_task_rcu(struct rcu_head *rcu) { struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu); dprintk("RPC: %4d freeing task\n", task->tk_pid); @@ -872,7 +879,8 @@ static void rpc_free_task(struct rcu_head *rcu) * clean up after an allocation failure, as the client may * have specified "oneshot". */ -struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) +struct rpc_task *rpc_new_task_wq(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata, + struct workqueue_struct *workqueue) { struct rpc_task *task; @@ -880,7 +888,7 @@ struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc if (!task) goto cleanup; - rpc_init_task(task, clnt, flags, tk_ops, calldata); + rpc_init_task_wq(task, clnt, flags, tk_ops, calldata, workqueue); dprintk("RPC: %4d allocated task\n", task->tk_pid); task->tk_flags |= RPC_TASK_DYNAMIC; @@ -898,12 +906,28 @@ cleanup: goto out; } +struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) +{ + return rpc_new_task_wq(clnt, flags, tk_ops, calldata, NULL); +} -void rpc_put_task(struct rpc_task *task) +static void rpc_free_task(struct rpc_task *task) { const struct rpc_call_ops *tk_ops = task->tk_ops; void *calldata = task->tk_calldata; + if (task->tk_flags & RPC_TASK_DYNAMIC) + call_rcu_bh(&task->u.tk_rcu, rpc_free_task_rcu); + rpc_release_calldata(tk_ops, calldata); +} + +static void rpc_async_release(void *task) +{ + rpc_free_task((struct rpc_task *) task); +} + +void rpc_put_task(struct rpc_task *task) +{ if (!atomic_dec_and_test(&task->tk_count)) return; /* Release resources */ @@ -915,9 +939,11 @@ void rpc_put_task(struct rpc_task *task) rpc_release_client(task->tk_client); task->tk_client = NULL; } - if (task->tk_flags & RPC_TASK_DYNAMIC) - call_rcu_bh(&task->u.tk_rcu, rpc_free_task); - rpc_release_calldata(tk_ops, calldata); + if (task->tk_workqueue != NULL) { + INIT_WORK(&task->u.tk_work, rpc_async_release, (void *) task); + queue_work(task->tk_workqueue, &task->u.tk_work); + } else + rpc_free_task(task); } EXPORT_SYMBOL(rpc_put_task); @@ -954,12 +980,12 @@ static void rpc_release_task(struct rpc_task *task) * @ops: RPC call ops * @data: user call data */ -struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags, - const struct rpc_call_ops *ops, - void *data) +struct rpc_task *rpc_run_task_wq(struct rpc_clnt *clnt, int flags, + const struct rpc_call_ops *ops, + void *data, struct workqueue_struct *workqueue) { struct rpc_task *task; - task = rpc_new_task(clnt, flags, ops, data); + task = rpc_new_task_wq(clnt, flags, ops, data, workqueue); if (task == NULL) { rpc_release_calldata(ops, data); return ERR_PTR(-ENOMEM); @@ -968,6 +994,14 @@ struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags, rpc_execute(task); return task; } +EXPORT_SYMBOL(rpc_run_task_wq); + +struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags, + const struct rpc_call_ops *ops, + void *data) +{ + return rpc_run_task_wq(clnt, flags, ops, data, NULL); +} EXPORT_SYMBOL(rpc_run_task); /** diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c index 9395902..3895d93 100644 --- a/net/sunrpc/sunrpc_syms.c +++ b/net/sunrpc/sunrpc_syms.c @@ -25,12 +25,14 @@ /* RPC scheduler */ EXPORT_SYMBOL(rpc_execute); +EXPORT_SYMBOL(rpc_init_task_wq); EXPORT_SYMBOL(rpc_init_task); EXPORT_SYMBOL(rpc_sleep_on); EXPORT_SYMBOL(rpc_wake_up_next); EXPORT_SYMBOL(rpc_wake_up_task); EXPORT_SYMBOL(rpciod_down); EXPORT_SYMBOL(rpciod_up); +EXPORT_SYMBOL(rpc_new_task_wq); EXPORT_SYMBOL(rpc_new_task); EXPORT_SYMBOL(rpc_wake_up_status);