鍍金池/ 問答/HTML5  C  C++  Linux/ 求助,多線程寫入文件錯(cuò)亂問題!!~~

求助,多線程寫入文件錯(cuò)亂問題??!~~

多線程復(fù)制文件,同一文件下,多線程并發(fā)寫入同一文件的不同部分。
思路是,提前為每個(gè)線程分配好寫入內(nèi)容大小,每個(gè)線程執(zhí)行fopen獲取單獨(dú)的文件描述符,然后按分配的寫入大小,fseek到不同的位置,并發(fā)寫入內(nèi)容。
但是寫入的內(nèi)容總是錯(cuò)亂。

#include <stdio.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define BUFF_SIZE 512
#define PTHREAD_NUMBER 4

typedef struct copy_block
{
char fin[BUFF_SIZE];
char fout[BUFF_SIZE];
long start;        //起始位置
long segment_size; //分段大小
int id;        //虛擬線程id
} __attribute__((packed)) page;

long file_size(char *filename)
{
struct stat fstat;
memset(&fstat, 0, sizeof(fstat));
stat(filename, &fstat);
return fstat.st_size;
}

//單線程任務(wù)邏輯
void pthread_copy(void *arg)
{
//轉(zhuǎn)換指針類型
page *p = (page *)arg;

//每個(gè)線程單獨(dú)打開文件
FILE *fin = fopen(p->fin, "r");
FILE *fout = fopen(p->fout, "wb+");

//移動(dòng)流到偏移位置
int res1 = fseek(fin, p->start, SEEK_SET);
int res2 = fseek(fout, p->start, SEEK_SET);

//開始復(fù)制
char buffer[BUFF_SIZE];        //讀寫區(qū)
long read_size = BUFF_SIZE; //預(yù)設(shè)讀寫大小
long left = p->segment_size; //剩余大小,初始化為任務(wù)總大小
long reade_len = 0;        //讀取文件大小
long total_len = 0;

//當(dāng)剩余大小大于0時(shí)保持復(fù)制
while (left > 0)
{
//如果文件剩余大小小于預(yù)設(shè)讀寫大小,則按剩余大小讀取
if (read_size > left)
{
read_size = left;
}
//讀取文件
reade_len = fread(buffer, 1, read_size, fin);
total_len += reade_len;
//寫入文件
if (reade_len > 0)
{
fwrite(buffer, 1, reade_len, fout);
}

//剩余大小減去已讀寫大小
left = left - reade_len;
}

//復(fù)制完成關(guān)閉文件
fclose(fin);
fclose(fout);
pthread_exit(NULL);
}

//開啟多線程任務(wù)
int multi_copy(char *src, char *dest)
{
//判斷文件是否存在,以及是否具有讀取權(quán)限
int file_exist = access(src, 4);
if (file_exist != 0)
fprintf(stderr, "源文件不存在");

//獲取文件大小
long fsize = file_size(src);

//真正運(yùn)行線程數(shù)量
int real_pthread_number = PTHREAD_NUMBER;
if (fsize < PTHREAD_NUMBER)
real_pthread_number = 1;

//給任務(wù)結(jié)構(gòu)體分配內(nèi)存
page *p;
p = malloc(sizeof(*p) * PTHREAD_NUMBER);

long offset = 0;        //文本偏移量
long segment = fsize / real_pthread_number;        //分段長(zhǎng)度
long segment_remainder = fsize % real_pthread_number; //分段后剩余長(zhǎng)度

//給每個(gè)線程分配任務(wù)
for (int i = 0; i < real_pthread_number; i++)
{
//分配復(fù)制任務(wù)的文件大小
if (i + 1 == real_pthread_number)
{
p[i].segment_size = segment + segment_remainder;
}
else
{
p[i].segment_size = segment;
}

//確定任務(wù)的起止位置
p[i].start = offset;
offset = offset + p[i].segment_size;

//文件路徑存入任務(wù)
strncpy(p[i].fin, src, strlen(src));
strncpy(p[i].fout, dest, strlen(dest));

//分配虛擬線程id
p[i].id = i;
}

//創(chuàng)建線程
pthread_t work[real_pthread_number];
for (int i = 0; i < real_pthread_number; i++)
{
pthread_create(&work[i], NULL, (void *)&pthread_copy, (void *)&p[i]);
}

//阻塞主線程
for (int i = 0; i < real_pthread_number; i++)
{
pthread_join(work[i], NULL);
}

//釋放任務(wù)結(jié)構(gòu)體占用內(nèi)存
if (p != NULL)
{
free(p);
p = NULL;
}

return 0;
}

