// putla_queue.c
//
// LCI example: exchange long messages with the one-sided LCI_putla call,
// using a single completion queue for both local and remote completions.
#include "lci.h"
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char** args)
{
// Number of messages to send
int num_msgs = 10;
int msgs_size = 65536;
if (argc > 1) num_msgs = atoi(args[1]);
if (argc > 2) msgs_size = atoi(args[1]);
// Call `LCI_initialize` to initialize the runtime
LCI_initialize();
// Initialize a device. A LCI device is associated with a set of communication
// resources (matching table, low-level network resources, etc).
// Alternatively, users can use LCI_UR_DEVICE, which
// has been initialized by the runtime in LCI_initialize().
LCI_device_t device;
LCI_device_init(&device);
// We can use one completion queue for both sends and receives.
LCI_comp_t cq;
LCI_queue_create(device, &cq);
// Initialize an endpoint. Alternatively, users can use LCI_UR_ENDPOINT,
// which has been initialized by the runtime in LCI_initialize().
LCI_endpoint_t ep;
// We need a property list to initialize the endpoint. Multiple properties of
// the endpoint can be specified by the property list.
// Currently, a LCI endpoint is not associated with any low-level
// communication resources, it is just a way to specify a bunch of
// configurations.
LCI_plist_t plist;
LCI_plist_create(&plist);
// We set the completion mechanism on the sender side to be completion queue.
LCI_plist_set_comp_type(plist, LCI_PORT_COMMAND, LCI_COMPLETION_QUEUE);
// We set the completion mechanism on the receiver side to be completion
// queue. This also sets the completion type of LCI_DEFAULT_COMP_REMOTE.
LCI_plist_set_comp_type(plist, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE);
// Set the default completion object to be triggered by
// LCI_DEFAULT_COMP_REMOTE.
LCI_plist_set_default_comp(plist, cq);
// Matching rule does not matter here any more, as we are not using send/recv
// LCI_plist_set_match_type(plist, LCI_MATCH_RANKTAG);
LCI_endpoint_init(&ep, device, plist);
LCI_plist_free(&plist);
// For putla, we only need to allocate the send buffer. The receiver buffer
// will be allocated by the LCI runtime and delivered to the users through
// the completion object.
LCI_lbuffer_t src_buf;
src_buf.address = malloc(msgs_size);
src_buf.length = msgs_size;
memset(src_buf.address, 'a' + LCI_RANK, msgs_size);
// Users need to explicitly register/deregister the send buffer.
LCI_memory_register(device, src_buf.address, src_buf.length,
&src_buf.segment);
// Alternatively, users can pass a LCI_SEGMENT_ALL and the LCI runtime will
// register and deregister them for users.
// src_buf.segment = LCI_SEGMENT_ALL;
// Alternatively, users can directly use LCI_lbuffer_alloc to allocate and
// register the buffers.
// LCI_lbuffer_alloc(device, msgs_size, &src_buf);
int peer_rank;
if (LCI_NUM_PROCESSES == 1) {
// We will do loopback messages
peer_rank = 0;
} else if (LCI_NUM_PROCESSES == 2) {
// We will do a simple ping-pong here between rank 0 and rank 1.
// The destination rank to send and recv messages.
peer_rank = 1 - LCI_RANK;
} else {
fprintf(stderr, "Unexpected process number!");
}
// The tag of the messages. Since we are using the one-sided put, no tag
// matching will happen. This tag will just be passed to the receive side as
// a value.
LCI_tag_t tag = 99 + LCI_RANK;
LCI_tag_t peer_tag = 99 + peer_rank;
// User-defined contexts.
void* user_context = (void*)9527;
if (LCI_RANK == 0) {
for (int i = 0; i < num_msgs; i++) {
// Send a long message using LCI_putla
// A LCI send function can return LCI_ERR_RETRY, so we use a while loop
// here to make sure the message is sent
while (LCI_putla(ep, src_buf, cq, peer_rank, tag, LCI_DEFAULT_COMP_REMOTE,
user_context) == LCI_ERR_RETRY)
// Users have to call LCI_progress frequently to make progress on the
// background work.
LCI_progress(device);
LCI_request_t request;
// Try to pop a entry from the completion queue.
while (LCI_queue_pop(cq, &request) == LCI_ERR_RETRY)
// Users have to call LCI_progress frequently to make progress on the
// background work.
LCI_progress(device);
// Once an entry is popped, the completion information will be
// written to the request object.
assert(request.flag == LCI_OK);
assert(request.rank == peer_rank);
assert(request.tag == tag);
assert(request.type == LCI_LONG);
assert(request.user_context == user_context);
assert(request.data.lbuffer.address == src_buf.address);
assert(request.data.lbuffer.length == src_buf.length);
assert(request.data.lbuffer.segment == src_buf.segment);
// Wait for a remote put from the peer.
// We do not need to post a recv here. The data will directly be delivered
// throught the default completion object.
// Try to pop a entry from the completion queue.
while (LCI_queue_pop(cq, &request) == LCI_ERR_RETRY)
// Users have to call LCI_progress frequently to make progress on the
// background work.
LCI_progress(device);
// Once an entry is popped, the completion information will be
// written to the request object.
assert(request.flag == LCI_OK);
assert(request.rank == peer_rank);
assert(request.tag == peer_tag);
assert(request.type == LCI_LONG);
assert(request.data.lbuffer.length == msgs_size);
for (int j = 0; j < msgs_size; ++j) {
assert(((char*)request.data.lbuffer.address)[j] == 'a' + peer_rank);
}
// Since we are using the "a" (allocate) version of LCI_put, the receive
// buffer is allocated by the LCI runtime. We need to return it to the
// runtime after we processing the data.
LCI_lbuffer_free(request.data.lbuffer);
}
} else {
for (int i = 0; i < num_msgs; i++) {
LCI_request_t request;
while (LCI_queue_pop(cq, &request) == LCI_ERR_RETRY) LCI_progress(device);
assert(request.flag == LCI_OK);
assert(request.rank == peer_rank);
assert(request.tag == peer_tag);
assert(request.type == LCI_LONG);
assert(request.data.lbuffer.length == msgs_size);
for (int j = 0; j < msgs_size; ++j) {
assert(((char*)request.data.lbuffer.address)[j] == 'a' + peer_rank);
}
LCI_lbuffer_free(request.data.lbuffer);
while (LCI_putla(ep, src_buf, cq, peer_rank, tag, LCI_DEFAULT_COMP_REMOTE,
user_context) == LCI_ERR_RETRY)
LCI_progress(device);
while (LCI_queue_pop(cq, &request) == LCI_ERR_RETRY) LCI_progress(device);
assert(request.flag == LCI_OK);
assert(request.rank == peer_rank);
assert(request.tag == tag);
assert(request.type == LCI_LONG);
assert(request.user_context == user_context);
assert(request.data.lbuffer.address == src_buf.address);
assert(request.data.lbuffer.length == src_buf.length);
assert(request.data.lbuffer.segment == src_buf.segment);
}
}
// Free all the resources
// If you are using LCI_lbuffer_alloc
// LCI_lbuffer_free(src_buf);
// If you are using LCI_SEGMENT_ALL
// assert(src_buf.segment == LCI_SEGMENT_ALL);
LCI_memory_deregister(&src_buf.segment);
free(src_buf.address);
LCI_endpoint_free(&ep);
LCI_device_free(&device);
// Call `LCI_finalize` to finalize the runtime
LCI_finalize();
return 0;
}