鍍金池/ 問答/Java  C++/ 使用epoll來做為兩個client socket的中轉(zhuǎn)站

使用epoll來做為兩個client socket的中轉(zhuǎn)站

問題描述

我建立一個socket server, 當(dāng)有其他client連接時,如果一開始輸入key123, 那么就認(rèn)為它是主client, 監(jiān)視它的in和out, 如果連接后輸入的不是key123, 那么就是從client, 從client可以有多個

從client輸入的字符都會被server轉(zhuǎn)發(fā)到主client, 主client輸入的字符會被群發(fā)到所有從client

如果telnet 127.0.0.1 2323這樣的手工方法,沒有問題,

但如果我用C寫個程序來模擬手工輸入,那么主client很快后讀失敗,而且server的狀態(tài)不如預(yù)期

問題出現(xiàn)的環(huán)境背景及自己嘗試過哪些方法

我嘗試過用nonblocking,也不行

相關(guān)代碼

// 請把代碼文本粘貼到下方(請勿用圖片代替代碼)

socket server



static void handle_accept(int epollfd, int listenfd, cli_t * pclient)
{
    int clifd;
    struct sockaddr_in cliaddr;
    pclient->cliaddr = &cliaddr;

    socklen_t  cliaddrlen = sizeof(*(pclient->cliaddr));
    clifd = accept(listenfd,(sockaddr*)(pclient->cliaddr),&cliaddrlen);
    if (clifd == -1)
    {
        if ( (errno == EAGAIN) || (errno == EWOULDBLOCK) ) {
            return;
        } else {
            perror("accpet error:");
        }
    } else {
        report_peer_connected((pclient->cliaddr), cliaddrlen);
        set_socket_non_blocking(clifd);
        add_event(epollfd,clifd,EPOLLIN|EPOLLOUT);
        // add one client read event
        int nread, nwrite;
#if 0
        const char * req = "please input the key:\n";
        printf("size: %d\n", sizeof(req));
        while ((nwrite = write(clifd,req,strlen(req)))<0)
        {
            if(nwrite == -1){
                if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                    //sleep(1);
                    continue;
                } else {
                    perror("read error");
                    exit(1);
                }
            }
        }
#endif
        int times=0;
//        sleep(3);
        memset(ibuf, 0, sizeof(buf_t));

        //while (((nread = read(clifd,ibuf->buf,MAXSIZE))<0) && (times < 100))
        while (((nread = read(clifd,ibuf->buf,MAXSIZE))<0))
        {
            if(nread == -1){
                if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
//                    usleep(100000);
                    times++;
                    continue;
                } else {
                    perror("read error");
                    exit(1);
                }
            }
        }
        if (nread == 0) {
            fprintf(stderr, "nread: the client closed\n");
        } else {
            printf("nread: %d, ibuf->buf: %s\n", nread, ibuf->buf);
            if((strncmp(ibuf->buf , "key123", 6) == 0))
            {
#ifdef DEBUG
                printf("get HW connection\n");
#endif
                pclient->hwfd = clifd;
                pclient->hwcon = 1;
            } else {
#ifdef DEBUG
                printf("get SW connection\n");
#endif
                pclient->swfd.push_back(clifd);
                (pclient->swconnum)++;
                delete_event(epollfd,clifd,EPOLLIN);
            }
        }
    }
}

// HW side reads message from the client
static void do_read(int epollfd, int fd, cli_t * pclient)
{
    if((fd == pclient->hwfd)) //if it's hw side
    {
        memset(ibuf, 0, sizeof(buf_t));
        int nread;
        nread = read(fd,ibuf->buf,MAXSIZE);
        if (nread == -1)
        {
            perror("read error:");
            close(fd);
            delete_event(epollfd,fd,EPOLLIN);
        }
        else if (nread == 0)
        {
            fprintf(stderr,"client %s:%d close.\n", inet_ntoa(pclient->cliaddr->sin_addr), ntohs(pclient->cliaddr->sin_port));
            close(fd);
            delete_event(epollfd,fd,EPOLLIN);
        }
        else
        {
            // get message from HW side
            {
#ifdef DEBUG
                printf("read message from hw is : %s\n",ibuf->buf);
#endif
            }
            int nwrite;
            for(int i=0; i<(pclient->swconnum);i++)
            {
                nwrite = write((pclient->swfd[i]),ibuf->buf,nread);
            }
            //modify_event(epollfd,fd,EPOLLIN|EPOLLOUT);
        }
    }     
}