int main(int argc, char *argv[])
{
char *src;
char *dest;

src = argv[1];
dest = argv[2];

multi_copy(src, dest);
}
回答
編輯回答
陌南塵

借樓主問題,多線程加鎖的寫和單線程的寫應(yīng)該沒啥區(qū)別吧,依然是同一時(shí)刻只有一個(gè)線程在執(zhí)行寫操作,而且還會(huì)有線程切換帶來的損耗

2018年6月5日 00:34
編輯回答
悶騷型

加上線程鎖,就不會(huì)出現(xiàn)這個(gè)情況了

2017年11月30日 18:39
編輯回答
笑忘初

按照我的想法,最好上鎖然后再操作

2017年8月31日 20:23
編輯回答
愛是癌

首先, 同一個(gè)文件, 寫是獨(dú)占的, 你想同時(shí)寫, 其實(shí)也是實(shí)現(xiàn)不了的. 如果你說不同線程去操作同一個(gè)文件指針, 那么悲劇就此發(fā)生, 亂是必然的. 原因就是線程1移動(dòng)到0位置寫, 但線程1說,我要移動(dòng)到300開始寫, 最終移動(dòng)到哪里寫去了, 就要看運(yùn)氣了.

比較好的辦法是使用文件映射. 這樣不同線程就能同時(shí)寫它的不同部分了.
或者分開寫, 所有線程完成工作后,再合并到一起.

2018年8月17日 09:52
編輯回答
不討囍
一、多線程版本
#include <stdio.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define BUFF_SIZE 512
#define PTHREAD_NUMBER 4

typedef struct copy_block{
    // char src[BUFF_SIZE];    //source file
    // char dest[BUFF_SIZE];    //destnation file
    FILE *src;                //source file
    FILE *dest;                //destnation file
    long start;                //begin position
    long segment_size;        //aim copy size
    int id;                    //virtual thread id
}__attribute__((packed)) Page;

long file_size(const char *filename);
void pthread_copy(void *task_info);
int multi_copy(const char *dest,const char *src);

int main(int argc,char *argv[])
{
    if(argc < 3)
    {
        fprintf(stderr,"Error params!\n");
        return 0;
    }
    char *src = argv[1];
    char *dest = argv[2];
    
    multi_copy(dest,src);
    
    return 0;
}

//get file size
long file_size(const char *filename)
{
    struct stat fstat;
    memset(&fstat,0,sizeof(fstat));
    stat(filename,&fstat);
    
    return fstat.st_size;
}

