Skip to content

Commit

Permalink
Code improvements. Fix crash on cluster reattach.
Browse files Browse the repository at this point in the history
  • Loading branch information
cardigliano committed Oct 9, 2023
1 parent 5b11e71 commit 33af829
Showing 1 changed file with 98 additions and 60 deletions.
158 changes: 98 additions & 60 deletions kernel/pf_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -4278,7 +4278,7 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb,

if(num_cluster_queues > 0) {
u_short num_iterations;
u_int32_t cluster_queue_id;
u_int32_t cluster_queue_id, cluster_1st_queue_id;
u_int8_t ip_with_dup_sent = 0;
int sent_on_queue_id = -1;

Expand Down Expand Up @@ -4323,6 +4323,7 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb,
}

cluster_queue_id = skb_hash % num_logical_cluster_queues;
cluster_1st_queue_id = cluster_queue_id;

iterate_cluster_elements:
/*
Expand All @@ -4337,8 +4338,10 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb,
num_iterations++) {
struct sock *skElement = cluster_ptr->cluster.sk[cluster_queue_id];

if(skElement != NULL
&& (sent_on_queue_id < 0 || sent_on_queue_id != cluster_queue_id /* do not send twice on the same queue */)) {
if(/* Check if there is a producer running on this queue id */
skElement != NULL
/* Do not send twice on the same queue in case of cluster_per_flow_ip_with_dup_tuple */
&& (sent_on_queue_id < 0 || sent_on_queue_id != cluster_queue_id)) {
struct pf_ring_socket *pfr = ring_sk(skElement);

if(pfr != NULL
Expand Down Expand Up @@ -4370,9 +4373,10 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb,
sent_on_queue_id = cluster_queue_id;
break;

} else if((cluster_ptr->cluster.hashing_mode != cluster_round_robin)
} else if(!(cluster_ptr->cluster.hashing_mode == cluster_round_robin ||
cluster_ptr->cluster.relaxed_distribution)
/* We're the last element of the cluster so no further cluster element to check */
|| ((num_iterations + 1) >= num_logical_cluster_queues)) {
|| (num_iterations + 1) >= num_logical_cluster_queues) {
pfr->slots_info->tot_pkts++, pfr->slots_info->tot_lost++;
}
}
Expand All @@ -4386,16 +4390,16 @@ int pf_ring_skb_ring_handler(struct sk_buff *skb,
}
} /* for */

