javascript是single thread,所以如果要可以讀訊息,又要同時送訊息就要使用worker
Inter-Process Communication(IPC)代表行程之間的通訊 ,下面使用多種同樣功能(chat.c,我寫入 = 你讀取)的程式碼來說明通訊的運作方式
下面的fifo、mmap、msgQ都是在同一台電腦上面才能進行通訊,使用udp、tcp才可以對不同電腦進行通訊
linux一切皆檔案,檔案有分成三種,管道(fifo)、硬體(device)、文件(file)
fifo是具名管道,可以用來傳輸,這邊使用兩個管道,獨立紀錄兩個視窗
使用兩個行程,分成兩個行程,因為read和fgets都會讓行程卡住,都需要有訊息才可以走下去。
gcc ./chat.c -o chat
chat.c: 必須開兩個視窗,一個是
./chat 1
,一個是./chat 0
,可以讓兩個程式碼互相通訊
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#define SMAX 80
int main(int argc, char *argv[]) {
int fd;
char *fifo0 = "/tmp/user0";
char *fifo1 = "/tmp/user1";
mkfifo(fifo0, 0666);
mkfifo(fifo1, 0666);
char *me, *you;
if (strcmp(argv[1], "0")) { // me:0 => you:1 // ./chat 0
me = fifo0;
you = fifo1;
} else { // me:1 => you:0 // ./chat 1
me = fifo1;
you = fifo0;
}
char msg[SMAX];
// 分成兩個行程,因為read和fgets都會讓行程卡住,需要有訊息才可以走下去。
// 如果想要一個行程,就要使用輪巡的方式,不過這樣可讀性會下降,所以還是不要用比較好
if (fork() == 0) { // 上面的if是讀取對方訊息,下面是發送訊息給對方
// child: receive message and print (一直讀對方的訊息,讀到就印出來)
fd = open(you, O_RDONLY); // 開啟對方管道
while (1) {
int n = read(fd, msg, sizeof(msg)); // 讀取對方發來的訊息
if (n <= 0) break; // 如果管道已經關閉,就離開
printf("receive: %s", msg); // 印出訊息
}
close(fd); // 關閉管道
} else {
// parent: readline and send (一直讀鍵盤,然後把訊息送給對方)
fd = open(me, O_WRONLY); // 開啟我方管道
while (1) {
fgets(msg, SMAX, stdin); // 讀一行輸入 // 讀取標準輸入
int n = write(fd, msg, strlen(msg)+1); // 將該行輸入訊息送上我方管道
if (n<=0) break;
}
close(fd);
}
return 0;
}
使用共用記憶體的方式完成chat.c
mmap在linux很常用,不只在通訊上用到,也會在作業系統裡面用到
chat.c: 這個跟上面的行為模式都一樣,不過這個使用以檔案為共用記憶體的mmap
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#define SMAX 80 // 每一行的最大字數
int main(int argc, char *argv[]) {
int id = argv[1][0]-'0';
int fd = open("chat.dat", O_RDWR | O_CREAT);
// 使用共用記憶體(fd文件),分配兩個區段,可對記憶體做讀寫
char *buf = mmap(NULL, 2*SMAX, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
char *myMsg, *yourMsg;
if (id == 0) { // 如果是 ./chat 0,第一段記憶體設為自己,後面一段設為對方
myMsg = buf;
yourMsg = buf + SMAX;
} else { // 如果是 ./chat 1,第一段記憶體設為對方,後面一段設為自己
myMsg = buf + SMAX;
yourMsg = buf;
}
if (fork() == 0) {
// child: receive message and print
while (1) {
if (yourMsg[0] != '\0') {
printf("receive: %s", yourMsg);
yourMsg[0] = '\0'; // 收到文件,把文件清空
}
}
} else {
// parent: readline and put into myMsg in buf
while (1) {
fgets(myMsg, SMAX, stdin);
}
}
munmap(buf, 2*SMAX); // 退出清空map
close(fd);
return 0;
}
把要傳送的訊息寫成結構,使用msgQ做傳送
chat.c
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <unistd.h>
#define SMAX 80
struct msg_t {
long mtype; // type 指定要哪一種消息, 0:取第一個, >0:消息類型為 type 的第一個, <0:消息類型 <=abs(type) 的第一個。
char mtext[SMAX];
};
int main(int argc, char *argv[]) {
int id = argv[1][0]-'0';
int q0 = msgget((key_t) 1235, 0666|IPC_CREAT);
int q1 = msgget((key_t) 1236, 0666|IPC_CREAT);
int myQ, yourQ;
if (id == 0) {
myQ = q0;
yourQ = q1;
} else {
myQ = q1;
yourQ = q0;
}
struct msg_t msg = {.mtype=1};
// char msg[SMAX];
if (fork() == 0) {
// child: receive message and print
while (1) {
msgrcv(yourQ, &msg, SMAX, 0, 0);
printf("receive: %s", msg.mtext);
}
} else {
// parent: readline and put into myMsg in buf
while (1) {
fgets(msg.mtext, SMAX, stdin);
msgsnd(myQ, &msg, SMAX, 0);
}
}
return 0;
}
這個和tcp都是使用socket,可以進行遠端通訊,上禮拜有提過
udp是非連接導向,所以不用確保順序,直接做訊息收發
chat.c: 這邊只有使用一個fork(),所以只能一對一,如果要對多人,就要用多次fork()
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#define SMAX 80
int main(int argc, char *argv[]) {
int sfd = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in saddr, raddr;
memset(&saddr, 0, sizeof(saddr));
memset(&raddr, 0, sizeof(raddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(8888);
char msg[SMAX];
if (argc==1) { // 沒輸入參數就是 server // ./chat
printf("I am server...\n");
saddr.sin_addr.s_addr = INADDR_ANY;
bind(sfd, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
socklen_t rAddrLen = sizeof(struct sockaddr);
int rlen = recvfrom(sfd, msg, SMAX, 0, (struct sockaddr*) &raddr, &rAddrLen);
printf("receive: %s from client addr %s\n", msg, inet_ntoa(raddr.sin_addr));
} else { // 有輸入參數(server IP)就是 client // ./chat 127.0.0.1
printf("I am client...\n");
saddr.sin_addr.s_addr = inet_addr(argv[1]);
memcpy(&raddr, &saddr, sizeof(saddr));
char *connMsg = "<connect request>";
sendto(sfd, connMsg, strlen(connMsg)+1, 0, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
}
if (fork() == 0) {
// child: receive message and print
while (1) {
socklen_t rAddrLen = sizeof(struct sockaddr);
recvfrom(sfd, msg, SMAX, 0, (struct sockaddr*) &raddr, &rAddrLen);
printf("receive: %s", msg);
}
} else {
// parent: readline and send msg
while (1) {
fgets(msg, SMAX, stdin);
sendto(sfd, msg, strlen(msg)+1, 0, (struct sockaddr*) &raddr, sizeof(struct sockaddr));
}
}
close(sfd);
return 0;
}
tcp是連接導向,所以訊息收發之前client需要connect,server需要listen
chat.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#define SMAX 80
int main(int argc, char *argv[]) {
int sfd = socket(AF_INET, SOCK_STREAM, 0);
int cfd, fd;
struct sockaddr_in saddr, raddr;
memset(&saddr, 0, sizeof(saddr));
memset(&raddr, 0, sizeof(raddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(8888);
char msg[SMAX];
if (argc==1) { // server
printf("I am server...\n");
saddr.sin_addr.s_addr = INADDR_ANY;
bind(sfd, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
// 設定可以被一人連線
listen(sfd, 1);
socklen_t rAddrLen = sizeof(struct sockaddr);
cfd = accept(sfd, (struct sockaddr*) &raddr, &rAddrLen);
printf("accept: cfd=%d client addr %s\n", cfd, inet_ntoa(raddr.sin_addr));
fd = cfd;
} else { // client
printf("I am client...\n");
saddr.sin_addr.s_addr = inet_addr(argv[1]);
// 確保連線
connect(sfd, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
fd = sfd;
printf("connect success: sfd=%d server addr=%s\n", sfd, inet_ntoa(saddr.sin_addr));
}
if (fork() == 0) {
// child: receive message and print
while (1) {
int n = recv(fd, msg, SMAX, 0);
if (n <=0) break;
printf("receive: %s", msg);
}
} else {
// parent: readline and send msg
while (1) {
fgets(msg, SMAX, stdin);
send(fd, msg, strlen(msg)+1, 0);
}
}
close(sfd);
return 0;
}
在tcp裡面使用thread,要在多人連線時效能才會明顯和fork有差
chat.c: 使用thread取代fork,main和receiver會同時執行
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <pthread.h>
#define SMAX 80
int fd;
void *receiver(void *argu) {
char msg[SMAX];
while (1) {
int n = recv(fd, msg, SMAX, 0);
if (n <=0) break;
printf("receive: %s", msg);
}
return NULL;
}
int main(int argc, char *argv[]) {
int sfd = socket(AF_INET, SOCK_STREAM, 0);
int cfd;
struct sockaddr_in saddr, raddr;
memset(&saddr, 0, sizeof(saddr));
memset(&raddr, 0, sizeof(raddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(8888);
char msg[SMAX];
if (argc==1) { // server
printf("I am server...\n");
saddr.sin_addr.s_addr = INADDR_ANY;
bind(sfd, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
listen(sfd, 1);
socklen_t rAddrLen = sizeof(struct sockaddr);
cfd = accept(sfd, (struct sockaddr*) &raddr, &rAddrLen);
printf("accept: cfd=%d client addr %s\n", cfd, inet_ntoa(raddr.sin_addr));
fd = cfd;
} else { // client
printf("I am client...\n");
saddr.sin_addr.s_addr = inet_addr(argv[1]);
connect(sfd, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
fd = sfd;
printf("connect success: sfd=%d server addr=%s\n", sfd, inet_ntoa(saddr.sin_addr));
}
pthread_t thread1; // 宣告執行緒
pthread_create(&thread1, NULL, &receiver, NULL);
// readline and send msg
while (1) {
fgets(msg, SMAX, stdin);
send(fd, msg, strlen(msg)+1, 0);
}
close(sfd);
return 0;
}
複習一下socket的server/client的流程:
- server:
socket -> bind -> listen -> accept -> send/recv -> close
- client:
socket -> connet -> send/recv -> close
server.c: 這個的功能是可以廣播client打的內容給所有client。
只有server用到poll(因為要連接到多個client),client使用普通的socket就好了
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <poll.h>
#define SMAX 80
#define CMAX 10 // 10 個client
struct pollfd clients[CMAX], pfds[CMAX];
void *receiver(void *argu) {
char msg[SMAX];
while (1) {
memcpy(pfds, clients, sizeof(clients));
int timeout = 100;
int ready = poll(pfds, CMAX, timeout); // 創建poll,把東西丟到pfds做處理
for (int ci = 0; ci < CMAX; ci++) {
if (pfds[ci].revents != 0) { // 相較於 epoll,這裡還是要一個一個測。
if (pfds[ci].revents & POLLIN) { // 接收到訊息的事件
ssize_t s = read(pfds[ci].fd, msg, sizeof(msg)); // rev
printf("%s", msg);
for (int i=0; i<CMAX; i++) { // 廣播給其他人
if (i != ci && clients[i].fd != 0) { // 如果對方不是發訊息者,而且不是空的,那就轉送給他!
write(clients[i].fd, msg, strlen(msg)+1); // send
}
}
}
}
}
}
return NULL;
}
void connectHandler(int sfd) {
struct sockaddr_in raddr;
socklen_t rAddrLen = sizeof(struct sockaddr);
int cfd = accept(sfd, (struct sockaddr*) &raddr, &rAddrLen); // 有客戶端連進來了,其串流為 cfd ...
// 開始監控,receiver開始可以運行
for (int i=0; i<CMAX; i++) { // 對每一個客戶端進行初始化
if (clients[i].fd == 0) { // 找到一個空的客戶端
memset(&clients[i], 0, sizeof(clients[i]));
clients[i].events = POLLIN; // 監控其輸入
clients[i].fd = cfd; // 監控對象為 cfd
break;
}
}
}
int main(int argc, char *argv[]) {
int port = atoi(argv[1]);
printf("port=%d\n", port);
int sfd = socket(AF_INET, SOCK_STREAM, 0); // 創建socket對象
struct sockaddr_in saddr, raddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(port); // htons(8888);
char msg[SMAX];
saddr.sin_addr.s_addr = INADDR_ANY;
// bind
int rb = bind(sfd, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
assert(rb >= 0);
// listen
int rl = listen(sfd, CMAX);
assert(rl >= 0);
memset(clients, 0, sizeof(clients)); // 挪出空間給clients
// send/recv
pthread_t thread1;
pthread_create(&thread1, NULL, receiver, NULL); // 創建一個thread,處理訊息收發
while (1) {
connectHandler(sfd); // 連線(accept client)
}
close(sfd);
return 0;
}
client
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#define SMAX 80
void *receiver(void *argu) {
int sfd = *(int*)argu;
char msg[SMAX];
while (1) {
int n = recv(sfd, msg, SMAX, 0);
if (n <=0) break;
printf("%s", msg);
}
return NULL;
}
int main(int argc, char *argv[]) {
char *name = argv[1];
char *ip = argv[2];
int port = atoi(argv[3]);
int sfd = socket(AF_INET, SOCK_STREAM, 0); // 創建socket對象
struct sockaddr_in saddr, raddr;
memset(&saddr, 0, sizeof(saddr));
memset(&raddr, 0, sizeof(raddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(port);
saddr.sin_addr.s_addr = inet_addr(ip);
// 建立連接
int rc = connect(sfd, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
assert(rc >= 0);
// send/recv
pthread_t thread1;
pthread_create(&thread1, NULL, receiver, &sfd); // 創建一個thread,處理訊息接收
while (1) {
char msg[SMAX], fmsg[SMAX];
fgets(msg, SMAX, stdin); // 讀取stdin
sprintf(fmsg, "%s: %s", name, msg); // 把stdin內容丟到 fmsg
send(sfd, fmsg, strlen(fmsg)+1, 0); // 傳送訊息
}
close(sfd); // close
return 0;
}
event poll(事件輪詢)這個方法可以提升通訊的效能
使用epoll可以對事件做處理,跟傳統的poll(每一個事件都要輪詢,等於多了一個for迴圈)相比,epoll具有event count,可以對多個event進行整合
epoll1.c: 可以接收輸入並輸出到stdout,並顯示目前有幾個事件(有輸入就1個,沒輸入就0)。
#define MAX_EVENTS 5
#define READ_SIZE 10
#include <stdio.h> // for fprintf()
#include <unistd.h> // for close(), read()
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
#include <string.h> // for strncmp
int main()
{
int running = 1, event_count, i;
size_t bytes_read;
char read_buffer[READ_SIZE + 1];
struct epoll_event event, events[MAX_EVENTS];
int epoll_fd = epoll_create1(0); // 創建epoll,傳回file handler
if(epoll_fd == -1) // 創建失敗
{
fprintf(stderr, "Failed to create epoll file descriptor\n");
return 1;
}
event.events = EPOLLIN;
event.data.fd = 0;
// 成功就加入event進去,這邊是輸入事件,如果在網路,這邊可能會要接收很多事件
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, 0, &event))
{
fprintf(stderr, "Failed to add file descriptor to epoll\n");
close(epoll_fd);
return 1;
}
while(running)
{
printf("\nPolling for input...\n");
// 取得有多少事件發生 epoll_wait(handler, events, 事件數量, 最長等待時間(ms));
event_count = epoll_wait(epoll_fd, events, MAX_E VENTS, 30000);
printf("%d ready events\n", event_count);
// 針對每一個事件去讀取
for(i = 0; i < event_count; i++)
{
printf("Reading file descriptor '%d' -- ", events[i].data.fd);
// 每一個event都有他的id,這邊讀取事件(接收輸入)
bytes_read = read(events[i].data.fd, read_buffer, READ_SIZE);
printf("%zd bytes read.\n", bytes_read);
read_buffer[bytes_read] = '\0';
printf("Read '%s'\n", read_buffer);
if(!strncmp(read_buffer, "stop\n", 5))
running = 0;
}
}
if(close(epoll_fd))
{
fprintf(stderr, "Failed to close epoll file descriptor\n");
return 1;
}
return 0;
}
server.c: 這邊直接把廣播拿掉,不過就不用宣告client的結構和多個for迴圈
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <poll.h>
#include <sys/epoll.h>
#define SMAX 80
#define CMAX 10 // 10 個client
int epollfd;
void *receiver(void *argu) {
char msg[SMAX];
while (1) {
int timeout = 100;
struct epoll_event events[CMAX];
int event_count = epoll_wait(epollfd, events, CMAX, timeout); // 等待事件發生,event_count 是事件數
for(int i = 0; i < event_count; i++) { // 這裡比 poll 好,因為只要測有事件發生的串流,而不需要全測!
int n = read(events[i].data.fd, msg, SMAX);
msg[n] = '\0';
printf("%s", msg);
}
}
return NULL;
}
void connectHandler(int sfd) {
struct sockaddr_in raddr;
socklen_t rAddrLen = sizeof(struct sockaddr);
int cfd = accept(sfd, (struct sockaddr*) &raddr, &rAddrLen);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = cfd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, cfd, &ev); // 開始監控,receiver開始可以運行
}
int main(int argc, char *argv[]) {
int port = atoi(argv[1]);
printf("port=%d\n", port);
int sfd = socket(AF_INET, SOCK_STREAM, 0); // 創建socket對像
struct sockaddr_in saddr, raddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(port);
char msg[SMAX];
saddr.sin_addr.s_addr = INADDR_ANY;
// bind
int rb = bind(sfd, (struct sockaddr*) &saddr, sizeof(struct sockaddr));
assert(rb >= 0);
// listen
int rl = listen(sfd, CMAX);
assert(rl >= 0);
memset(clients, 0, sizeof(clients));
epollfd = epoll_create1(0); // 創建一個結構,宣告使用epoll
// send/recv
pthread_t thread1;
pthread_create(&thread1, NULL, receiver, NULL); // 創建一個thread,處理訊息收發
while (1) {
connectHandler(sfd); // 連線(accept client)
}
close(sfd);
return 0;
}
這個server的client和poll client長的一樣,都是使用socket做傳輸
2:15:55clientserver、2:47:22測試完成
#include <string.h>
strcmp(str1, str2)
: 比較str1和str2的大小
- declaration:
int strcmp(const char *str1, const char *str2)
- return:
- 傳回值 < 0:
str1 < str2
- 傳回值 > 0:
str1 > str2
- 傳回 == 0:
str1 == str2
- 傳回值 < 0:
#include <stdio.h>
fgets(str, n, file)
: 讀取 file 裡面的 n 個字元,存放到str(通常是陣列)裡面
-
declaration:
char * fgets ( char * str , int n , FILE * stream )
-
return:
- 如果讀取成功,返回值等於str (文件內容)
- 如果讀取字元為到n個就讀到文件結束,就回傳空指標,並把內容存放到到str
- 如果發生錯誤,返回空指標
#include <stdlib.h>
fopen(str, n, file)
: 讀取 file 裡面的 n 個字元,存放到str(通常是陣列)裡面
-
declaration:
char * fgets ( char * str , int n , FILE * stream )
-
return:
- 如果讀取成功,返回值等於str (文件內容)
- 如果讀取字元為到n個就讀到文件結束,就回傳空指標,並把內容存放到到str
- 如果發生錯誤,返回空指標
#include <sys/mman.h>
mmap(addr, size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
: 設定在指定address設定映射文件或是設備,如果add設為NULL,就會把文件映射在隨機區域。使用munmap()
可以取消映射
- declaration:
void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset);
- return:
- 如果mmap執行成功,返回區域指標,
munmap()
回傳0 - 如果發生錯誤,傳回MAP_FALED,代表 (void*) -1,
munmap()
回傳-1
- 如果mmap執行成功,返回區域指標,
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h>
msgget((key_t) 1235, 0666|IPC_CREAT);
: 得到queue識別符(一個非負整數,像是id的東西)
- declaration:
int msgget(key_t key, int msgflg);
- return:
- 如果執行成功,返回queue的識別符(一個非負整數)
- 如果發生錯誤,返回 -1
msgsnd(myQ, &msg, SMAX, 0);
: 送消息(message queue)
- declaration:
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
- return:
- 如果執行成功,返回 0
- 如果發生錯誤,返回 -1,errno 可以看到詳細錯誤
msgrcv(yourQ, &msg, size, 0, 0);
: 收消息(youQ是queue識別符),存放到msg
- declaration:
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
- return:
- 如果執行成功,返回收到(存放在msg struct裡面)的位元數
- 如果發生錯誤,返回 -1,errno 可以看到詳細錯誤