//thread task
void pthread_copy(void *task_info)
{
    //convert void * to Page *
    Page *p = (Page *)task_info;
    
    //open file for every thread
    // FILE *fin = fopen(p->src,"rb");
    // FILE *fout = fopen(p->dest,"ab+");
    
    //seek the stream to the offset
    int res_in = fseek(p->src,p->start,SEEK_SET);
    int res_out = fseek(p->dest,p->start,SEEK_SET);
    
    char buffer[BUFF_SIZE];            //buffer
    long read_size = BUFF_SIZE;        //the size for read
    long left  = p->segment_size;    //current left bytes
    long read_len = 0;                //the size has read
    long total_len = 0;                //the readed total size
    
    printf("Thread %d begin\t",p->id);
    printf("start %ld\t",p->start);
    printf("segment_size %ld\n",p->segment_size);
    // printf("src %s\t",p->src);
    // printf("dest %s\n",p->dest);
    
    //begin copy
    while(left > 0)
    {
        if(left < read_size)
        {
            read_size = left;
        }
        //read file
        read_len = fread(buffer,sizeof(char),read_size,p->src);
        total_len += read_len;
        //write file
        if(read_len > 0)
        {
            fwrite(buffer,sizeof(char),read_len,p->dest);
        }
        //the left size
        left -= read_len;
    }
    pthread_exit(NULL);
}
//create multi thread
int multi_copy(const char *dest,const char *src)
{
    //file is exists
    int file_exists = access(src,04);//read access
    if(file_exists != 0)
    {
        fprintf(stderr,"file is not exists!\n");
        return 1;
    }
    //file size
    long filesize = file_size(src);
    //pthread number
    int real_pthread_number = PTHREAD_NUMBER;
    if(filesize < BUFF_SIZE)
    {
        real_pthread_number = 1;
    }
    Page *p = (Page *)malloc(sizeof(*p) * real_pthread_number);
    
    long offset = 0;                                        //offset from file begin
    long segment = filesize / real_pthread_number;            //the bytes of every thread
    long segment_remain = filesize % real_pthread_number;    //the left bytes for the last thread
    
    //ctor the task_info
    //open file for every thread
    FILE *fin = fopen(src,"rb");
    FILE *fout = fopen(dest,"wb+");
    int i;
    for(i = 0;i < real_pthread_number;i++)
    {
        if((i + 1) == real_pthread_number)
        {
            p[i].segment_size = segment + segment_remain;
        }
        else
        {
            p[i].segment_size = segment;
        }
        //task file pointer offset
        p[i].start = offset;
        offset += p[i].segment_size;
        //file path
        // strncpy(p[i].src,src,strlen(src));
        // strncpy(p[i].dest,dest,strlen(dest));
        p[i].src = fin;
        p[i].dest = fout;
        //virtual thread id
        p[i].id = i;
    }
    //create thread
    pthread_t work[real_pthread_number];
    for(i = 0;i < real_pthread_number;i++)
    {
        pthread_create(&work[i],NULL,(void *)&pthread_copy,(void *)&p[i]);
    }
    //wait subthread exit
    for(i = 0;i < real_pthread_number;i++)
    {
        pthread_join(work[i],NULL);
    }
    //free
    if(p != NULL)
    {
        free(p);
        p = NULL;
    }
    fclose(fin);
    fclose(fout);
    
    return 0;
}

在main中以rb打開源文件,wb+打開目標(biāo)文件
修改結(jié)構(gòu)體存儲(chǔ)文件指針
在線程pthread_copy中操作文件指針位置,去除打開文件操作(在線程中以wb+打開文件會(huì)將文件截?cái)酁?)
在線程調(diào)度函數(shù)multi_copy中打開文件并在末尾關(guān)閉文件
將文件指針賦給每個(gè)線程的task_info結(jié)構(gòu)體成員

二、線程鎖版本

#include <stdio.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define BUFF_SIZE 512
#define PTHREAD_NUMBER 4

typedef struct copy_block{
    char src[BUFF_SIZE];    //source file
    char dest[BUFF_SIZE];    //destnation file
    long start;                //begin position
    long segment_size;        //aim copy size
    int id;                    //virtual thread id
}__attribute__((packed)) Page;

static pthread_mutex_t mut;

long file_size(const char *filename);
void pthread_copy(void *task_info);
int multi_copy(const char *dest,const char *src);

int main(int argc,char *argv[])
{
    if(argc < 3)
    {
        fprintf(stderr,"Error params!\n");
        return 0;
    }
    char *src = argv[1];
    char *dest = argv[2];
    
    multi_copy(dest,src);
    
    return 0;
}

