diff --git a/kernel/pf_ring.c b/kernel/pf_ring.c index 750e27088a..cbaa8d61f6 100644 --- a/kernel/pf_ring.c +++ b/kernel/pf_ring.c @@ -4278,7 +4278,7 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb, if(num_cluster_queues > 0) { u_short num_iterations; - u_int32_t cluster_queue_id; + u_int32_t cluster_queue_id, cluster_1st_queue_id; u_int8_t ip_with_dup_sent = 0; int sent_on_queue_id = -1; @@ -4323,6 +4323,7 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb, } cluster_queue_id = skb_hash % num_logical_cluster_queues; + cluster_1st_queue_id = cluster_queue_id; iterate_cluster_elements: /* @@ -4337,8 +4338,10 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb, num_iterations++) { struct sock *skElement = cluster_ptr->cluster.sk[cluster_queue_id]; - if(skElement != NULL - && (sent_on_queue_id < 0 || sent_on_queue_id != cluster_queue_id /* do not send twice on the same queue */)) { + if(/* Check if there is a producer running on this queue id */ + skElement != NULL + /* Do not send twice on the same queue in case of cluster_per_flow_ip_with_dup_tuple */ + && (sent_on_queue_id < 0 || sent_on_queue_id != cluster_queue_id)) { struct pf_ring_socket *pfr = ring_sk(skElement); if(pfr != NULL @@ -4370,9 +4373,10 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb, sent_on_queue_id = cluster_queue_id; break; - } else if((cluster_ptr->cluster.hashing_mode != cluster_round_robin) + } else if(!(cluster_ptr->cluster.hashing_mode == cluster_round_robin || + cluster_ptr->cluster.relaxed_distribution) /* We're the last element of the cluster so no further cluster element to check */ - || ((num_iterations + 1) >= num_logical_cluster_queues)) { + || (num_iterations + 1) >= num_logical_cluster_queues) { pfr->slots_info->tot_pkts++, pfr->slots_info->tot_lost++; } } @@ -4386,16 +4390,16 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb, } } /* for */ - if((cluster_ptr->cluster.hashing_mode == cluster_per_flow_ip_with_dup_tuple) + if(cluster_ptr->cluster.hashing_mode == cluster_per_flow_ip_with_dup_tuple && !ip_with_dup_sent) { - u_int32_t new_cluster_queue_id = hash_pkt_header(&hdr, HASH_PKT_HDR_MASK_SRC | HASH_PKT_HDR_MASK_MAC + u_int32_t cluster_2nd_queue_id = hash_pkt_header(&hdr, HASH_PKT_HDR_MASK_SRC | HASH_PKT_HDR_MASK_MAC | HASH_PKT_HDR_MASK_PROTO | HASH_PKT_HDR_MASK_PORT | HASH_PKT_HDR_RECOMPUTE | HASH_PKT_HDR_MASK_VLAN); - new_cluster_queue_id %= num_logical_cluster_queues; - - if(new_cluster_queue_id != cluster_queue_id) { - cluster_queue_id = new_cluster_queue_id; + cluster_2nd_queue_id %= num_logical_cluster_queues; + if(cluster_2nd_queue_id != cluster_1st_queue_id && + cluster_2nd_queue_id != sent_on_queue_id) { + cluster_queue_id = cluster_2nd_queue_id; ip_with_dup_sent = 1; goto iterate_cluster_elements; } @@ -6111,6 +6115,25 @@ unsigned int ring_poll(struct file *file, /* ************************************* */ +ring_cluster_element *cluster_lookup(u_int32_t cluster_id) { + ring_cluster_element *cluster_el; + u_int32_t last_list_idx; + + cluster_el = (ring_cluster_element*) lockless_list_get_first(&ring_cluster_list, &last_list_idx); + + while(cluster_el != NULL) { + + if(cluster_el->cluster.cluster_id == cluster_id) + return cluster_el; + + cluster_el = (ring_cluster_element*) lockless_list_get_next(&ring_cluster_list, &last_list_idx); + } + + return NULL; +} + +/* ************************************* */ + int get_first_available_cluster_queue(ring_cluster_element *el) { int i; @@ -6124,6 +6147,23 @@ int get_first_available_cluster_queue(ring_cluster_element *el) /* ************************************* */ +struct pf_ring_socket *get_first_cluster_consumer(ring_cluster_element *el) +{ + int i; + + for (i = 0; i < MAX_CLUSTER_QUEUES; i++) { + if (el->cluster.sk[i] != NULL) { + struct sock *first_sk = el->cluster.sk[i]; + struct pf_ring_socket *pfr = ring_sk(first_sk); + return pfr; + } + } + + return NULL; +} + +/* ************************************* */ + int add_sock_to_cluster_list(ring_cluster_element *el, struct sock *sk, u_int16_t consumer_id) { struct pf_ring_socket *pfr = ring_sk(sk); @@ -6136,9 +6176,8 @@ int add_sock_to_cluster_list(ring_cluster_element *el, struct sock *sk, u_int16_ if (el->cluster.num_cluster_queues > 0) { /* There is already some consumer, checking compatibility */ - struct sock *first_sk = el->cluster.sk[0]; - struct pf_ring_socket *first_pfr = ring_sk(first_sk); - if (!bitmap_equal(first_pfr->pf_dev_mask, pfr->pf_dev_mask, MAX_NUM_DEV_IDX)) { + struct pf_ring_socket *first_pfr = get_first_cluster_consumer(el); + if (first_pfr && !bitmap_equal(first_pfr->pf_dev_mask, pfr->pf_dev_mask, MAX_NUM_DEV_IDX)) { printk("[PF_RING] Error: adding sockets with different interfaces to cluster %u\n", el->cluster.cluster_id); return(-EINVAL); @@ -6154,10 +6193,14 @@ int add_sock_to_cluster_list(ring_cluster_element *el, struct sock *sk, u_int16_ /* ID provided, check if available */ if (el->cluster.sk[consumer_id] == NULL) queue_id = consumer_id; + else { + printk("[PF_RING] Error: cluster queues %d in use\n", consumer_id); + return(-EBUSY); + } } - if (queue_id < 0 || el->cluster.sk[queue_id] != NULL) { - printk("[PF_RING] Error: cluster queue is not available or all queues are in use\n"); + if (queue_id < 0) { + printk("[PF_RING] Error: all cluster queues are in use\n"); return(-EINVAL); } @@ -6173,15 +6216,15 @@ int add_sock_to_cluster_list(ring_cluster_element *el, struct sock *sk, u_int16_ /* ************************************* */ -int remove_from_cluster_list(struct ring_cluster *el, struct sock *sock) +int remove_from_cluster_list(struct ring_cluster *cluster_ptr, struct sock *sock) { int i; for(i = 0; i < MAX_CLUSTER_QUEUES; i++) { - if(el->sk[i] == sock) { + if(cluster_ptr->sk[i] == sock) { /* Found - removing */ - el->num_cluster_queues--; - el->sk[i] = NULL; + cluster_ptr->num_cluster_queues--; + cluster_ptr->sk[i] = NULL; return(0); } } @@ -6193,36 +6236,37 @@ int remove_from_cluster_list(struct ring_cluster *el, struct sock *sock) static int remove_from_cluster(struct sock *sock, struct pf_ring_socket *pfr) { - ring_cluster_element *cluster_ptr; - u_int32_t last_list_idx; + ring_cluster_element *cluster_el; - debug_printk(2, "--> remove_from_cluster(%d)\n", pfr->cluster_id); + //printk("remove_from_cluster(%d)\n", pfr->cluster_id); if(pfr->cluster_id == 0 /* 0 = No Cluster */ ) - return(0); /* Nothing to do */ + return(0); /* Nothing to do */ write_lock_bh(&ring_cluster_lock); - cluster_ptr = (ring_cluster_element*)lockless_list_get_first(&ring_cluster_list, &last_list_idx); + cluster_el = cluster_lookup(pfr->cluster_id); - while(cluster_ptr != NULL) { - if(cluster_ptr->cluster.cluster_id == pfr->cluster_id) { - int ret = remove_from_cluster_list(&cluster_ptr->cluster, sock); + if (cluster_el) { + int ret = remove_from_cluster_list(&cluster_el->cluster, sock); - if(cluster_ptr->cluster.num_cluster_queues == 0) { - lockless_list_remove(&ring_cluster_list, cluster_ptr); - lockless_list_add(&delayed_memory_table, cluster_ptr); /* Free later */ - } + printk("[PF_RING] Cluster %u consumer detached (active consumers: %u)\n", + pfr->cluster_id, cluster_el->cluster.num_cluster_queues); - write_unlock_bh(&ring_cluster_lock); - return ret; + if(cluster_el->cluster.num_cluster_queues == 0) { + printk("[PF_RING] Last cluster consumer detached, removing cluster %u\n", pfr->cluster_id); + lockless_list_remove(&ring_cluster_list, cluster_el); + lockless_list_add(&delayed_memory_table, cluster_el); /* Free later */ } - cluster_ptr = (ring_cluster_element*)lockless_list_get_next(&ring_cluster_list, &last_list_idx); + pfr->cluster_id = 0; + + write_unlock_bh(&ring_cluster_lock); + return ret; } write_unlock_bh(&ring_cluster_lock); - return(-EINVAL); /* Not found */ + return(-EINVAL); /* Not found */ } /* ************************************* */ @@ -6277,8 +6321,7 @@ static int add_sock_to_cluster(struct sock *sock, struct pf_ring_socket *pfr, struct add_to_cluster *consumer_info) { - ring_cluster_element *cluster_ptr; - u_int32_t last_list_idx; + ring_cluster_element *cluster_el; int rc; debug_printk(2, "--> add_sock_to_cluster(%d)\n", consumer_info->cluster_id); @@ -6291,41 +6334,36 @@ static int add_sock_to_cluster(struct sock *sock, write_lock_bh(&ring_cluster_lock); - cluster_ptr = (ring_cluster_element*)lockless_list_get_first(&ring_cluster_list, &last_list_idx); + cluster_el = cluster_lookup(consumer_info->cluster_id); - while(cluster_ptr != NULL) { - if(cluster_ptr->cluster.cluster_id == consumer_info->cluster_id) { + if (cluster_el) { + /* Cluster already present, adding socket */ + rc = add_sock_to_cluster_list(cluster_el, sock, consumer_info->queue_id); - /* Cluster already present, adding socket */ - rc = add_sock_to_cluster_list(cluster_ptr, sock, consumer_info->queue_id); - - write_unlock_bh(&ring_cluster_lock); - return(rc); - } - - cluster_ptr = (ring_cluster_element*)lockless_list_get_next(&ring_cluster_list, &last_list_idx); + write_unlock_bh(&ring_cluster_lock); + return(rc); } /* The cluster does not exist, creating it.. */ - if((cluster_ptr = kmalloc(sizeof(ring_cluster_element), GFP_KERNEL)) == NULL) { + if((cluster_el = kmalloc(sizeof(ring_cluster_element), GFP_KERNEL)) == NULL) { write_unlock_bh(&ring_cluster_lock); return(-ENOMEM); } - INIT_LIST_HEAD(&cluster_ptr->list); + INIT_LIST_HEAD(&cluster_el->list); - cluster_ptr->cluster.cluster_id = consumer_info->cluster_id; - cluster_ptr->cluster.num_cluster_queues = 0; - cluster_ptr->cluster.max_queue_index = 0; - cluster_ptr->cluster.relaxed_distribution = !!(consumer_info->options & CLUSTER_OPTION_RELAXED_DISTRIBUTION); - cluster_ptr->cluster.hashing_mode = consumer_info->the_type; /* Default */ - cluster_ptr->cluster.hashing_id = 0; - memset(cluster_ptr->cluster.sk, 0, sizeof(cluster_ptr->cluster.sk)); + cluster_el->cluster.cluster_id = consumer_info->cluster_id; + cluster_el->cluster.num_cluster_queues = 0; + cluster_el->cluster.max_queue_index = 0; + cluster_el->cluster.relaxed_distribution = !!(consumer_info->options & CLUSTER_OPTION_RELAXED_DISTRIBUTION); + cluster_el->cluster.hashing_mode = consumer_info->the_type; /* Default */ + cluster_el->cluster.hashing_id = 0; + memset(cluster_el->cluster.sk, 0, sizeof(cluster_el->cluster.sk)); - rc = add_sock_to_cluster_list(cluster_ptr, sock, consumer_info->queue_id); + rc = add_sock_to_cluster_list(cluster_el, sock, consumer_info->queue_id); - lockless_list_add(&ring_cluster_list, cluster_ptr); + lockless_list_add(&ring_cluster_list, cluster_el); write_unlock_bh(&ring_cluster_lock);