I folks, i need some help. I have to implement a server/client project for transfer files from clients to server. Server receives N number of sockets, one socket per server-process, using fork(), and more than 1 client per socket(threads). Clients sends a file to server and it has to backup it and show TX speed and bytes count. I resolved the TX issue, for many server-process, but when i try to apply threads for concurrency on same socket it does not work, messages and files are not received.
Server use 1 parameter: number_of_sockets
Client use 3 parameters: server_ip server_port file_name
server.c
#include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include<string.h> #include<stdio.h> #include<stdlib.h> #include<pthread.h> #include<fcntl.h> #include<errno.h> #include<sys/resource.h> #include<sys/wait.h> #include<signal.h> #include<unistd.h> #include<time.h> // accepting connections short int aceptar_conexiones = 1; //struct for threads typedef struct{ int fd_socket; struct sockaddr_in address; socklen_t sock_length; }thread_args; //struct for associate server-process and sockets typedef struct{ int fd_socket; pid_t pid_proceso; }fd_socket_proceso; //max sockets open in server unsigned int max_sockets; //max server-process in server unsigned int cant_procesos; //dynamic array for associated struct fd_socket_proceso *fdsp; //mutex for sync threads pthread_mutex_t mutex_thread; #define TAM_BUFFER 1024 //handler for threads //void *connection_handler(void *); void connection_handler(void *); void finaliza_sockets(); void finaliza_procesos(); void error(const char *); int main(int argc, char *argv[]) { int fd_listen_socket, fd_communication_socket, i; max_sockets = atoi(argv[1]); struct sockaddr_in listen_address, connection_address; socklen_t con_addr_len = sizeof(connection_address); //allocating memory for dynamic array fdsp = (fd_socket_proceso *)malloc(sizeof(fd_socket_proceso)*max_sockets); //create and open sockets, grabbing it on array, it starts on 1024 and on, max 5 clients per socket for(i=0 ; i<max_sockets ; i++){ fd_listen_socket = socket(AF_INET, SOCK_STREAM, 0); if(fd_listen_socket < 0) error("No se pudo crear el socket.\n"); bzero(&listen_address, sizeof(struct sockaddr_in)); listen_address.sin_family = AF_INET; listen_address.sin_port = htons(1024+i); // Puede utilizarse cualquier puerto listen_address.sin_addr.s_addr = htonl(INADDR_ANY); // Cualquier direccion propia if(bind(fd_listen_socket, (struct sockaddr *)&listen_address, sizeof(struct sockaddr)) < 0) error("No se puede enlazar el socket.\n"); printf("Servidor escuchando en puerto: %d\n",ntohs(listen_address.sin_port)); listen(fd_listen_socket, 5); fdsp[i].fd_socket = fd_listen_socket; } printf("Comenzamos a escuchar conexiones...\n"); fflush(stdout); //fork per socket for(i=0 ; i<max_sockets ; i++ , cant_procesos++){ if(!(fdsp[i].pid_proceso=fork())){ while(aceptar_conexiones){ bzero(&connection_address, sizeof(connection_address)); if((fd_communication_socket = accept(fdsp[i].fd_socket, (struct sockaddr *)&connection_address, &con_addr_len))==-1){ finaliza_sockets(); finaliza_procesos(); perror("Error en la comunicacion con el socket"); exit(EXIT_FAILURE); } pthread_t thread_cliente; pthread_attr_t thread_attr; thread_args t_args; pthread_attr_init(&thread_attr); pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); t_args.address = connection_address; t_args.fd_socket = fd_communication_socket; t_args.sock_length = con_addr_len; //pthread_create(&thread_cliente, NULL, connection_handler, (void *)&t_args); connection_handler((void *)&t_args); } } } finaliza_sockets(); finaliza_procesos(); printf("Server offline.\n"); } //void *connection_handler(void *t_args) void connection_handler(void *t_args) { thread_args *t = (thread_args *)t_args; char buffer[TAM_BUFFER]; char *buffer2; FILE *fp; char *datos[2]; char file_name[TAM_BUFFER]; char *ip_cliente = inet_ntoa(t->address.sin_addr); int port_cliente = ntohs(t->address.sin_port); long int file_size, bytes_restantes; ssize_t recv_size; int t_inicio ,t_fin , t_transferencia; int i; bzero(buffer, sizeof(buffer)); //1st msg recieve file_size concat using char '°' with file_name recv_size = recvfrom(t->fd_socket, buffer, sizeof(buffer), 0, (struct sockaddr *)&(t->address), &(t->sock_length)); //parse file_size and file_name buffer2=strtok(buffer,"°"); for(i=0 ; buffer2 ; i++){ datos[i]=buffer2; buffer2=strtok(NULL,"°"); } file_size = atoi(datos[0]); strcpy(file_name, datos[1]); //concat file_name for backup strcat(file_name, ".bkp"); if(fp=fopen(file_name, "r")){ printf("\nError, el fichero %s provisto por el cliente %s ya existe! Este sera omitido por el servidor.\n",file_name, ip_cliente); fflush(stdout); close(t->fd_socket); return; }else{ if((fp=fopen(file_name,"w"))==NULL){ close(t->fd_socket); printf("\nError en la creacion del fichero %s provisto por el cliente %s.\n",file_name, ip_cliente); fflush(stdout); exit(errno); }else{ bytes_restantes = file_size; bzero(buffer, sizeof(buffer)); //calculate begin TX t_inicio = clock(); //recieve file from sendfile() from client recv_size = recv(t->fd_socket, buffer, sizeof(buffer), 0); printf("llego, recv: %ld\n",recv_size); while(recv_size > 0 && bytes_restantes > 0){ //calculate finish TX t_fin = clock(); bytes_restantes -= recv_size; //write on file descriptor fwrite(buffer, sizeof(char), recv_size, fp); //calculate TX speed t_transferencia = t_fin-t_inicio/CLOCKS_PER_SEC; //print data about TX printf("Cliente:%s:%d, Fichero:%*s, Bytes recibidos:%*ld, Bytes restantes:%*ld, Velocidad TX:%d (bytes/seg)\r", ip_cliente, port_cliente, strlen(file_name)*1L, file_name ,sizeof(long int), file_size-bytes_restantes, sizeof(long int), bytes_restantes, t_transferencia); fflush(stdout); bzero(buffer, sizeof(buffer)); //recieve file from sendfile() from client recv_size = recv(t->fd_socket, buffer, sizeof(buffer), 0); t_inicio = clock(); } } } printf("\nCliente:%s:%d\tFichero:%s\tTransferencia finalizada.\n",ip_cliente, port_cliente, file_name); fflush(stdout); fclose(fp); close(t->fd_socket); //pthread_exit(0); } //close sockets void finaliza_sockets(){ int i; for(i=0 ; i<max_sockets ; i++) close(fdsp[i].fd_socket); free(fdsp); } //wait for all server-process void finaliza_procesos(){ int i; for(i=0 ; i<cant_procesos ; i++) wait(0); } void error(const char *msg){ perror(msg); exit(errno); } client.c
#include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<signal.h> #include<time.h> #include<sys/stat.h> #include<fcntl.h> #include<sys/sendfile.h> #define TAM_BUFFER 1024 int main(int argc, char *argv[]) { int fd_socket; int i; int tini, tfin, ttransferencia; struct sockaddr_in server_address; char *path, *buffer, *vect[30], linea[TAM_BUFFER], linea2[TAM_BUFFER], file_name[TAM_BUFFER/4]; int fd_file; long int send_bytes, offset, remain_data; struct stat file_stat; //file_name , maybe use path path = argv[3]; if((fd_file = open(path, O_RDONLY))==-1){ perror("error en la apertura del archivo\n"); exit(EXIT_FAILURE); } if(fstat(fd_file, &file_stat)<0){ perror("error fstat"); exit(EXIT_FAILURE); } //connection with server fd_socket = socket(AF_INET, SOCK_STREAM, 0); server_address.sin_family = AF_INET; server_address.sin_port = htons(atoi(argv[2])); server_address.sin_addr.s_addr = inet_addr(argv[1]); bzero(&(server_address.sin_zero),8); signal(SIGINT,SIG_DFL); //establishing connection if(connect(fd_socket,(struct sockaddr *)&server_address,sizeof(struct sockaddr)) < -1){ perror("error on connection with server"); exit(errno); } //parse file_name from path i=0; buffer=strtok(path,"/"); while(buffer != NULL) { vect[i]=buffer; buffer=strtok(NULL,"/"); i++; } //concat file_size with char '°' and file_name strcpy(file_name, vect[i-1]); sprintf(linea,"%ld",file_stat.st_size); strcat(linea,"°"); strcat(linea, file_name); //send concatenated char send_bytes = send(fd_socket,linea,strlen(linea),0); //send file to server offset = 0; remain_data = file_stat.st_size; while(((send_bytes = sendfile(fd_socket, fd_file,(off_t *) &offset, TAM_BUFFER)) > 0) && (remain_data > 0)){ printf("enviados: %*ld bytes\toffset: %*ld\tbytes restantes: %ld\r",sizeof(send_bytes), send_bytes, sizeof(offset), offset, remain_data); sizeof(stdout); remain_data -= send_bytes; } close(fd_file); close(fd_socket); return 0; } As i said, my problem is i need concurrency on each socket. How i manage and sync them? I tried basic things but it does not work, so i comment thread_create and thread_exit func.
Thanks for your time.-
connection_handlerfunction has thevoid*return type commented out in favor ofvoid. This is incorrect for use with pthread_create. Did you know that? Do you have compiler warnings fully enabled?