//get file size
long file_size(const char *filename)
{
    struct stat fstat;
    memset(&fstat,0,sizeof(fstat));
    stat(filename,&fstat);
    
    return fstat.st_size;
}

//thread task
void pthread_copy(void *task_info)
{
    pthread_mutex_lock(&mut);
    //convert void * to Page *
    Page *p = (Page *)task_info;
    
    //open file for every thread
    FILE *fin = fopen(p->src,"rb");
    FILE *fout = fopen(p->dest,"rb+");
    
    if(fout == NULL)
    {
        fprintf(stderr,"Open file %s fail\n",p->dest);
        return ;
    }
    
    //seek the stream to the offset
    int res_in = fseek(fin,p->start,SEEK_SET);
    int res_out = fseek(fout,p->start,SEEK_SET);
    
    char buffer[BUFF_SIZE];            //buffer
    long read_size = BUFF_SIZE;        //the size for read
    long left  = p->segment_size;    //current left bytes
    long read_len = 0;                //the size has read
    long total_len = 0;                //the readed total size
    
    printf("Thread %d begin\t",p->id);
    printf("start %ld\t",p->start);
    printf("segment_size %ld\t",p->segment_size);
    printf("src %s\t",p->src);
    printf("dest %s\n",p->dest);
    
    //begin copy
    while(left > 0)
    {
        if(left < read_size)
        {
            read_size = left;
        }
        //read file
        read_len = fread(buffer,sizeof(char),read_size,fin);
        total_len += read_len;
        //write file
        if(read_len > 0)
        {
            fwrite(buffer,sizeof(char),read_len,fout);
        }
        //the left size
        left -= read_len;
    }
    fclose(fin);
    fclose(fout);
    pthread_mutex_unlock(&mut);
    pthread_exit(NULL);
}
//create multi thread
int multi_copy(const char *dest,const char *src)
{
    //file is exists
    int file_exists = access(src,04);//read access
    if(file_exists != 0)
    {
        fprintf(stderr,"file is not exists!\n");
        return 1;
    }
    int dest_file_exists = access(dest,04);
    if(dest_file_exists != 0)
    {
        fprintf(stderr,"File %s not exists,create it\n",dest);
        FILE *cfp = fopen(dest,"w");
        fclose(cfp);
    }
    //file size
    long filesize = file_size(src);
    //pthread number
    int real_pthread_number = PTHREAD_NUMBER;
    if(filesize < BUFF_SIZE)
    {
        real_pthread_number = 1;
    }
    Page *p = (Page *)malloc(sizeof(Page) * real_pthread_number);
    
    long offset = 0;                                        //offset from file begin
    long segment = filesize / real_pthread_number;            //the bytes of every thread
    long segment_remain = filesize % real_pthread_number;    //the left bytes for the last thread
    
    //ctor the task_info
    int i;
    for(i = 0;i < real_pthread_number;i++)
    {
        if((i + 1) == real_pthread_number)
        {
            p[i].segment_size = segment + segment_remain;
        }
        else
        {
            p[i].segment_size = segment;
        }
        //task file pointer offset
        p[i].start = offset;
        offset += p[i].segment_size;
        //file path
        strncpy(p[i].src,src,strlen(src));
        strncpy(p[i].dest,dest,strlen(dest));
        //virtual thread id
        p[i].id = i;
    }
    //create thread
    pthread_mutex_init(&mut,NULL);//init thread lock
    pthread_t work[real_pthread_number];
    for(i = 0;i < real_pthread_number;i++)
    {
        pthread_create(&work[i],NULL,(void *)&pthread_copy,(void *)&p[i]);
    }
    //wait subthread exit
    for(i = 0;i < real_pthread_number;i++)
    {
        pthread_join(work[i],NULL);
    }
    //free
    if(p != NULL)
    {
        free(p);
        p = NULL;
    }
    
    return 0;
}
2017年12月12日 05:24