// HW side writes message to the client
static void do_write(int epollfd,int fd, cli_t * pclient)
{
    if((fd == pclient->hwfd)) //if it's hw side
    {
        int nwrite;
        int nread;
        char towrite[MAXSIZE];
        for(int i=0;i<((pclient->swfd).size());i++)
        {
            set_socket_non_blocking((pclient->swfd[i]));
            nread=read((pclient->swfd[i]), obuf->buf, MAXSIZE);
            if((nread == -1) && ((errno == EAGAIN || errno == EWOULDBLOCK)))
            {
            } else {
                if(nread == -1)
                {
                    perror("read error:");
                    pclient->swfd.erase((pclient->swfd).begin()+i);
                    pclient->swconnum--;
                } else {
                    if(nread == 0)
                    {
                        fprintf(stderr, "client closed.\n");
                        close(pclient->swfd[i]);
                        pclient->swfd.erase((pclient->swfd).begin()+i);
                        pclient->swconnum--;
                    } else {

#ifdef DEBUG
                        printf("write message to hw is : %s\n", obuf->buf);
#endif
                        nwrite=write(fd, obuf->buf, nread);
                    }
                }
            }
        }
        //modify_event(epollfd,fd,EPOLLIN);
            
    }
}

static void handle_events(int epollfd, int listenfd, struct epoll_event * events, int num, cli_t* pclient, TRANSTATE * ts )
{
    int i;
    int fd;
    struct sockaddr_in cliaddr;
    //*ts = INIT;
    // wark over the event ready
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        // deal as to different event
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
        {
            handle_accept(epollfd, listenfd, pclient);
        } else {
            if (events[i].events & EPOLLIN)
            {
                do_read(epollfd, fd, pclient);
            } else {
                if (events[i].events & EPOLLOUT)
                {
                    do_write(epollfd,fd,pclient);
                }
            }
        }
    }
}

static void do_epoll(int listenfd)
{
    //                int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    cli_t * pclient = (cli_t*)malloc(sizeof(cli_t));

    set_socket_non_blocking(listenfd);

    int ret;
    // create one epoll fd
    int epollfd = epoll_create(FDSIZE);
    TRANSTATE ts=INIT;
    // add one listen fd into epoll pool
    add_event(epollfd,listenfd,EPOLLIN);
    for ( ; ; )
    {
        // get the event ready
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,EPOLL_TIMEOUT);
        if(ret <0) {
            perror("epoll_wait failed");
            exit(1);
        } else if (ret == 0) {
            fprintf(stderr, "no socket ready for read within %d secs\n", EPOLL_TIMEOUT/1000);
        } else {
            handle_events(epollfd,listenfd,events,ret,pclient,&ts);
        }
    }
    close(epollfd);
}

master client



    char message[BUF_SIZE];

    int nread, nwrite;
    int times = 0;

    const char * key="key123\n";
    printf("key len=%d\n", strlen(key));

#if 1
    while ((nwrite = write(sock,key,strlen(key)+1))<0)
    {
        if(nwrite == -1){
            if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                //sleep(1);
                printf("nwrite: try again!\n");
                continue;
            } else {
                perror("read error");
                exit(1);
            }
        }
    }
#endif
//    nwrite = write(sock,key,strlen(key)+1);
    if (nwrite == 0) {
        fprintf(stderr, "nwrite: the client closed\n");
//        exit(1);
    } else {
            printf("HW connected\n");
    }
    
    //keep communicating with server
    int count = 0;
    while(1)
    {
        sprintf(message, "%d\n", count++);
        
        //Send count
        printf("[Send]: %s", message);
        if( (nwrite=write(sock, message, strlen(message)+1)) < 0)
        {
            perror("write failed");
            exit(1);
        } else {
            if( nwrite == 0)
            {
                printf("nwwrite: no message written to svr\n");
            } else {
                printf("writen done\n");
            }
        }
#if 1
        //Receive a reply from the server
        while( (nread =read(sock, message, BUF_SIZE)) <= 0)
        {
            if(nread <0)
            {
                perror("read failed");
                exit(1);
            } else {
                printf("nread: 0\n");
                continue;
            }
        }

        
        printf("[Recv]: %s\n", message);
#endif
        usleep(CLIENT_INTERVAL);
    }
    
    close(sock);
    exit(0);
}

先運(yùn)行./go

然后在另一個窗口運(yùn)行./to

你期待的結(jié)果是什么?實(shí)際看到的錯誤信息又是什么?

我期待可以看到./to一直運(yùn)行, 然后它發(fā)出去的0, 1, 2, 3... 會被socket server接收到

一旦有其他client (telnet 127.0.0.1 2323, 然后隨便輸入字符回車), 那么主client的消息會被發(fā)到從client, 從client的消息會被發(fā)到主client

實(shí)際上./to在read的時候就會出錯

read failed: Resource temporarily unavailable

而在socket server側(cè)沒有收到主client發(fā)過來的數(shù)字“0”, 而是一個空字符(0)

read message from hw is :

回答
編輯回答
舊螢火

read failed的原因是因?yàn)楫惒揭?,返回了EAGAIN, 需要對它做特殊處理

read message的原因還沒找到

2017年7月12日 15:51