if((cluster_ptr->cluster.hashing_mode == cluster_per_flow_ip_with_dup_tuple)
if(cluster_ptr->cluster.hashing_mode == cluster_per_flow_ip_with_dup_tuple
&& !ip_with_dup_sent) {
u_int32_t new_cluster_queue_id = hash_pkt_header(&hdr, HASH_PKT_HDR_MASK_SRC | HASH_PKT_HDR_MASK_MAC
u_int32_t cluster_2nd_queue_id = hash_pkt_header(&hdr, HASH_PKT_HDR_MASK_SRC | HASH_PKT_HDR_MASK_MAC
| HASH_PKT_HDR_MASK_PROTO | HASH_PKT_HDR_MASK_PORT
| HASH_PKT_HDR_RECOMPUTE | HASH_PKT_HDR_MASK_VLAN);

new_cluster_queue_id %= num_logical_cluster_queues;

if(new_cluster_queue_id != cluster_queue_id) {
cluster_queue_id = new_cluster_queue_id;
cluster_2nd_queue_id %= num_logical_cluster_queues;
if(cluster_2nd_queue_id != cluster_1st_queue_id &&
cluster_2nd_queue_id != sent_on_queue_id) {
cluster_queue_id = cluster_2nd_queue_id;
ip_with_dup_sent = 1;
goto iterate_cluster_elements;
}
Expand Down Expand Up @@ -6111,6 +6115,25 @@ unsigned int ring_poll(struct file *file,

/* ************************************* */

/*
 * Find the cluster whose id equals cluster_id.
 *
 * Walks ring_cluster_list from its first element and stops at the first
 * entry with a matching cluster.cluster_id.
 *
 * Returns the matching element, or NULL when the list holds no such id.
 */
ring_cluster_element *cluster_lookup(u_int32_t cluster_id) {
  u_int32_t cursor;
  ring_cluster_element *candidate;

  for(candidate = (ring_cluster_element*) lockless_list_get_first(&ring_cluster_list, &cursor);
      candidate != NULL;
      candidate = (ring_cluster_element*) lockless_list_get_next(&ring_cluster_list, &cursor)) {
    if(candidate->cluster.cluster_id == cluster_id)
      break; /* match: fall through and return it */
  }

  /* NULL here means the walk reached the end of the list without a match */
  return candidate;
}

/* ************************************* */

int get_first_available_cluster_queue(ring_cluster_element *el)
{
int i;
Expand All @@ -6124,6 +6147,23 @@ int get_first_available_cluster_queue(ring_cluster_element *el)

/* ************************************* */

/*
 * Return the PF_RING socket attached to the lowest-numbered occupied
 * queue slot of the cluster.
 *
 * Slots el->cluster.sk[0 .. MAX_CLUSTER_QUEUES-1] are scanned in order;
 * the first non-NULL one is converted with ring_sk() and returned.
 *
 * Returns NULL when every slot is empty.
 */
struct pf_ring_socket *get_first_cluster_consumer(ring_cluster_element *el)
{
  int queue_idx = 0;

  while (queue_idx < MAX_CLUSTER_QUEUES) {
    struct sock *candidate = el->cluster.sk[queue_idx];

    if (candidate != NULL)
      return ring_sk(candidate);

    queue_idx++;
  }

  return NULL;
}

/* ************************************* */

int add_sock_to_cluster_list(ring_cluster_element *el, struct sock *sk, u_int16_t consumer_id)
{
struct pf_ring_socket *pfr = ring_sk(sk);
Expand All @@ -6136,9 +6176,8 @@ int add_sock_to_cluster_list(ring_cluster_element *el, struct sock *sk, u_int16_

if (el->cluster.num_cluster_queues > 0) {
/* There is already some consumer, checking compatibility */
struct sock *first_sk = el->cluster.sk[0];
struct pf_ring_socket *first_pfr = ring_sk(first_sk);
if (!bitmap_equal(first_pfr->pf_dev_mask, pfr->pf_dev_mask, MAX_NUM_DEV_IDX)) {
struct pf_ring_socket *first_pfr = get_first_cluster_consumer(el);
if (first_pfr && !bitmap_equal(first_pfr->pf_dev_mask, pfr->pf_dev_mask, MAX_NUM_DEV_IDX)) {
printk("[PF_RING] Error: adding sockets with different interfaces to cluster %u\n",
el->cluster.cluster_id);
return(-EINVAL);
Expand All @@ -6154,10 +6193,14 @@ int add_sock_to_cluster_list(ring_cluster_element *el, struct sock *sk, u_int16_
/* ID provided, check if available */
if (el->cluster.sk[consumer_id] == NULL)
queue_id = consumer_id;
else {
printk("[PF_RING] Error: cluster queues %d in use\n", consumer_id);
return(-EBUSY);
}
}

if (queue_id < 0 || el->cluster.sk[queue_id] != NULL) {
printk("[PF_RING] Error: cluster queue is not available or all queues are in use\n");
if (queue_id < 0) {
printk("[PF_RING] Error: all cluster queues are in use\n");
return(-EINVAL);
}

Expand All @@ -6173,15 +6216,15 @@ int add_sock_to_cluster_list(ring_cluster_element *el, struct sock *sk, u_int16_

/* ************************************* */

int remove_from_cluster_list(struct ring_cluster *el, struct sock *sock)
int remove_from_cluster_list(struct ring_cluster *cluster_ptr, struct sock *sock)
{
int i;

for(i = 0; i < MAX_CLUSTER_QUEUES; i++) {
if(el->sk[i] == sock) {
if(cluster_ptr->sk[i] == sock) {
/* Found - removing */
el->num_cluster_queues--;
el->sk[i] = NULL;
cluster_ptr->num_cluster_queues--;
cluster_ptr->sk[i] = NULL;
return(0);
}
}
Expand All @@ -6193,36 +6236,37 @@ int remove_from_cluster_list(struct ring_cluster *el, struct sock *sock)

/*
 * Detach `sock` from the cluster recorded in pfr->cluster_id.
 *
 * Returns 0 when the socket is not part of any cluster (cluster_id == 0),
 * the result of remove_from_cluster_list() when the cluster is found, and
 * -EINVAL when no cluster with that id is present in ring_cluster_list.
 *
 * NOTE(review): this span is a rendered diff; removed and added lines of
 * the function appear interleaved below.
 */
static int remove_from_cluster(struct sock *sock, struct pf_ring_socket *pfr)
{
ring_cluster_element *cluster_ptr;
u_int32_t last_list_idx;
ring_cluster_element *cluster_el;

debug_printk(2, "--> remove_from_cluster(%d)\n", pfr->cluster_id);
//printk("remove_from_cluster(%d)\n", pfr->cluster_id);

if(pfr->cluster_id == 0 /* 0 = No Cluster */ )
return(0); /* Nothing to do */
return(0); /* Nothing to do */

/* The cluster lookup and every list update below run with ring_cluster_lock write-held */
write_lock_bh(&ring_cluster_lock);

cluster_ptr = (ring_cluster_element*)lockless_list_get_first(&ring_cluster_list, &last_list_idx);
cluster_el = cluster_lookup(pfr->cluster_id);

while(cluster_ptr != NULL) {
if(cluster_ptr->cluster.cluster_id == pfr->cluster_id) {
int ret = remove_from_cluster_list(&cluster_ptr->cluster, sock);
if (cluster_el) {
int ret = remove_from_cluster_list(&cluster_el->cluster, sock);

if(cluster_ptr->cluster.num_cluster_queues == 0) {
lockless_list_remove(&ring_cluster_list, cluster_ptr);
lockless_list_add(&delayed_memory_table, cluster_ptr); /* Free later */
}
printk("[PF_RING] Cluster %u consumer detached (active consumers: %u)\n",
pfr->cluster_id, cluster_el->cluster.num_cluster_queues);

write_unlock_bh(&ring_cluster_lock);
return ret;
/* No consumers left: unlink the cluster and queue it on delayed_memory_table */
if(cluster_el->cluster.num_cluster_queues == 0) {
printk("[PF_RING] Last cluster consumer detached, removing cluster %u\n", pfr->cluster_id);
lockless_list_remove(&ring_cluster_list, cluster_el);
lockless_list_add(&delayed_memory_table, cluster_el); /* Free later */
}

cluster_ptr = (ring_cluster_element*)lockless_list_get_next(&ring_cluster_list, &last_list_idx);
/* Reset to 0 (= no cluster) so a later call on this socket returns early */
pfr->cluster_id = 0;

write_unlock_bh(&ring_cluster_lock);
return ret;
}

write_unlock_bh(&ring_cluster_lock);
return(-EINVAL); /* Not found */
return(-EINVAL); /* Not found */
}

/* ************************************* */
Expand Down Expand Up @@ -6277,8 +6321,7 @@ static int add_sock_to_cluster(struct sock *sock,
struct pf_ring_socket *pfr,
struct add_to_cluster *consumer_info)
{
ring_cluster_element *cluster_ptr;
u_int32_t last_list_idx;
ring_cluster_element *cluster_el;
int rc;

debug_printk(2, "--> add_sock_to_cluster(%d)\n", consumer_info->cluster_id);
Expand All @@ -6291,41 +6334,36 @@ static int add_sock_to_cluster(struct sock *sock,

write_lock_bh(&ring_cluster_lock);

cluster_ptr = (ring_cluster_element*)lockless_list_get_first(&ring_cluster_list, &last_list_idx);
cluster_el = cluster_lookup(consumer_info->cluster_id);

while(cluster_ptr != NULL) {
if(cluster_ptr->cluster.cluster_id == consumer_info->cluster_id) {
if (cluster_el) {
/* Cluster already present, adding socket */
rc = add_sock_to_cluster_list(cluster_el, sock, consumer_info->queue_id);

/* Cluster already present, adding socket */
rc = add_sock_to_cluster_list(cluster_ptr, sock, consumer_info->queue_id);

write_unlock_bh(&ring_cluster_lock);
return(rc);
}

cluster_ptr = (ring_cluster_element*)lockless_list_get_next(&ring_cluster_list, &last_list_idx);
write_unlock_bh(&ring_cluster_lock);
return(rc);
}

/* The cluster does not exist, creating it.. */

if((cluster_ptr = kmalloc(sizeof(ring_cluster_element), GFP_KERNEL)) == NULL) {
if((cluster_el = kmalloc(sizeof(ring_cluster_element), GFP_KERNEL)) == NULL) {
write_unlock_bh(&ring_cluster_lock);
return(-ENOMEM);
}

INIT_LIST_HEAD(&cluster_ptr->list);
INIT_LIST_HEAD(&cluster_el->list);

cluster_ptr->cluster.cluster_id = consumer_info->cluster_id;
cluster_ptr->cluster.num_cluster_queues = 0;
cluster_ptr->cluster.max_queue_index = 0;
cluster_ptr->cluster.relaxed_distribution = !!(consumer_info->options & CLUSTER_OPTION_RELAXED_DISTRIBUTION);
cluster_ptr->cluster.hashing_mode = consumer_info->the_type; /* Default */
cluster_ptr->cluster.hashing_id = 0;
memset(cluster_ptr->cluster.sk, 0, sizeof(cluster_ptr->cluster.sk));
cluster_el->cluster.cluster_id = consumer_info->cluster_id;
cluster_el->cluster.num_cluster_queues = 0;
cluster_el->cluster.max_queue_index = 0;
cluster_el->cluster.relaxed_distribution = !!(consumer_info->options & CLUSTER_OPTION_RELAXED_DISTRIBUTION);
cluster_el->cluster.hashing_mode = consumer_info->the_type; /* Default */
cluster_el->cluster.hashing_id = 0;
memset(cluster_el->cluster.sk, 0, sizeof(cluster_el->cluster.sk));

rc = add_sock_to_cluster_list(cluster_ptr, sock, consumer_info->queue_id);
rc = add_sock_to_cluster_list(cluster_el, sock, consumer_info->queue_id);

lockless_list_add(&ring_cluster_list, cluster_ptr);
lockless_list_add(&ring_cluster_list, cluster_el);

write_unlock_bh(&ring_cluster_lock);

Expand Down

0 comments on commit 33af829

Please sign in to comment.