Skip to content

Commit

Permalink
Merge pull request #120 from marcone/posixqueue
Browse files Browse the repository at this point in the history
Switch to POSIX message queue
  • Loading branch information
jfdelnero authored Nov 18, 2024
2 parents c479e35 + a734ef0 commit cc2b7d7
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 105 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CFLAGS += -I./inc -Wall
LDFLAGS += -lpthread
LDFLAGS += -lpthread -lrt

sources := $(wildcard src/*.c)
objects := $(sources:src/%.c=obj/%.o)
Expand Down
171 changes: 67 additions & 104 deletions src/msgqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

#include <inttypes.h>
#include <pthread.h>
#include <fcntl.h>
#include <mqueue.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>
#include <signal.h>

Expand All @@ -53,12 +53,15 @@
#include "inotify.h"
#include "logs_out.h"

#define MAX_MSG_SIZE 100
// minimum message size for POSIX queues on Linux
#define MAX_MSG_SIZE 128

typedef struct mesg_buffer_ {
long mesg_type;
char mesg_text[MAX_MSG_SIZE];
} queue_msg_buf;
static struct mq_attr qattrs = {
0, // flags
20, // max number of messages on the queue
MAX_MSG_SIZE,
0 // number of messages currently on the queue
};

static void *msgqueue_gotsig(int sig, siginfo_t *info, void *ucontext)
{
Expand All @@ -68,7 +71,7 @@ static void *msgqueue_gotsig(int sig, siginfo_t *info, void *ucontext)
static void* msgqueue_thread( void* arg )
{
mtp_ctx * ctx;
queue_msg_buf msg_buf;
char message[MAX_MSG_SIZE + 1];
uint32_t handle[3];
int store_index;
struct sigaction sa;
Expand All @@ -87,25 +90,33 @@ static void* msgqueue_thread( void* arg )
return (void *)-1;
}

PRINT_DEBUG("msgqueue_thread : Thread started");
PRINT_DEBUG("%s : Thread started", __func__);

do
{
if( msgrcv(ctx->msgqueue_id, &msg_buf, MAX_MSG_SIZE, 1, 0) > 0 )
ssize_t numreceived = mq_receive(ctx->msgqueue_id, message, MAX_MSG_SIZE + 1, NULL);
if (numreceived < 0)
{
PRINT_DEBUG("msgqueue_thread : New message received : %s",msg_buf.mesg_text);
PRINT_ERROR("error receiving: %s", strerror(errno));
}
else if (numreceived <= MAX_MSG_SIZE)
{
message[numreceived] = 0; // ensure zero termination
PRINT_DEBUG("%s : New message received : %s", __func__, message);

if (!strncmp(msg_buf.mesg_text,"addstorage:", 11)) {
mtp_add_storage_from_line(ctx, &msg_buf.mesg_text[11], 0);
if (!strncmp(message,"addstorage:", 11))
{
mtp_add_storage_from_line(ctx, message + 11, 0);
}

if (!strncmp(msg_buf.mesg_text,"rmstorage:", 10)) {
mtp_remove_storage_from_line(ctx, &msg_buf.mesg_text[10], 0);
if (!strncmp(message,"rmstorage:", 10))
{
mtp_remove_storage_from_line(ctx, message + 10, 0);
}

if(!strncmp((char*)&msg_buf.mesg_text,"mount:",6))
if(!strncmp(message,"mount:",6))
{
store_index = mtp_get_storage_index_by_name(ctx, (char*)&msg_buf.mesg_text + 6);
store_index = mtp_get_storage_index_by_name(ctx, message + 6);

if(store_index >= 0)
{
Expand All @@ -125,13 +136,13 @@ static void* msgqueue_thread( void* arg )
}
else
{
PRINT_ERROR("msgqueue_thread : Store not found : %s",(char*)&msg_buf.mesg_text + 6);
PRINT_ERROR("%s : Store not found : %s", __func__, message + 6);
}
}

if(!strncmp((char*)&msg_buf.mesg_text,"unmount:",8))
if(!strncmp(message,"unmount:",8))
{
store_index = mtp_get_storage_index_by_name(ctx, (char*)&msg_buf.mesg_text + 8);
store_index = mtp_get_storage_index_by_name(ctx, message + 8);
if(store_index >= 0)
{
if( !pthread_mutex_lock( &ctx->inotify_mutex ) )
Expand All @@ -150,11 +161,11 @@ static void* msgqueue_thread( void* arg )
}
else
{
PRINT_ERROR("msgqueue_thread : Store not found : %s",(char*)&msg_buf.mesg_text + 8);
PRINT_ERROR("%s : Store not found : %s", __func__, message + 8);
}
}

if(!strncmp((char*)&msg_buf.mesg_text,"lock",4))
if(!strncmp(message,"lock",4))
{
store_index = 0;
while(store_index < MAX_STORAGE_NB)
Expand Down Expand Up @@ -185,7 +196,7 @@ static void* msgqueue_thread( void* arg )
}
}

if(!strncmp((char*)&msg_buf.mesg_text,"unlock",6))
if(!strncmp(message,"unlock",6))
{
store_index = 0;
while(store_index < MAX_STORAGE_NB)
Expand Down Expand Up @@ -218,123 +229,75 @@ static void* msgqueue_thread( void* arg )
}
else
{
break;
PRINT_DEBUG("received message of size %zd", numreceived);
}

}while(1);

msgctl(ctx->msgqueue_id, IPC_RMID, NULL);
} while(1);

PRINT_DEBUG("msgqueue_thread : Leaving msgqueue_thread...");
PRINT_DEBUG("%s : Leaving thread", __func__);

return NULL;

error:
msgctl(ctx->msgqueue_id, IPC_RMID, NULL);

PRINT_DEBUG("msgqueue_thread : General Error ! Leaving msgqueue_thread...");
PRINT_DEBUG("%s : General Error ! Leaving thread", __func__);

return NULL;
}

static int get_current_exec_path( char * exec_path, int maxsize )
{
pid_t pid;
char path[PATH_MAX];
char tmp_exec_path[PATH_MAX + 1];

memset(tmp_exec_path,0,sizeof(tmp_exec_path));

pid = getpid();

sprintf(path, "/proc/%d/exe", pid);

if (readlink(path, tmp_exec_path, PATH_MAX) == -1)
{
return -1;
}

tmp_exec_path[PATH_MAX] = 0;

if(strlen(tmp_exec_path) < maxsize)
{
strncpy(exec_path,tmp_exec_path,maxsize);
return 0;
}
else
return -2;
}

int send_message_queue( char * message )
int send_message_queue(char * message )
{
key_t key;
int msgqueue_id;
char exec_path[PATH_MAX + 1];
queue_msg_buf msg_buf;
mqd_t msgqueue_id;

if(get_current_exec_path(exec_path, sizeof(exec_path)) >= 0)
msgqueue_id = mq_open("/umtprd", O_RDWR | O_CREAT, 0600, &qattrs);
PRINT_DEBUG("%s : msgqueue_id = %d", __func__, msgqueue_id);
if (msgqueue_id != -1)
{
PRINT_DEBUG("send_message_queue : current exec path : %s", exec_path);

key = ftok(exec_path, 44);

msgqueue_id = msgget(key, 0666 | IPC_CREAT);

PRINT_DEBUG("send_message_queue : msgqueue_id = %d", msgqueue_id);

msg_buf.mesg_type = 1;
strncpy(msg_buf.mesg_text,message,MAX_MSG_SIZE - 1);
msg_buf.mesg_text[MAX_MSG_SIZE - 1] = '\0'; // to be sure to terminate the string - see the strncpy's behavior.
if( msgsnd(msgqueue_id, &msg_buf, MAX_MSG_SIZE, 0) == 0 )
if (mq_send(msgqueue_id, message, strlen(message) + 1, 0) == 0 )
{
return 0;
}
else
{
PRINT_ERROR("%s : Couldn't send %s !", __func__, message);
}
}
else
{
PRINT_ERROR("%s : mq_open error %d (%s)", __func__, errno, strerror(errno));
}

PRINT_ERROR("send_message_queue : Couldn't send %s !", message);

return -1;
}

int msgqueue_handler_init( mtp_ctx * ctx )
int msgqueue_handler_init(mtp_ctx * ctx )
{
key_t key;
char exec_path[PATH_MAX + 1];
int ret;

if( ctx )
{
if(get_current_exec_path(exec_path, sizeof(exec_path)-1) >= 0)
ctx->msgqueue_id = mq_open("/umtprd", O_RDWR | O_CREAT, 0600, &qattrs);
if(ctx->msgqueue_id != -1)
{
PRINT_DEBUG("msgqueue_handler_init : current exec path : %s", exec_path);

key = ftok(exec_path, 44);

ctx->msgqueue_id = msgget(key, 0666 | IPC_CREAT);
PRINT_DEBUG("%s : msgqueue_id = %d", __func__, ctx->msgqueue_id);

if(ctx->msgqueue_id != -1)
ret = pthread_create(&ctx->msgqueue_thread, NULL, msgqueue_thread, ctx);
if(ret != 0)
{
PRINT_DEBUG("msgqueue_handler_init : msgqueue_id = %d", ctx->msgqueue_id);

ret = pthread_create(&ctx->msgqueue_thread, NULL, msgqueue_thread, ctx);
if(ret != 0)
{
PRINT_ERROR("msgqueue_handler_init : msgqueue_thread thread creation failed ! (error %d)", ret);
return -1;
}
else
{
return 1;
}
PRINT_ERROR("%s : msgqueue_thread thread creation failed ! (error %d)", __func__, ret);
return -1;
}
else
{
PRINT_ERROR("msgqueue_handler_init : msgget error %d", errno);
return 0;
}
}
else
{
PRINT_ERROR("%s : mq_open error %d (%s)", __func__, errno, strerror(errno));
}
}

return 0;
return -2;
}

int msgqueue_handler_deinit( mtp_ctx * ctx )
Expand All @@ -347,7 +310,7 @@ int msgqueue_handler_deinit( mtp_ctx * ctx )
{
pthread_kill( ctx->msgqueue_thread, SIGUSR1);
pthread_join( ctx->msgqueue_thread, &ret);
ctx->msgqueue_id = -1;
mq_close(ctx->msgqueue_id);
}

return 1;
Expand Down

0 comments on commit cc2b7d7

Please sign in to comment.