From: Doug Ledford <dledford@redhat.com>
Date: Tue, 14 Apr 2009 15:23:38 -0400
Subject: [openib] SDP: update to OFED 1.4.1-rc3
Message-id: 1239737023-31222-12-git-send-email-dledford@redhat.com
O-Subject: [Patch RHEL5.4 11/16] [SDP] Update to OFED 1.4.1-rc3 version
Bugzilla: 476301

Signed-off-by: Doug Ledford <dledford@redhat.com>

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 57a95ce..72003d2 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -8,7 +8,8 @@
 #include <rdma/ib_verbs.h>
 
 #define sdp_printk(level, sk, format, arg...) \
-	printk(level "sdp_sock(%d:%d): " format, \
+	printk(level "%s:%d sdp_sock(%d:%d): " format, \
+		__func__, __LINE__, \
 		(sk) ? inet_sk(sk)->num : -1, \
 		(sk) ? ntohs(inet_sk(sk)->dport) : -1, ## arg)
 #define sdp_warn(sk, format, arg...) \
@@ -22,9 +23,30 @@ extern int sdp_debug_level;
 		if (sdp_debug_level > 0) \
 			sdp_printk(KERN_DEBUG, sk, format , ## arg); \
 	} while (0)
+
+#define sock_ref(sk, msg, sock_op) ({ \
+	if (!atomic_read(&(sk)->sk_refcnt)) {\
+		sdp_warn(sk, "%s:%d - %s (%s) ref = 0.\n", \
+			 __func__, __LINE__, #sock_op, msg); \
+		WARN_ON(1); \
+	} else { \
+		sdp_dbg(sk, "%s:%d - %s (%s) ref = %d.\n", __func__, __LINE__, \
+			#sock_op, msg, atomic_read(&(sk)->sk_refcnt)); \
+		sock_op(sk); \
+	}\
+})
+
+#define sk_common_release(sk) do { \
+	sdp_dbg(sk, "%s:%d - sock_put(" SOCK_REF_BORN ") - refcount = %d " \
+		"from within sk_common_release\n",\
+		__FUNCTION__, __LINE__, atomic_read(&(sk)->sk_refcnt)); \
+	sk_common_release(sk); \
+} while (0)
+
 #else /* CONFIG_INFINIBAND_SDP_DEBUG */
 #define sdp_dbg(priv, format, arg...) \
 	do { (void) (priv); } while (0)
+#define sock_ref(sk, msg, sock_op) sock_op(sk)
 #endif /* CONFIG_INFINIBAND_SDP_DEBUG */
 
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
@@ -39,10 +61,22 @@ extern int sdp_data_debug_level;
 	do { (void) (priv); } while (0)
 #endif
 
+#define SOCK_REF_RESET "RESET"
+#define SOCK_REF_BORN "BORN" /* sock_alloc -> destruct_sock */
+#define SOCK_REF_CLONE "CLONE"
+#define SOCK_REF_CM_TW "CM_TW" /* TIMEWAIT_ENTER -> TIMEWAIT_EXIT */
+#define SOCK_REF_SEQ "SEQ" /* during proc read */
+#define SOCK_REF_DREQ_TO "DREQ_TO" /* dreq timeout is pending */
+
+#define sock_hold(sk, msg) sock_ref(sk, msg, sock_hold)
+#define sock_put(sk, msg) sock_ref(sk, msg, sock_put)
+#define __sock_put(sk, msg) sock_ref(sk, msg, __sock_put)
+
 #define SDP_RESOLVE_TIMEOUT 1000
 #define SDP_ROUTE_TIMEOUT 1000
 #define SDP_RETRY_COUNT 5
 #define SDP_KEEPALIVE_TIME (120 * 60 * HZ)
+#define SDP_FIN_WAIT_TIMEOUT (60 * HZ)
 
 #define SDP_TX_SIZE 0x40
 #define SDP_RX_SIZE 0x40
@@ -111,7 +145,7 @@ struct sdp_sock {
 	struct work_struct work;
 	wait_queue_head_t wq;
 
-	struct delayed_work time_wait_work;
+	struct delayed_work dreq_wait_work;
 	struct work_struct destroy_work;
 
 	/* Like tcp_sock */
@@ -126,24 +160,32 @@ struct sdp_sock {
 	int xmit_size_goal;
 	int nonagle;
 
-	int time_wait;
+	int dreq_wait_timeout;
 
 	unsigned keepalive_time;
 
+	spinlock_t lock;
+
 	/* tx_head/rx_head when keepalive timer started */
 	unsigned keepalive_tx_head;
 	unsigned keepalive_rx_head;
 
-	/* Data below will be reset on error */
+	int destructed_already;
+	int sdp_disconnect;
+	int destruct_in_process;
+
+	struct sdp_buf *rx_ring;
+	struct sdp_buf *tx_ring;
+
 	/* rdma specific */
-	struct rdma_cm_id *id;
 	struct ib_qp *qp;
 	struct ib_cq *cq;
 	struct ib_mr *mr;
+	/* Data below will be reset on error */
+	struct rdma_cm_id *id;
 	struct ib_device *ib_device;
 
 	/* SDP specific */
-	struct sdp_buf *rx_ring;
 	struct ib_recv_wr rx_wr;
 	unsigned rx_head;
 	unsigned rx_tail;
@@ -155,7 +197,6 @@ struct sdp_sock {
 	int remote_credits;
 	int poll_cq;
 
-	struct sdp_buf *tx_ring;
 	unsigned tx_head;
 	unsigned tx_tail;
 	struct ib_send_wr tx_wr;
@@ -192,6 +233,7 @@ struct bzcopy_state {
 	struct page **pages;
 };
 
+extern int rcvbuf_initial_size;
 extern struct proto sdp_proto;
 extern struct workqueue_struct *sdp_workqueue;
 
@@ -219,26 +261,70 @@ static inline struct sdp_sock *sdp_sk(const struct sock *sk)
 	return (struct sdp_sock *)sk;
 }
 
-static inline void sdp_set_error(struct sock *sk, int err)
+static inline char *sdp_state_str(int state)
 {
-	sk->sk_err = -err;
-	if (sk->sk_socket)
-		sk->sk_socket->state = SS_UNCONNECTED;
-
-	sk->sk_state = TCP_CLOSE;
+	static char *state_str[] = {
+		[TCP_ESTABLISHED] = "TCP_ESTABLISHED",
+		[TCP_SYN_SENT] = "TCP_SYN_SENT",
+		[TCP_SYN_RECV] = "TCP_SYN_RECV",
+		[TCP_FIN_WAIT1] = "TCP_FIN_WAIT1",
+		[TCP_FIN_WAIT2] = "TCP_FIN_WAIT2",
+		[TCP_TIME_WAIT] = "TCP_TIME_WAIT",
+		[TCP_CLOSE] = "TCP_CLOSE",
+		[TCP_CLOSE_WAIT] = "TCP_CLOSE_WAIT",
+		[TCP_LAST_ACK] = "TCP_LAST_ACK",
+		[TCP_LISTEN] = "TCP_LISTEN",
+		[TCP_CLOSING] = "TCP_CLOSING",
+	};
+
+	if (state < 0 || state >= TCP_MAX_STATES)
+		return "unknown";
+
+	return state_str[state];
+}
 
-	if (sdp_sk(sk)->time_wait) {
-		sdp_dbg(sk, "%s: destroy in time wait state\n", __func__);
-		sdp_sk(sk)->time_wait = 0;
-		queue_work(sdp_workqueue, &sdp_sk(sk)->destroy_work);
+static inline int _sdp_exch_state(const char *func, int line, struct sock *sk,
+				  int from_states, int state)
+{
+	unsigned long flags;
+	int old;
+
+	spin_lock_irqsave(&sdp_sk(sk)->lock, flags);
+
+	sdp_dbg(sk, "%s:%d - set state: %s -> %s 0x%x\n", func, line,
+		sdp_state_str(sk->sk_state), sdp_state_str(state), from_states);
+
+	if ((1 << sk->sk_state) & ~from_states) {
+		sdp_warn(sk, "trying to exchange state from unexpected state "
+			"%s to state %s. expected states: 0x%x\n",
+			sdp_state_str(sk->sk_state), sdp_state_str(state),
+			from_states);
 	}
 
-	sk->sk_error_report(sk);
+	old = sk->sk_state;
+	sk->sk_state = state;
+
+	spin_unlock_irqrestore(&sdp_sk(sk)->lock, flags);
+
+	return old;
 }
 
+#define sdp_exch_state(sk, from_states, state) \
+	_sdp_exch_state(__func__, __LINE__, sk, from_states, state)
 
-static inline void sdp_set_state(struct sock *sk, int state)
+static inline void sdp_set_error(struct sock *sk, int err)
 {
-	sk->sk_state = state;
+	int ib_teardown_states = TCPF_FIN_WAIT1 | TCPF_CLOSE_WAIT
+		| TCPF_LAST_ACK;
+	sk->sk_err = -err;
+	if (sk->sk_socket)
+		sk->sk_socket->state = SS_DISCONNECTING;
+
+	if ((1 << sk->sk_state) & ib_teardown_states)
+		sdp_exch_state(sk, ib_teardown_states, TCP_TIME_WAIT);
+	else
+		sdp_exch_state(sk, ~0, TCP_CLOSE);
+
+	sk->sk_error_report(sk);
 }
 
 extern struct workqueue_struct *sdp_workqueue;
@@ -246,7 +332,6 @@ extern struct workqueue_struct *sdp_workqueue;
 int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
 void sdp_reset(struct sock *sk);
 void sdp_reset_sk(struct sock *sk, int rc);
-void sdp_time_wait_destroy_sk(struct sdp_sock *ssk);
 void sdp_completion_handler(struct ib_cq *cq, void *cq_context);
 void sdp_work(struct work_struct *work);
 int sdp_post_credits(struct sdp_sock *ssk);
@@ -255,7 +340,8 @@ void sdp_post_recvs(struct sdp_sock *ssk);
 int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq);
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
 void sdp_destroy_work(struct work_struct *work);
-void sdp_time_wait_work(struct work_struct *work);
+void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
+void sdp_dreq_wait_timeout_work(struct work_struct *work);
 struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id);
 struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq);
 void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb);
@@ -263,8 +349,10 @@ void sdp_add_sock(struct sdp_sock *ssk);
 void sdp_remove_sock(struct sdp_sock *ssk);
 void sdp_remove_large_sock(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
+int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
 void sdp_post_keepalive(struct sdp_sock *ssk);
 void sdp_start_keepalive_timer(struct sock *sk);
 void sdp_bzcopy_write_space(struct sdp_sock *ssk);
+int sdp_init_sock(struct sock *sk);
 
 #endif
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 845935b..afb3b12 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -45,6 +45,10 @@ struct sdp_chrecvbuf {
 
 static int rcvbuf_scale = 0x10;
 
+int rcvbuf_initial_size = SDP_HEAD_SIZE;
+module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644);
+MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes.");
+
 module_param_named(rcvbuf_scale, rcvbuf_scale, int, 0644);
 MODULE_PARM_DESC(rcvbuf_scale, "Receive buffer size scale factor.");
 
@@ -96,7 +100,7 @@ void sdp_remove_large_sock(struct sdp_sock *ssk)
 	}
 }
 
-/* Like tcp_fin */
+/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
 static void sdp_fin(struct sock *sk)
 {
 	sdp_dbg(sk, "%s\n", __func__);
@@ -104,6 +108,44 @@ static void sdp_fin(struct sock *sk)
 	sk->sk_shutdown |= RCV_SHUTDOWN;
 	sock_set_flag(sk, SOCK_DONE);
 
+	switch (sk->sk_state) {
+	case TCP_SYN_RECV:
+	case TCP_ESTABLISHED:
+		sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED,
+			TCP_CLOSE_WAIT);
+		break;
+
+	case TCP_FIN_WAIT1:
+		/* Received a reply FIN - start Infiniband tear down */
+		sdp_dbg(sk, "%s: Starting Infiniband tear down sending DREQ\n",
+			__func__);
+
+		sdp_cancel_dreq_wait_timeout(sdp_sk(sk));
+
+		sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
+
+		if (sdp_sk(sk)->id) {
+			rdma_disconnect(sdp_sk(sk)->id);
+		} else {
+			sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
+			return;
+		}
+		break;
+	case TCP_TIME_WAIT:
+		/* This is a mutual close situation and we've got the DREQ from
+		   the peer before the SDP_MID_DISCONNECT */
+		break;
+	case TCP_CLOSE:
+		/* FIN arrived after IB teardown started - do nothing */
+		sdp_dbg(sk, "%s: fin in state %s\n",
+			__func__, sdp_state_str(sk->sk_state));
+		return;
+	default:
+		sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n",
+			__func__, sk->sk_state);
+		break;
+	}
+
 	sk_stream_mem_reclaim(sk);
@@ -264,7 +306,7 @@ static void sdp_post_recv(struct sdp_sock *ssk)
 	skb_frag_t *frag;
 	struct sdp_bsdh *h;
 	int id = ssk->rx_head;
-	unsigned int gfp_page;
+	gfp_t gfp_page;
 
 	/* Now, allocate and repost recv */
 	/* TODO: allocate from cache */
@@ -338,9 +380,13 @@ static void sdp_post_recv(struct sdp_sock *ssk)
 
 void sdp_post_recvs(struct sdp_sock *ssk)
 {
+	struct sock *sk = &ssk->isk.sk;
 	int scale = ssk->rcvbuf_scale;
 
-	if (unlikely(!ssk->id))
+
+	if (unlikely(!ssk->id || ((1 << sk->sk_state) &
+		(TCPF_CLOSE | TCPF_TIME_WAIT)))) {
 		return;
+	}
 
 	if (top_mem_usage && (top_mem_usage * 0x100000) <
 			atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
@@ -349,8 +395,7 @@ void sdp_post_recvs(struct sdp_sock *ssk)
 	while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
 		(ssk->rx_head - ssk->rx_tail - SDP_MIN_BUFS) *
 		(SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
-		ssk->rcv_nxt - ssk->copied_seq <
-		ssk->isk.sk.sk_rcvbuf * scale) ||
+		ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
 		unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_BUFS))
 		sdp_post_recv(ssk);
 }
@@ -389,7 +434,7 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
 {
 	int skb_len;
 	struct sdp_sock *ssk = sdp_sk(sk);
-	struct sk_buff *tail;
+	struct sk_buff *tail = NULL;
 
 	/* not needed since sk_rmem_alloc is not currently used
 	 * TODO - remove this?
@@ -457,7 +502,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 	/* TODO: nonagle? */
 	struct sk_buff *skb;
 	int c;
-	int gfp_page;
+	gfp_t gfp_page;
 
 	if (unlikely(!ssk->id)) {
 		if (ssk->isk.sk.sk_send_head) {
@@ -524,7 +569,9 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 
 	if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
 	    likely(ssk->bufs > 1) &&
-	    likely(ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE)) {
+	    likely(ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) &&
+	    likely((1 << ssk->isk.sk.sk_state) &
+		    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
 		skb = sk_stream_alloc_skb(&ssk->isk.sk,
 					  sizeof(struct sdp_bsdh),
 					  GFP_KERNEL);
@@ -533,27 +580,44 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 		sdp_post_send(ssk, skb, SDP_MID_DATA);
 	}
 
-	if (unlikely((1 << ssk->isk.sk.sk_state) &
-			(TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) &&
+	if (unlikely(ssk->sdp_disconnect) &&
 	    !ssk->isk.sk.sk_send_head &&
 	    ssk->bufs > (ssk->remote_credits >= ssk->rx_head - ssk->rx_tail)) {
+		ssk->sdp_disconnect = 0;
 		skb = sk_stream_alloc_skb(&ssk->isk.sk,
 					  sizeof(struct sdp_bsdh),
 					  gfp_page);
 		/* FIXME */
 		BUG_ON(!skb);
 		sdp_post_send(ssk, skb, SDP_MID_DISCONN);
-		if (ssk->isk.sk.sk_state == TCP_FIN_WAIT1)
-			ssk->isk.sk.sk_state = TCP_FIN_WAIT2;
-		else
-			ssk->isk.sk.sk_state = TCP_CLOSING;
 	}
 }
 
+int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
+{
+	ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
+	if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
+		ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
+	ssk->rcvbuf_scale = rcvbuf_scale;
+
+	sdp_post_recvs(ssk);
+
+	return 0;
+}
+
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
 {
 	u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
+#if defined(__ia64__)
+	/* for huge PAGE_SIZE systems, aka IA64, limit buffer size
+	   [re-]negotiation to a known working size that will not
+	   trigger a HW error/rc to be interpreted as an IB_WC_LOC_LEN_ERR */
+	u32 max_size = (SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE) <=
+		32784 ?
+		(SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE): 32784;
+#else
 	u32 max_size = SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
+#endif
 
 	if (new_size > curr_size && new_size <= max_size &&
 	    sdp_get_large_socket(ssk)) {
@@ -588,112 +652,166 @@ static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *bu
 	ssk->sent_request = 0;
 }
 
-static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 {
+	struct sock *sk = &ssk->isk.sk;
+	int frags;
 	struct sk_buff *skb;
 	struct sdp_bsdh *h;
 	int pagesz, i;
 
-	if (wc->wr_id & SDP_OP_RECV) {
-		skb = sdp_recv_completion(ssk, wc->wr_id);
-		if (unlikely(!skb))
-			return;
+	skb = sdp_recv_completion(ssk, wc->wr_id);
+	if (unlikely(!skb))
+		return -1;
 
-		atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+	atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
 
-		if (unlikely(wc->status)) {
-			if (wc->status != IB_WC_WR_FLUSH_ERR) {
-				sdp_dbg(&ssk->isk.sk,
-					"Recv completion with error. "
-					"Status %d\n", wc->status);
-				sdp_reset(&ssk->isk.sk);
-			}
-			__kfree_skb(skb);
-		} else {
-			int frags;
-
-			sdp_dbg_data(&ssk->isk.sk,
-				     "Recv completion. ID %d Length %d\n",
-				     (int)wc->wr_id, wc->byte_len);
-			if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
-				printk("SDP BUG! byte_len %d < %zd\n",
-				       wc->byte_len, sizeof(struct sdp_bsdh));
-				__kfree_skb(skb);
-				return;
-			}
-			skb->len = wc->byte_len;
-			if (likely(wc->byte_len > SDP_HEAD_SIZE))
-				skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
-			else
-				skb->data_len = 0;
-			skb->data = skb->head;
+	if (unlikely(wc->status)) {
+		if (wc->status != IB_WC_WR_FLUSH_ERR) {
+			sdp_dbg(sk, "Recv completion with error. Status %d\n",
+				wc->status);
+			sdp_reset(sk);
+		}
+		__kfree_skb(skb);
+		return 0;
+	}
+
+	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
+		     (int)wc->wr_id, wc->byte_len);
+	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
+		printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
+		       wc->byte_len, sizeof(struct sdp_bsdh));
+		__kfree_skb(skb);
+		return -1;
+	}
+	skb->len = wc->byte_len;
+	if (likely(wc->byte_len > SDP_HEAD_SIZE))
+		skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
+	else
+		skb->data_len = 0;
+	skb->data = skb->head;
 #ifdef NET_SKBUFF_DATA_USES_OFFSET
-			skb->tail = skb_headlen(skb);
+	skb->tail = skb_headlen(skb);
 #else
-			skb->tail = skb->head + skb_headlen(skb);
+	skb->tail = skb->head + skb_headlen(skb);
 #endif
-			h = (struct sdp_bsdh *)skb->data;
-			skb_reset_transport_header(skb);
-			ssk->mseq_ack = ntohl(h->mseq);
-			if (ssk->mseq_ack != (int)wc->wr_id)
-				printk("SDP BUG! mseq %d != wrid %d\n",
-				       ssk->mseq_ack, (int)wc->wr_id);
-			ssk->bufs = ntohl(h->mseq_ack) - ssk->tx_head + 1 +
-				ntohs(h->bufs);
-
-			frags = skb_shinfo(skb)->nr_frags;
-			pagesz = PAGE_ALIGN(skb->data_len);
-			skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
-
-			for (i = skb_shinfo(skb)->nr_frags;
-			     i < frags; ++i) {
-				put_page(skb_shinfo(skb)->frags[i].page);
-				skb->truesize -= PAGE_SIZE;
-			}
+	h = (struct sdp_bsdh *)skb->data;
+	skb_reset_transport_header(skb);
+	ssk->mseq_ack = ntohl(h->mseq);
+	if (ssk->mseq_ack != (int)wc->wr_id)
+		printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
+		       ssk->mseq_ack, (int)wc->wr_id);
+	ssk->bufs = ntohl(h->mseq_ack) - ssk->tx_head + 1 +
+		ntohs(h->bufs);
+
+	frags = skb_shinfo(skb)->nr_frags;
+	pagesz = PAGE_ALIGN(skb->data_len);
+	skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
+
+	for (i = skb_shinfo(skb)->nr_frags;
+	     i < frags; ++i) {
+		put_page(skb_shinfo(skb)->frags[i].page);
+		skb->truesize -= PAGE_SIZE;
+	}
 
-			if (unlikely(h->flags & SDP_OOB_PEND))
-				sk_send_sigurg(&ssk->isk.sk);
-
-			skb_pull(skb, sizeof(struct sdp_bsdh));
-
-			if (likely(h->mid == SDP_MID_DATA) &&
-			    likely(skb->len > 0)) {
-				int oob = h->flags & SDP_OOB_PRES;
-				skb = sdp_sock_queue_rcv_skb(&ssk->isk.sk, skb);
-				if (unlikely(oob))
-					sdp_urg(ssk, skb);
-			} else if (likely(h->mid == SDP_MID_DATA)) {
-				__kfree_skb(skb);
-			} else if (h->mid == SDP_MID_DISCONN) {
-				/* this will wake recvmsg */
-				sdp_sock_queue_rcv_skb(&ssk->isk.sk, skb);
-				sdp_fin(&ssk->isk.sk);
-			} else if (h->mid == SDP_MID_CHRCVBUF) {
-				sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)skb->data);
-				__kfree_skb(skb);
-			} else if (h->mid == SDP_MID_CHRCVBUF_ACK) {
-				sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
-				__kfree_skb(skb);
-			} else {
-				/* TODO: Handle other messages */
-				printk("SDP: FIXME MID %d\n", h->mid);
-				__kfree_skb(skb);
+	if (unlikely(h->flags & SDP_OOB_PEND))
+		sk_send_sigurg(sk);
+
+	skb_pull(skb, sizeof(struct sdp_bsdh));
+
+	switch (h->mid) {
+	case SDP_MID_DATA:
+		if (unlikely(skb->len <= 0)) {
+			__kfree_skb(skb);
+			break;
+		}
+
+		if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
+			/* got data in RCV_SHUTDOWN */
+			if (sk->sk_state == TCP_FIN_WAIT1) {
+				/* go into abortive close */
+				sdp_exch_state(sk, TCPF_FIN_WAIT1,
+					TCP_TIME_WAIT);
+
+				sk->sk_prot->disconnect(sk, 0);
+			}
+
+			__kfree_skb(skb);
+			break;
+		}
+		skb = sdp_sock_queue_rcv_skb(sk, skb);
+		if (unlikely(h->flags & SDP_OOB_PRES))
+			sdp_urg(ssk, skb);
+		break;
+	case SDP_MID_DISCONN:
+		__kfree_skb(skb);
+		sdp_fin(sk);
+		break;
+	case SDP_MID_CHRCVBUF:
+		sdp_handle_resize_request(ssk,
+			(struct sdp_chrecvbuf *)skb->data);
+		__kfree_skb(skb);
+		break;
+	case SDP_MID_CHRCVBUF_ACK:
+		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
+		__kfree_skb(skb);
+		break;
+	default:
+		/* TODO: Handle other messages */
+		printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid);
+		__kfree_skb(skb);
+	}
+
+	return 0;
+}
+
+static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+	struct sk_buff *skb;
+	struct sdp_bsdh *h;
+
+	skb = sdp_send_completion(ssk, wc->wr_id);
+	if (unlikely(!skb))
+		return -1;
+
+	if (unlikely(wc->status)) {
+		if (wc->status != IB_WC_WR_FLUSH_ERR) {
+			struct sock *sk = &ssk->isk.sk;
+			sdp_dbg(sk, "Send completion with error. "
" + "Status %d\n", wc->status); + sdp_set_error(sk, -ECONNRESET); + wake_up(&ssk->wq); + + queue_work(sdp_workqueue, &ssk->destroy_work); } + goto out; + } + + h = (struct sdp_bsdh *)skb->data; + + if (likely(h->mid != SDP_MID_DISCONN)) + goto out; + + if ((1 << ssk->isk.sk.sk_state) & ~(TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) { + sdp_dbg(&ssk->isk.sk, + "%s: sent DISCONNECT from unexpected state %d\n", + __func__, ssk->isk.sk.sk_state); + } + +out: + sk_stream_free_skb(&ssk->isk.sk, skb); + + return 0; +} + +static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc) +{ + if (wc->wr_id & SDP_OP_RECV) { + if (sdp_handle_recv_comp(ssk, wc)) + return; } else if (likely(wc->wr_id & SDP_OP_SEND)) { - skb = sdp_send_completion(ssk, wc->wr_id); - if (unlikely(!skb)) + if (sdp_handle_send_comp(ssk, wc)) return; - sk_stream_free_skb(&ssk->isk.sk, skb); - if (unlikely(wc->status)) { - if (wc->status != IB_WC_WR_FLUSH_ERR) { - sdp_dbg(&ssk->isk.sk, - "Send completion with error. " - "Status %d\n", wc->status); - sdp_set_error(&ssk->isk.sk, -ECONNRESET); - wake_up(&ssk->wq); - } - } } else { sdp_cnt(sdp_keepalive_probes_sent); @@ -711,13 +829,6 @@ static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc) return; } - - if (ssk->time_wait && !ssk->isk.sk.sk_send_head && - ssk->tx_head == ssk->tx_tail) { - sdp_dbg(&ssk->isk.sk, "%s: destroy in time wait state\n", - __func__); - sdp_time_wait_destroy_sk(ssk); - } } void sdp_completion_handler(struct ib_cq *cq, void *cq_context) diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c index 2e4ceb5..04a2b8a 100644 --- a/drivers/infiniband/ulp/sdp/sdp_cma.c +++ b/drivers/infiniband/ulp/sdp/sdp_cma.c @@ -94,7 +94,7 @@ static void sdp_qp_event_handler(struct ib_event *event, void *data) { } -int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) +static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) { struct ib_qp_init_attr qp_init_attr = { .event_handler = sdp_qp_event_handler, @@ -132,7 +132,7 @@ int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) if (!sdp_sk(sk)->rx_ring) { rc = -ENOMEM; sdp_warn(sk, "Unable to allocate RX Ring size %zd.\n", - sizeof *sdp_sk(sk)->rx_ring * SDP_TX_SIZE); + sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE); goto err_rx; } @@ -162,8 +162,6 @@ int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) goto err_cq; } - ib_req_notify_cq(cq, IB_CQ_NEXT_COMP); - qp_init_attr.send_cq = qp_init_attr.recv_cq = cq; rc = rdma_create_qp(id, pd, &qp_init_attr); @@ -177,10 +175,6 @@ int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) init_waitqueue_head(&sdp_sk(sk)->wq); - sdp_sk(sk)->recv_frags = 0; - sdp_sk(sk)->rcvbuf_scale = 1; - sdp_post_recvs(sdp_sk(sk)); - sdp_dbg(sk, "%s done\n", __func__); return 0; @@ -192,13 +186,15 @@ err_mr: ib_dealloc_pd(pd); err_pd: kfree(sdp_sk(sk)->rx_ring); + sdp_sk(sk)->rx_ring = NULL; err_rx: kfree(sdp_sk(sk)->tx_ring); + sdp_sk(sk)->tx_ring = NULL; err_tx: return rc; } -int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id, +static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id, struct rdma_cm_event *event) { struct sockaddr_in *dst_addr; @@ -217,32 +213,31 @@ int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id, if (!child) return -ENOMEM; - sdp_add_sock(sdp_sk(child)); - INIT_LIST_HEAD(&sdp_sk(child)->accept_queue); - INIT_LIST_HEAD(&sdp_sk(child)->backlog_queue); - INIT_DELAYED_WORK(&sdp_sk(child)->time_wait_work, sdp_time_wait_work); - INIT_WORK(&sdp_sk(child)->destroy_work, sdp_destroy_work); + 
 
 	dst_addr = (struct sockaddr_in *)&id->route.addr.dst_addr;
 	inet_sk(child)->dport = dst_addr->sin_port;
 	inet_sk(child)->daddr = dst_addr->sin_addr.s_addr;
 
 	bh_unlock_sock(child);
-	__sock_put(child);
+	__sock_put(child, SOCK_REF_CLONE);
 
 	rc = sdp_init_qp(child, id);
 	if (rc) {
-		sk_common_release(child);
+		sdp_sk(child)->destructed_already = 1;
+		sk_free(child);
 		return rc;
 	}
 
+	sdp_add_sock(sdp_sk(child));
+
 	sdp_sk(child)->max_bufs = sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
 	sdp_sk(child)->min_bufs = sdp_sk(child)->bufs / 4;
 	sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
 		sizeof(struct sdp_bsdh);
 	sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
 		PAGE_SIZE;
 
-	sdp_resize_buffers(sdp_sk(child), ntohl(h->desremrcvsz));
+	sdp_init_buffers(sdp_sk(child), ntohl(h->desremrcvsz));
 
 	sdp_dbg(child, "%s bufs %d xmit_size_goal %d send trigger %d\n",
 		__func__,
@@ -256,7 +251,7 @@ int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
 	list_add_tail(&sdp_sk(child)->backlog_queue, &sdp_sk(sk)->backlog_queue);
 	sdp_sk(child)->parent = sk;
 
-	child->sk_state = TCP_SYN_RECV;
+	sdp_exch_state(child, TCPF_LISTEN | TCPF_CLOSE, TCP_SYN_RECV);
 
 	/* child->sk_write_space(child); */
 	/* child->sk_data_ready(child, 0); */
@@ -272,7 +267,7 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
 	struct sockaddr_in *dst_addr;
 	sdp_dbg(sk, "%s\n", __func__);
 
-	sk->sk_state = TCP_ESTABLISHED;
+	sdp_exch_state(sk, TCPF_SYN_SENT, TCP_ESTABLISHED);
 
 	if (sock_flag(sk, SOCK_KEEPOPEN))
 		sdp_start_keepalive_timer(sk);
@@ -308,7 +303,7 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
 	return 0;
 }
 
-int sdp_connected_handler(struct sock *sk, struct rdma_cm_event *event)
+static int sdp_connected_handler(struct sock *sk, struct rdma_cm_event *event)
 {
 	struct sock *parent;
 	sdp_dbg(sk, "%s\n", __func__);
@@ -316,7 +311,7 @@ int sdp_connected_handler(struct sock *sk, struct rdma_cm_event *event)
 	parent = sdp_sk(sk)->parent;
 	BUG_ON(!parent);
 
-	sk->sk_state = TCP_ESTABLISHED;
+	sdp_exch_state(sk, TCPF_SYN_RECV, TCP_ESTABLISHED);
 
 	if (sock_flag(sk, SOCK_KEEPOPEN))
 		sdp_start_keepalive_timer(sk);
@@ -350,7 +345,7 @@ done:
 	return 0;
 }
 
-int sdp_disconnected_handler(struct sock *sk)
+static int sdp_disconnected_handler(struct sock *sk)
 {
 	struct sdp_sock *ssk = sdp_sk(sk);
 
@@ -420,6 +415,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
 		hh.max_adverts = 1;
 		hh.majv_minv = SDP_MAJV_MINV;
+		sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size);
 		hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags *
 			PAGE_SIZE + SDP_HEAD_SIZE);
 		hh.max_adverts = 0x1;
@@ -498,9 +494,32 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 			((struct sockaddr_in *)&id->route.addr.src_addr)->sin_addr.s_addr;
 		rc = sdp_connected_handler(sk, event);
 		break;
-	case RDMA_CM_EVENT_DISCONNECTED:
+	case RDMA_CM_EVENT_DISCONNECTED: /* This means DREQ/DREP received */
 		sdp_dbg(sk, "RDMA_CM_EVENT_DISCONNECTED\n");
+
+		if (sk->sk_state == TCP_LAST_ACK) {
+			sdp_cancel_dreq_wait_timeout(sdp_sk(sk));
+
+			sdp_exch_state(sk, TCPF_LAST_ACK, TCP_TIME_WAIT);
+
+			sdp_dbg(sk, "%s: waiting for Infiniband tear down\n",
+				__func__);
+		}
+
+		rdma_disconnect(id);
+
+		if (sk->sk_state != TCP_TIME_WAIT) {
+			if (sk->sk_state == TCP_CLOSE_WAIT) {
+				sdp_dbg(sk, "IB teardown while in TCP_CLOSE_WAIT "
					"taking reference to let close() finish the work\n");
+				sock_hold(sk, SOCK_REF_CM_TW);
+			}
+
+			sdp_set_error(sk, -EPIPE);
+			rc = sdp_disconnected_handler(sk);
+		}
+		break;
+	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
+		sdp_dbg(sk, "RDMA_CM_EVENT_TIMEWAIT_EXIT\n");
 		rc = sdp_disconnected_handler(sk);
 		break;
 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 0ac9954..ca7c833 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -141,7 +141,9 @@ struct workqueue_struct *sdp_workqueue;
 static struct list_head sock_list;
 static spinlock_t sock_list_lock;
 
-DEFINE_RWLOCK(device_removal_lock);
+static DEFINE_RWLOCK(device_removal_lock);
+
+static inline void sdp_start_dreq_wait_timeout(struct sdp_sock *ssk, int timeo);
 
 static inline unsigned int sdp_keepalive_time_when(const struct sdp_sock *ssk)
 {
@@ -240,8 +242,14 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
 
 	sdp_remove_large_sock(ssk);
 
-	kfree(ssk->rx_ring);
-	kfree(ssk->tx_ring);
+	if (ssk->rx_ring) {
+		kfree(ssk->rx_ring);
+		ssk->rx_ring = NULL;
+	}
+	if (ssk->tx_ring) {
+		kfree(ssk->tx_ring);
+		ssk->tx_ring = NULL;
+	}
 }
 
@@ -295,7 +303,7 @@ static void sdp_keepalive_timer(unsigned long data)
 
 out:
 	bh_unlock_sock(sk);
-	sock_put(sk);
+	sock_put(sk, SOCK_REF_BORN);
 }
 
 static void sdp_init_timer(struct sock *sk)
@@ -338,17 +346,14 @@ void sdp_reset_sk(struct sock *sk, int rc)
 	if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
 		sdp_set_error(sk, rc);
 
-	sdp_destroy_qp(ssk);
-
 	memset((void *)&ssk->id, 0, sizeof(*ssk) - offsetof(typeof(*ssk), id));
 
-	if (ssk->time_wait) {
-		sdp_dbg(sk, "%s: destroy in time wait state\n", __func__);
-		sdp_time_wait_destroy_sk(ssk);
-	}
-
 	sk->sk_state_change(sk);
 
+	/* Don't destroy socket before destroy work does its job */
+	sock_hold(sk, SOCK_REF_RESET);
+	queue_work(sdp_workqueue, &ssk->destroy_work);
+
 	read_unlock(&device_removal_lock);
 }
 
@@ -417,6 +422,12 @@ static void sdp_destruct(struct sock *sk)
 	struct sdp_sock *s, *t;
 
 	sdp_dbg(sk, "%s\n", __func__);
+	if (ssk->destructed_already) {
+		sdp_warn(sk, "redestructing sk!\n");
+		return;
+	}
+
+	ssk->destructed_already = 1;
 
 	sdp_remove_sock(ssk);
 
@@ -436,27 +447,32 @@ done:
 	sdp_dbg(sk, "%s done\n", __func__);
 }
 
-static void sdp_send_active_reset(struct sock *sk, gfp_t priority)
+static void sdp_send_disconnect(struct sock *sk)
 {
-	sk->sk_prot->disconnect(sk, 0);
+	sock_hold(sk, SOCK_REF_DREQ_TO);
+	sdp_start_dreq_wait_timeout(sdp_sk(sk), SDP_FIN_WAIT_TIMEOUT);
+
+	sdp_sk(sk)->sdp_disconnect = 1;
+	sdp_post_sends(sdp_sk(sk), 0);
 }
 
 /*
 * State processing on a close.
- * TCP_ESTABLISHED -> TCP_FIN_WAIT1 -> TCP_FIN_WAIT2 -> TCP_CLOSE
+ * TCP_ESTABLISHED -> TCP_FIN_WAIT1 -> TCP_CLOSE
 */
-
 static int sdp_close_state(struct sock *sk)
 {
-	if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+	switch (sk->sk_state) {
+	case TCP_ESTABLISHED:
+		sdp_exch_state(sk, TCPF_ESTABLISHED, TCP_FIN_WAIT1);
+		break;
+	case TCP_CLOSE_WAIT:
+		sdp_exch_state(sk, TCPF_CLOSE_WAIT, TCP_LAST_ACK);
+		break;
+	default:
 		return 0;
+	}
 
-	if (sk->sk_state == TCP_ESTABLISHED)
-		sk->sk_state = TCP_FIN_WAIT1;
-	else if (sk->sk_state == TCP_CLOSE_WAIT)
-		sk->sk_state = TCP_LAST_ACK;
-	else
-		return 0;
 	return 1;
 }
 
@@ -473,14 +489,23 @@ static void sdp_close(struct sock *sk, long timeout)
 	sdp_delete_keepalive_timer(sk);
 
 	sk->sk_shutdown = SHUTDOWN_MASK;
+
+	if ((1 << sk->sk_state) & (TCPF_TIME_WAIT | TCPF_CLOSE)) {
+		/* this could happen if socket was closed by a CM teardown
+		   and after that the user called close() */
+		goto out;
+	}
+
 	if (sk->sk_state == TCP_LISTEN || sk->sk_state == TCP_SYN_SENT) {
-		sdp_set_state(sk, TCP_CLOSE);
+		sdp_exch_state(sk, TCPF_LISTEN | TCPF_SYN_SENT, TCP_CLOSE);
 
 		/* Special case: stop listening.
		   This is done by sdp_destruct. */
 		goto adjudge_to_death;
 	}
 
+	sock_hold(sk, SOCK_REF_CM_TW);
+
 	/* We need to flush the recv. buffs.  We do this only on the
	 * descriptor close, not protocol-sourced closes, because the
	 * reader process may not have drained the data yet!
@@ -501,27 +526,28 @@ static void sdp_close(struct sock *sk, long timeout)
	 * the FTP client, wheee...  Note: timeout is always zero
	 * in such a case.
	 */
-	if (data_was_unread) {
+	if (data_was_unread ||
+	    (sock_flag(sk, SOCK_LINGER) && !sk->sk_lingertime)) {
 		/* Unread data was tossed, zap the connection. */
 		NET_INC_STATS_USER(LINUX_MIB_TCPABORTONCLOSE);
-		sdp_set_state(sk, TCP_CLOSE);
-		sdp_send_active_reset(sk, GFP_KERNEL);
-	} else if (sock_flag(sk, SOCK_LINGER) && !sk->sk_lingertime) {
-		/* Check zero linger _after_ checking for unread data. */
+		sdp_exch_state(sk, TCPF_CLOSE_WAIT | TCPF_ESTABLISHED,
+			TCP_TIME_WAIT);
+
+		/* Go into abortive close */
 		sk->sk_prot->disconnect(sk, 0);
-		NET_INC_STATS_USER(LINUX_MIB_TCPABORTONDATA);
 	} else if (sdp_close_state(sk)) {
 		/* We FIN if the application ate all the data before
		 * zapping the connection. */
-		sdp_post_sends(sdp_sk(sk), 0);
+		sdp_send_disconnect(sk);
 	}
 
 	/* TODO: state should move to CLOSE or CLOSE_WAIT etc on disconnect.
	   Since it currently doesn't, do it here to avoid blocking below. */
 	if (!sdp_sk(sk)->id)
-		sdp_set_state(sk, TCP_CLOSE);
+		sdp_exch_state(sk, TCPF_FIN_WAIT1 | TCPF_LAST_ACK |
+			TCPF_CLOSE_WAIT, TCP_CLOSE);
 
 	sk_stream_wait_close(sk, timeout);
 
@@ -533,7 +559,6 @@ adjudge_to_death:
	 */
 	lock_sock(sk);
 
-	sock_hold(sk);
 	sock_orphan(sk);
 
 	/* This is a (useful) BSD violating of the RFC. There is a
@@ -549,35 +574,21 @@ adjudge_to_death:
	 * consume significant resources. Let's do it with special
	 * linger2 option.					--ANK
	 */
-
-	if (sk->sk_state == TCP_FIN_WAIT2 &&
-	    !sk->sk_send_head &&
-	    sdp_sk(sk)->tx_head == sdp_sk(sk)->tx_tail) {
-		sk->sk_state = TCP_CLOSE;
-	}
-
-	if ((1 << sk->sk_state) & (TCPF_FIN_WAIT1 | TCPF_FIN_WAIT2)) {
-		sdp_sk(sk)->time_wait = 1;
+	if (sk->sk_state == TCP_FIN_WAIT1) {
 		/* TODO: linger2 unimplemented. We should wait 3.5 * rto. How do I know rto? */
 		/* TODO: tcp_fin_time to get timeout */
 		sdp_dbg(sk, "%s: entering time wait refcnt %d\n", __func__,
 			atomic_read(&sk->sk_refcnt));
 		atomic_inc(sk->sk_prot->orphan_count);
-		queue_delayed_work(sdp_workqueue, &sdp_sk(sk)->time_wait_work,
-				   TCP_FIN_TIMEOUT);
-		goto out;
 	}
 
 	/* TODO: limit number of orphaned sockets.
	   TCP has sysctl_tcp_mem and sysctl_tcp_max_orphans */
-	sock_put(sk);
-
 	/* Otherwise, socket is reprieved until protocol close. */
 
out:
-	sdp_dbg(sk, "%s: last socket put %d\n", __func__,
-		atomic_read(&sk->sk_refcnt));
 	release_sock(sk);
+	sk_common_release(sk);
 }
 
@@ -622,7 +633,7 @@ static int sdp_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len)
 		return rc;
 	}
 
-	sk->sk_state = TCP_SYN_SENT;
+	sdp_exch_state(sk, TCPF_CLOSE, TCP_SYN_SENT);
 	return 0;
 }
 
@@ -635,13 +646,15 @@ static int sdp_disconnect(struct sock *sk, int flags)
 	struct rdma_cm_id *id;
 
 	sdp_dbg(sk, "%s\n", __func__);
-	if (ssk->id)
-		rc = rdma_disconnect(ssk->id);
 
-	if (old_state != TCP_LISTEN)
+	if (old_state != TCP_LISTEN) {
+		if (ssk->id)
+			rc = rdma_disconnect(ssk->id);
+
 		return rc;
+	}
 
-	sdp_set_state(sk, TCP_CLOSE);
+	sdp_exch_state(sk, TCPF_LISTEN, TCP_CLOSE);
 	id = ssk->id;
 	ssk->id = NULL;
 	release_sock(sk); /* release socket since locking semantics is parent
@@ -827,48 +840,86 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 	return put_user(answ, (int __user *)arg);
 }
 
+static inline void sdp_start_dreq_wait_timeout(struct sdp_sock *ssk, int timeo)
+{
+	sdp_dbg(&ssk->isk.sk, "Starting dreq wait timeout\n");
+
+	queue_delayed_work(sdp_workqueue, &ssk->dreq_wait_work, timeo);
+	ssk->dreq_wait_timeout = 1;
+}
+
+void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk)
+{
+	if (!ssk->dreq_wait_timeout)
+		return;
+
+	sdp_dbg(&ssk->isk.sk, "cancelling dreq wait timeout\n");
+
+	ssk->dreq_wait_timeout = 0;
+	if (cancel_delayed_work(&ssk->dreq_wait_work)) {
+		/* The timeout hasn't fired yet - drop the ref it holds */
+		sock_put(&ssk->isk.sk, SOCK_REF_DREQ_TO);
+	}
+
+	atomic_dec(ssk->isk.sk.sk_prot->orphan_count);
+}
+
 void sdp_destroy_work(struct work_struct *work)
 {
 	struct sdp_sock *ssk = container_of(work, struct sdp_sock, destroy_work);
 	struct sock *sk = &ssk->isk.sk;
 	sdp_dbg(sk, "%s: refcnt %d\n", __func__, atomic_read(&sk->sk_refcnt));
 
-	cancel_delayed_work(&sdp_sk(sk)->time_wait_work);
-	atomic_dec(sk->sk_prot->orphan_count);
+	sdp_cancel_dreq_wait_timeout(ssk);
+
+	if (sk->sk_state == TCP_TIME_WAIT)
+		sock_put(sk, SOCK_REF_CM_TW);
 
-	sock_put(sk);
+	/* In a normal close the current state is TCP_TIME_WAIT or TCP_CLOSE,
	   but if a CM connection is dropped out from under us the state
	   could be any state */
+	sdp_exch_state(sk, ~0, TCP_CLOSE);
+	sock_put(sk, SOCK_REF_RESET);
 }
 
-void sdp_time_wait_work(struct work_struct *work)
+void sdp_dreq_wait_timeout_work(struct work_struct *work)
 {
-	struct sdp_sock *ssk = container_of(work, struct sdp_sock, time_wait_work.work);
+	struct sdp_sock *ssk =
+		container_of(work, struct sdp_sock, dreq_wait_work.work);
 	struct sock *sk = &ssk->isk.sk;
+
 	lock_sock(sk);
-	sdp_dbg(sk, "%s\n", __func__);
 
-	if (!sdp_sk(sk)->time_wait) {
+	if (!sdp_sk(sk)->dreq_wait_timeout ||
+	    !((1 << sk->sk_state) & (TCPF_FIN_WAIT1 | TCPF_LAST_ACK))) {
 		release_sock(sk);
-		return;
+		goto out;
 	}
 
-	sdp_dbg(sk, "%s: refcnt %d\n", __func__, atomic_read(&sk->sk_refcnt));
+	sdp_warn(sk, "timed out waiting for FIN/DREQ. "
" + "going into abortive close.\n"); + + sdp_sk(sk)->dreq_wait_timeout = 0; + + if (sk->sk_state == TCP_FIN_WAIT1) + atomic_dec(ssk->isk.sk.sk_prot->orphan_count); + + sdp_exch_state(sk, TCPF_LAST_ACK | TCPF_FIN_WAIT1, TCP_TIME_WAIT); - sk->sk_state = TCP_CLOSE; - sdp_sk(sk)->time_wait = 0; release_sock(sk); - atomic_dec(sk->sk_prot->orphan_count); - sock_put(sk); -} + if (sdp_sk(sk)->id) { + rdma_disconnect(sdp_sk(sk)->id); + } else { + sdp_warn(sk, "NOT SENDING DREQ - no need to wait for timewait exit\n"); + sock_put(sk, SOCK_REF_CM_TW); + } -void sdp_time_wait_destroy_sk(struct sdp_sock *ssk) -{ - ssk->time_wait = 0; - ssk->isk.sk.sk_state = TCP_CLOSE; - queue_work(sdp_workqueue, &ssk->destroy_work); +out: + sock_put(sk, SOCK_REF_DREQ_TO); } -static int sdp_init_sock(struct sock *sk) +int sdp_init_sock(struct sock *sk) { struct sdp_sock *ssk = sdp_sk(sk); @@ -876,10 +927,18 @@ static int sdp_init_sock(struct sock *sk) INIT_LIST_HEAD(&ssk->accept_queue); INIT_LIST_HEAD(&ssk->backlog_queue); - INIT_DELAYED_WORK(&ssk->time_wait_work, sdp_time_wait_work); + INIT_DELAYED_WORK(&ssk->dreq_wait_work, sdp_dreq_wait_timeout_work); INIT_WORK(&ssk->destroy_work, sdp_destroy_work); sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM; + + ssk->rx_ring = NULL; + ssk->tx_ring = NULL; + ssk->sdp_disconnect = 0; + ssk->destructed_already = 0; + ssk->destruct_in_process = 0; + spin_lock_init(&ssk->lock); + return 0; } @@ -891,15 +950,15 @@ static void sdp_shutdown(struct sock *sk, int how) if (!(how & SEND_SHUTDOWN)) return; - if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) + /* If we've already sent a FIN, or it's a closed state, skip this. */ + if (!((1 << sk->sk_state) & + (TCPF_ESTABLISHED | TCPF_SYN_SENT | + TCPF_SYN_RECV | TCPF_CLOSE_WAIT))) { return; + } - if (sk->sk_state == TCP_ESTABLISHED) - sk->sk_state = TCP_FIN_WAIT1; - else if (sk->sk_state == TCP_CLOSE_WAIT) - sk->sk_state = TCP_LAST_ACK; - else - return; + if (!sdp_close_state(sk)) + return; /* * Just turn off CORK here. @@ -910,7 +969,7 @@ static void sdp_shutdown(struct sock *sk, int how) if (ssk->nonagle & TCP_NAGLE_OFF) ssk->nonagle |= TCP_NAGLE_PUSH; - sdp_post_sends(ssk, 0); + sdp_send_disconnect(sk); } static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb) @@ -1203,14 +1262,14 @@ static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk, { skb_header_release(skb); __skb_queue_tail(&sk->sk_write_queue, skb); - sk_charge_skb(sk, skb); + sk_charge_skb(sk, skb); if (!sk->sk_send_head) sk->sk_send_head = skb; if (ssk->nonagle & TCP_NAGLE_PUSH) ssk->nonagle &= ~TCP_NAGLE_PUSH; } -void sdp_push_one(struct sock *sk, unsigned int mss_now) +static void sdp_push_one(struct sock *sk, unsigned int mss_now) { } @@ -1255,14 +1314,14 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk, { struct bzcopy_state *bz; unsigned long addr; - int done_pages; + int done_pages = 0; int thresh; + mm_segment_t cur_fs; - thresh = ssk->zcopy_thresh ? : sdp_zcopy_thresh; - if (thresh == 0 || len < thresh) - return NULL; + cur_fs = get_fs(); - if (!can_do_mlock()) + thresh = ssk->zcopy_thresh ? 
+	if (thresh == 0 || len < thresh || !capable(CAP_IPC_LOCK))
 		return NULL;
 
 	/*
@@ -1276,7 +1335,7 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
 
 	bz = kzalloc(sizeof(*bz), GFP_KERNEL);
 	if (!bz)
-		return NULL;
+		return ERR_PTR(-ENOMEM);
 
 	addr = (unsigned long)base;
 
@@ -1289,34 +1348,41 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
 	bz->page_cnt = PAGE_ALIGN(len + bz->cur_offset) >> PAGE_SHIFT;
 	bz->pages = kcalloc(bz->page_cnt, sizeof(struct page *), GFP_KERNEL);
 
-	if (!bz->pages)
-		goto out_1;
-
-	down_write(&current->mm->mmap_sem);
+	if (!bz->pages) {
+		kfree(bz);
+		return ERR_PTR(-ENOMEM);
+	}
 
-	if (!capable(CAP_IPC_LOCK))
-		goto out_2;
 
 	addr &= PAGE_MASK;
-
-	done_pages = get_user_pages(current, current->mm, addr, bz->page_cnt,
-				    0, 0, bz->pages, NULL);
+	if (segment_eq(cur_fs, KERNEL_DS)) {
+		for (done_pages = 0; done_pages < bz->page_cnt; done_pages++) {
+			bz->pages[done_pages] = virt_to_page(addr);
+			if (!bz->pages[done_pages])
+				break;
+			get_page(bz->pages[done_pages]);
+			addr += PAGE_SIZE;
+		}
+	} else {
+		if (current->mm) {
+			down_write(&current->mm->mmap_sem);
+			done_pages = get_user_pages(current, current->mm, addr,
					bz->page_cnt, 0, 0, bz->pages, NULL);
+			up_write(&current->mm->mmap_sem);
+		}
+	}
 	if (unlikely(done_pages != bz->page_cnt)) {
-		bz->page_cnt = done_pages;
-		goto out_2;
+		int i;
+		if (done_pages > 0) {
+			for (i = 0; i < done_pages; i++)
+				put_page(bz->pages[i]);
+		}
+		kfree(bz->pages);
+		kfree(bz);
+		bz = ERR_PTR(-EFAULT);
 	}
 
-	up_write(&current->mm->mmap_sem);
-
 	return bz;
-
-out_2:
-	up_write(&current->mm->mmap_sem);
-	kfree(bz->pages);
-out_1:
-	kfree(bz);
-
-	return NULL;
 }
 
@@ -1442,11 +1508,13 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
 		if (!sk_stream_wmem_schedule(sk, copy))
 			return SDP_DO_WAIT_MEM;
 
+		get_page(bz->pages[bz->cur_page]);
 		skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
 				   bz->pages[bz->cur_page], bz->cur_offset,
 				   this_page);
 
 		BUG_ON(skb_shinfo(skb)->nr_frags >= MAX_SKB_FRAGS);
+		BUG_ON(bz->cur_offset > PAGE_SIZE);
 
 		bz->cur_offset += this_page;
 		if (bz->cur_offset == PAGE_SIZE) {
@@ -1454,11 +1522,6 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
 			bz->cur_page++;
 
 			BUG_ON(bz->cur_page > bz->page_cnt);
-		} else {
-			BUG_ON(bz->cur_offset > PAGE_SIZE);
-
-			if (bz->cur_page != bz->page_cnt || left != this_page)
-				get_page(bz->pages[bz->cur_page]);
 		}
 
 		left -= this_page;
@@ -1576,7 +1639,7 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk)
 
 /* Like tcp_sendmsg */
 /* TODO: check locking */
-int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
+static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		size_t size)
 {
 	struct iovec *iov;
@@ -1620,11 +1683,18 @@ int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
 		iov++;
 
-		/* Limiting the size_goal is required when using 64K pages*/
+		/* Limiting the size_goal is required when using 64K pages */
 		if (size_goal > SDP_MAX_PAYLOAD)
 			size_goal = SDP_MAX_PAYLOAD;
 
+		if (bz)
+			sdp_bz_cleanup(bz);
 		bz = sdp_bz_setup(ssk, from, seglen, size_goal);
+		if (IS_ERR(bz)) {
+			err = PTR_ERR(bz);
+			bz = NULL;
+			goto do_error;
+		}
 
 		while (seglen > 0) {
 			int copy;
@@ -1922,7 +1992,6 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 			}
 		}
 		if (!(flags & MSG_TRUNC)) {
-			int err;
 			err = skb_copy_datagram_iovec(skb, offset, /* TODO: skip header? */
 						      msg->msg_iov, used);
@@ -1994,14 +2063,14 @@ static int sdp_listen(struct sock *sk, int backlog)
 		sdp_warn(sk, "rdma_listen failed: %d\n", rc);
 		sdp_set_error(sk, rc);
 	} else
-		sk->sk_state = TCP_LISTEN;
+		sdp_exch_state(sk, TCPF_CLOSE, TCP_LISTEN);
 	return rc;
 }
 
 /* We almost could use inet_listen, but that calls
    inet_csk_listen_start. Longer term we'll want to add
   a listen callback to struct proto, similar to bind. */
-int sdp_inet_listen(struct socket *sock, int backlog)
+static int sdp_inet_listen(struct socket *sock, int backlog)
 {
 	struct sock *sk = sock->sk;
 	unsigned char old_state;
@@ -2174,6 +2243,8 @@ static int sdp_create_socket(struct socket *sock, int protocol)
 	sock_init_data(sock, sk);
 	sk->sk_protocol = 0x0 /* TODO: inherit tcp socket to use IPPROTO_TCP */;
 
+	memset((struct inet_sock *)sk + 1, 0,
+		sizeof(struct sdp_sock) - sizeof(struct inet_sock));
 	rc = sdp_init_sock(sk);
 	if (rc) {
 		sdp_warn(sk, "SDP: failed to init sock.\n");
@@ -2223,7 +2294,7 @@ static void *sdp_seq_start(struct seq_file *seq, loff_t *pos)
 	spin_lock_irq(&sock_list_lock);
 	start = sdp_get_idx(seq, *pos - 1);
 	if (start)
-		sock_hold((struct sock *)start);
+		sock_hold((struct sock *)start, SOCK_REF_SEQ);
 	spin_unlock_irq(&sock_list_lock);
 
 	return start;
@@ -2240,7 +2311,7 @@ static void *sdp_seq_next(struct seq_file *seq, void *v, loff_t *pos)
 	else
 		next = sdp_get_idx(seq, *pos);
 	if (next)
-		sock_hold((struct sock *)next);
+		sock_hold((struct sock *)next, SOCK_REF_SEQ);
 	spin_unlock_irq(&sock_list_lock);
 
 	*pos += 1;
@@ -2271,7 +2342,7 @@ static int sdp_seq_show(struct seq_file *seq, void *v)
 	if (v == SEQ_START_TOKEN) {
 		seq_printf(seq, "%-*s\n", TMPSZ - 1,
 			   " sl local_address rem_address uid inode"
-			   " rx_queue tx_queue");
+			   " rx_queue tx_queue state");
 		goto out;
 	}
 
@@ -2286,13 +2357,13 @@ static int sdp_seq_show(struct seq_file *seq, void *v)
 	rx_queue = sdp_sk(sk)->rcv_nxt - sdp_sk(sk)->copied_seq;
 	tx_queue = sdp_sk(sk)->write_seq - sdp_sk(sk)->snd_una;
 
-	sprintf(tmpbuf, "%4d: %08X:%04X %08X:%04X %5d %lu %08X:%08X",
+	sprintf(tmpbuf, "%4d: %08X:%04X %08X:%04X %5d %lu %08X:%08X %X",
 		st->num, src, srcp, dest, destp, uid, inode,
-		rx_queue, tx_queue);
+		rx_queue, tx_queue, sk->sk_state);
 	seq_printf(seq, "%-*s\n", TMPSZ - 1, tmpbuf);
 
-	sock_put(sk);
+	sock_put(sk, SOCK_REF_SEQ);
out:
 	return 0;
 }
@@ -2350,7 +2421,7 @@ static int __init sdp_proc_init(void)
 	sdp_seq_afinfo.seq_fops->llseek = seq_lseek;
 	sdp_seq_afinfo.seq_fops->release = seq_release_private;
 
-	p = proc_net_fops_create(sdp_seq_afinfo.name, S_IRUGO, sdp_seq_afinfo.seq_fops);
+	p = proc_net_fops_create(sdp_seq_afinfo.name, S_IRUGO, sdp_seq_afinfo.seq_fops);
 	if (p)
 		p->data = &sdp_seq_afinfo;
 	else
@@ -2411,13 +2482,31 @@ do_next:
 		}
 	}
 
+kill_socks:
 	list_for_each(p, &sock_list) {
 		ssk = list_entry(p, struct sdp_sock, sock_list);
-		if (ssk->ib_device == device) {
+
+		if (ssk->ib_device == device && !ssk->destruct_in_process) {
+			ssk->destruct_in_process = 1;
 			sk = &ssk->isk.sk;
+			sdp_cancel_dreq_wait_timeout(ssk);
+
+			spin_unlock_irq(&sock_list_lock);
+
 			sk->sk_shutdown |= RCV_SHUTDOWN;
 			sdp_reset(sk);
+			if ((1 << sk->sk_state) &
+				(TCPF_FIN_WAIT1 | TCPF_CLOSE_WAIT |
+				 TCPF_LAST_ACK | TCPF_TIME_WAIT)) {
+				sock_put(sk, SOCK_REF_CM_TW);
+			}
+
+			schedule();
+
+			spin_lock_irq(&sock_list_lock);
+
+			goto kill_socks;
 		}
 	}
 
@@ -2432,7 +2521,7 @@ static struct net_proto_family sdp_net_proto = {
 	.owner = THIS_MODULE,
 };
 
-struct ib_client sdp_client = {
+static struct ib_client sdp_client = {
 	.name = "sdp",
 	.add = sdp_add_device,
 	.remove = sdp_remove_device
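
A note for reviewers on the sock_ref()/SOCK_REF_* machinery this update adds to
sdp.h: every hold/put on a socket is tagged with the reason the reference
exists (BORN, CM_TW, DREQ_TO, ...), so a leaked or doubly-dropped reference
shows up in the debug log next to its tag. The following is a minimal
userspace sketch of the same pattern; all names in it (struct obj, obj_hold,
obj_put, do_hold, do_put, REF_*) are invented for illustration and are not
code from the patch:

/*
 * Standalone sketch of the tagged-refcount pattern behind
 * sock_ref()/sock_hold(sk, msg)/sock_put(sk, msg). The kernel version
 * wraps the real sock_hold()/sock_put() and logs via sdp_dbg()/sdp_warn().
 */
#include <stdio.h>
#include <stdlib.h>

struct obj { int refcnt; };

#define REF_BORN  "BORN"   /* taken at allocation             */
#define REF_CM_TW "CM_TW"  /* TIMEWAIT enter -> TIMEWAIT exit */

/* Complain when a hold/put hits a dead object, otherwise log the
 * operation together with the caller-supplied tag. */
#define obj_ref(o, msg, op) do {                                        \
	if ((o)->refcnt == 0)                                           \
		fprintf(stderr, "%s:%d - %s (%s) ref = 0!\n",           \
			__func__, __LINE__, #op, msg);                  \
	else {                                                          \
		printf("%s:%d - %s (%s) ref = %d\n",                    \
		       __func__, __LINE__, #op, msg, (o)->refcnt);      \
		op(o);                                                  \
	}                                                               \
} while (0)

#define obj_hold(o, msg) obj_ref(o, msg, do_hold)
#define obj_put(o, msg)  obj_ref(o, msg, do_put)

static void do_hold(struct obj *o) { o->refcnt++; }
static void do_put(struct obj *o)  { if (--o->refcnt == 0) free(o); }

int main(void)
{
	struct obj *o = calloc(1, sizeof(*o));

	if (!o)
		return 1;
	o->refcnt = 1;          /* the REF_BORN reference     */
	obj_hold(o, REF_CM_TW); /* e.g. entering TIME_WAIT    */
	obj_put(o, REF_CM_TW);  /* TIMEWAIT exit              */
	obj_put(o, REF_BORN);   /* final put frees the object */
	return 0;
}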
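
Similarly, the sdp_exch_state() helper validates every state transition
against a caller-supplied bitmask of allowed source states built from
(1 << state) values - the kernel's TCPF_* constants are exactly those masks.
A small self-contained sketch of the check, with invented names:

/*
 * Sketch of the check inside _sdp_exch_state(): a transition from any
 * state outside the from_states mask is logged, but the transition is
 * performed either way, and the old state is returned.
 */
#include <stdio.h>

enum { ST_ESTABLISHED = 1, ST_FIN_WAIT1, ST_TIME_WAIT, ST_CLOSE };

static int exch_state(int *cur, unsigned int from_states, int next)
{
	int old = *cur;

	/* same test as "(1 << sk->sk_state) & ~from_states" */
	if ((1u << old) & ~from_states)
		fprintf(stderr,
			"unexpected transition %d -> %d (allowed mask 0x%x)\n",
			old, next, from_states);

	*cur = next;	/* the transition happens either way */
	return old;
}

int main(void)
{
	int state = ST_ESTABLISHED;

	/* allowed: ESTABLISHED is in the mask, nothing is printed */
	exch_state(&state, 1u << ST_ESTABLISHED, ST_FIN_WAIT1);

	/* not allowed: FIN_WAIT1 is not in the mask, a warning fires */
	exch_state(&state, 1u << ST_CLOSE, ST_TIME_WAIT);
	return 0;
}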
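
Finally, the DREQ wait timeout added here pairs a SOCK_REF_DREQ_TO reference
with the delayed work: sdp_send_disconnect() takes the reference before
queueing, and exactly one side drops it - the handler when it fires, or
sdp_cancel_dreq_wait_timeout() when cancel_delayed_work() dequeues the work
first. A single-threaded model of that ownership rule (invented names, not
the kernel workqueue API):

#include <stdio.h>
#include <stdlib.h>

struct sock_model {
	int refcnt;
	int work_pending;	/* models the queued delayed work */
};

static void hold(struct sock_model *s, const char *tag)
{
	s->refcnt++;
	printf("hold (%s) -> %d\n", tag, s->refcnt);
}

static void put(struct sock_model *s, const char *tag)
{
	printf("put  (%s) -> %d\n", tag, s->refcnt - 1);
	if (--s->refcnt == 0) {
		printf("socket freed\n");
		free(s);
	}
}

/* sdp_start_dreq_wait_timeout() analogue: pin the socket for the work */
static void start_dreq_wait(struct sock_model *s)
{
	hold(s, "DREQ_TO");
	s->work_pending = 1;
}

/* cancel analogue: if the work was dequeued before it ran, the cancel
 * path must drop the handler's reference itself */
static void cancel_dreq_wait(struct sock_model *s)
{
	if (s->work_pending) {
		s->work_pending = 0;
		put(s, "DREQ_TO");
	}
}

/* sdp_dreq_wait_timeout_work() analogue: the handler drops its ref */
static void dreq_wait_timeout_work(struct sock_model *s)
{
	s->work_pending = 0;
	put(s, "DREQ_TO");
}

int main(void)
{
	struct sock_model *s = calloc(1, sizeof(*s));

	if (!s)
		return 1;
	s->refcnt = 1;			/* the BORN reference */

	start_dreq_wait(s);
	cancel_dreq_wait(s);		/* peer answered in time */

	start_dreq_wait(s);
	dreq_wait_timeout_work(s);	/* peer never answered */

	put(s, "BORN");			/* final put frees the socket */
	return 0;
}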