This article demonstrates a C program that reads a source file line by line, extracts numeric segments from each line using regular expressions, and writes the results to a target file. The implementation uses two threads that alternate execution, with a mutex protecting shared data.
The collector thread is responsible for reading lines from the source file and applying a regex to extract numbers. The writer thread takes the extracted data and writes it into the target file. Thread alternation is controlled by a shared state variable, and a mutex ensures thread-safe access to the shared data buffer.
Below is the main implementation file:
#define __STDC_WANT_LIB_EXT1__ 1
#include "memory_tracker.h"
#include <threads.h>
#include <regex.h>
#include <string.h>
#include <stdbool.h>
#include <time.h>
/**
 * Per-thread context passed to a worker through thrd_create()'s argument.
 *
 * Both worker contexts created in main() point at the same sharedBuffer
 * slot, which is how a line travels from the collector to the writer.
 */
typedef struct ThreadContext {
    FILE* fileHandle;      /* stream this thread reads from or writes to */
    const char* filePath;  /* path used to open fileHandle; points at a string
                              literal in main(), so it must not be written through */
    char** sharedBuffer;   /* slot holding the heap-allocated current line (or NULL) */
} ThreadContext;
/* Thread entry points; the signature is the one thrd_create() requires. */
int collector_thread(void* arg);
int writer_thread(void* arg);

/* Helpers used by the collector: line reader and regex token extractor. */
bool fetch_next_line(FILE* file, char** buffer);
char* extract_next_number(regex_t* pattern, char* str, size_t length);

/* Guards the shared line buffer and the flags below. */
mtx_t shared_mutex;
thrd_t collector_id;
thrd_t writer_id;

/* Set by the collector once no further line is available. */
bool collector_done = false;
/* Tells extract_next_number() that scanning must restart at offset 0. */
bool fresh_line_ready = false;

/*
 * Whose turn it is. Both threads poll this flag in their wait loops WITHOUT
 * holding shared_mutex while the other thread stores to it, so it has to be
 * atomic; a plain bool here is a data race (undefined behaviour).
 */
_Atomic bool active_role = 1;
/* The two values active_role can take. */
bool role_collector = 0;
bool role_writer = 1;

/* Sleep interval used between polls of active_role (1000 ns). */
struct timespec short_delay = {.tv_nsec = 1000};
/**
 * Program entry point.
 *
 * Allocates the one-slot shared line buffer, opens "source.bin" for reading
 * and "target.bin" for writing, starts the collector and writer threads,
 * waits for both to finish, then closes the files and releases tracked memory.
 *
 * @return EXIT_SUCCESS when both threads were joined and the output file was
 *         closed cleanly, EXIT_FAILURE otherwise.
 */
int main(void) {
    ThreadContext source_ctx = {.fileHandle = NULL, .filePath = "source.bin", .sharedBuffer = calloc(1, sizeof(char*))};
    if (source_ctx.sharedBuffer == NULL) {
        fprintf(stderr, "Cannot allocate the shared line buffer.\n");
        return EXIT_FAILURE;
    }
    /* The writer shares the very same slot, so it sees what the collector stores. */
    ThreadContext target_ctx = {.fileHandle = NULL, .filePath = "target.bin", .sharedBuffer = source_ctx.sharedBuffer};
    register_allocation((void**)&(source_ctx.sharedBuffer));

    /*
     * Failures below use exit() rather than thrd_exit(): thrd_exit() only ends
     * the calling thread and does not make the process report EXIT_FAILURE.
     */
    if (mtx_init(&shared_mutex, mtx_plain) != thrd_success) {
        fprintf(stderr, "Mutex initialization failed.\n");
        exit(EXIT_FAILURE);
    }

    errno_t status = fopen_s(&source_ctx.fileHandle, source_ctx.filePath, "rb");
    if (status) {
        fprintf(stderr, "Cannot open %s. Reason: %s\n", source_ctx.filePath, strerror(status));
        exit(EXIT_FAILURE);
    }
    status = fopen_s(&target_ctx.fileHandle, target_ctx.filePath, "wb");
    if (status) {
        fprintf(stderr, "Cannot open %s. Reason: %s\n", target_ctx.filePath, strerror(status));
        exit(EXIT_FAILURE);
    }

    if (thrd_create(&collector_id, collector_thread, &source_ctx) == thrd_success) {
        printf("main > Collector thread started.\n");
    } else {
        fprintf(stderr, "Failed to spawn collector thread.\n");
        exit(EXIT_FAILURE);
    }
    if (thrd_create(&writer_id, writer_thread, &target_ctx) == thrd_success) {
        printf("main > Writer thread started.\n");
    } else {
        fprintf(stderr, "Failed to spawn writer thread.\n");
        exit(EXIT_FAILURE);
    }

    int exit_code = EXIT_SUCCESS;

    /* Join BOTH workers: their contexts live on this stack frame. */
    if (thrd_join(writer_id, NULL) != thrd_success) {
        fprintf(stderr, "Failed to join writer thread.\n");
        exit_code = EXIT_FAILURE;
    }
    if (thrd_join(collector_id, NULL) != thrd_success) {
        fprintf(stderr, "Failed to join collector thread.\n");
        exit_code = EXIT_FAILURE;
    }

    fclose(source_ctx.fileHandle);
    /* fclose() flushes buffered output, so a failure here means lost data. */
    if (fclose(target_ctx.fileHandle) != 0) {
        fprintf(stderr, "Failed to flush and close %s.\n", target_ctx.filePath);
        exit_code = EXIT_FAILURE;
    }

    free_tracked_memory();
    return exit_code;
}
/**
 * Collector thread body.
 *
 * Repeatedly reads one line from the context's file into the shared slot,
 * replaces it with the concatenation of every digit run found in that line,
 * hands the turn to the writer and waits until the turn comes back. When no
 * more lines are available it sets collector_done and returns.
 *
 * @param arg  Pointer to the ThreadContext for the input file.
 * @return     Always 0; fatal errors terminate the process via exit().
 */
int collector_thread(void* arg) {
    ThreadContext* ctx = arg;
    FILE* input = ctx->fileHandle;
    char** shared = ctx->sharedBuffer;

    regex_t compiled_pattern;
    const char* expression = "[[:digit:]]+";
    /* Keep the error code: regerror() needs it, and re-running regcomp() is wasteful. */
    int compile_status = regcomp(&compiled_pattern, expression, REG_EXTENDED);
    if (compile_status != 0) {
        char error_msg[100];
        regerror(compile_status, &compiled_pattern, error_msg, sizeof error_msg);
        fprintf(stderr, "Regex compilation error: %s\n", error_msg);
        exit(EXIT_FAILURE);
    }

    while (true) {
        mtx_lock(&shared_mutex);

        if (!fetch_next_line(input, shared)) {
            collector_done = true;
            active_role = role_writer;
            mtx_unlock(&shared_mutex);
            break;
        }

        size_t line_len = strlen(*shared);
        if (line_len > 0) {
            /* The digits can be at most the whole line, plus the terminator. */
            char* digits = calloc(line_len + 1, sizeof(char));
            if (digits == NULL) {
                fprintf(stderr, "Out of memory while extracting numbers.\n");
                exit(EXIT_FAILURE);
            }

            size_t used = 0;
            char* token = NULL;
            while ((token = extract_next_number(&compiled_pattern, *shared, line_len)) != NULL) {
                size_t token_len = strlen(token);
                /* Append at the known end instead of rescanning with strcat(). */
                memcpy(digits + used, token, token_len);
                used += token_len;
                free(token);
            }
            digits[used] = '\0';

            /* Shrinking is optional; on failure the larger buffer stays valid. */
            char* fitted = realloc(digits, used + 1);
            if (fitted != NULL) {
                digits = fitted;
            }

            free(*shared);
            *shared = digits;
        }

        active_role = role_writer;
        fresh_line_ready = true; /* next line: extractor restarts at offset 0 */
        mtx_unlock(&shared_mutex);

        /* Wait for the writer to hand the turn back. */
        while (active_role == role_writer) {
            thrd_sleep(&short_delay, NULL);
        }
    }

    regfree(&compiled_pattern);
    return 0;
}
/**
 * Writer thread body.
 *
 * Each turn, writes the string currently stored in the shared slot (if any)
 * to the context's file exactly as-is, without adding a separator, then
 * hands the turn to the collector and waits for it to come back. Once
 * collector_done is set it releases the slot's contents and returns.
 *
 * @param arg  Pointer to the ThreadContext for the output file.
 * @return     Always 0.
 */
int writer_thread(void* arg) {
    ThreadContext* ctx = arg;

    while (true) {
        mtx_lock(&shared_mutex);

        if (collector_done) {
            free(*(ctx->sharedBuffer));
            *(ctx->sharedBuffer) = NULL; /* do not leave a dangling pointer in the shared slot */
            mtx_unlock(&shared_mutex);   /* never leave the loop with the mutex held */
            break;
        }

        if (*(ctx->sharedBuffer)) {
            /* The buffer is data, not a format string: write it verbatim. */
            if (fputs(*(ctx->sharedBuffer), ctx->fileHandle) == EOF) {
                fprintf(stderr, "Failed to write to the target file.\n");
            }
        }

        active_role = role_collector;
        mtx_unlock(&shared_mutex);

        /* Wait for the collector to hand the turn back. */
        while (active_role == role_collector) {
            thrd_sleep(&short_delay, NULL);
        }
    }
    return 0;
}
/**
 * Returns the next match of `pattern` in `str`, resuming after the previous one.
 *
 * The scan position is kept in a function-local static, so the function is
 * not reentrant. When the global fresh_line_ready flag is set, the position
 * is reset to 0 and the flag is cleared.
 *
 * @param pattern  Compiled regular expression.
 * @param str      NUL-terminated string being scanned.
 * @param length   Length of `str`; scanning stops once the position reaches it.
 * @return         Newly allocated, NUL-terminated copy of the match (caller
 *                 frees), or NULL when there is no further match, regexec()
 *                 fails, or memory cannot be allocated.
 */
char* extract_next_number(regex_t* pattern, char* str, size_t length) {
    static size_t offset = 0;

    if (fresh_line_ready) {
        offset = 0;
        fresh_line_ready = false;
    }
    if (offset >= length) {
        return NULL;
    }

    const char* start = str + offset;
    /* Only the overall match is used, so one slot is enough (no VLA needed). */
    regmatch_t match;
    /* Any non-zero result (not just REG_NOMATCH) leaves `match` unusable. */
    if (regexec(pattern, start, 1, &match, 0) != 0) {
        return NULL;
    }

    size_t match_len = (size_t)(match.rm_eo - match.rm_so);
    char* result = calloc(match_len + 1, sizeof(char));
    if (result == NULL) {
        fprintf(stderr, "Out of memory while copying a regex match.\n");
        return NULL;
    }
    memcpy(result, start + match.rm_so, match_len);

    /* rm_eo is relative to `start`, so this moves just past the match. */
    offset += (size_t)match.rm_eo;
    return result;
}
/**
 * Reads the next line of `file` into a freshly allocated string.
 *
 * Whatever *buffer held on entry is freed first. The line is read with a
 * small buffer that is enlarged (re-reading from the saved position) until
 * the whole line fits. The trailing "\n" or "\r\n" is removed.
 *
 * End-of-file is remembered in a function-local static: the read that hits
 * EOF still returns true with whatever was read (possibly an empty string),
 * and every later call returns false. The function is therefore usable for
 * one stream only and is not reentrant.
 *
 * @param file    Stream to read from.
 * @param buffer  In: previous line or NULL. Out: the new line (caller frees),
 *                or NULL when false is returned.
 * @return        true when *buffer holds a line, false at end of input or on
 *                a read/allocation/positioning error.
 */
bool fetch_next_line(FILE* file, char** buffer) {
    static bool eof_reached = false;

    free(*buffer);
    *buffer = NULL;
    if (eof_reached) {
        return false;
    }

    size_t capacity = 5;
    const size_t growth = 10;
    fpos_t checkpoint;
    if (fgetpos(file, &checkpoint) != 0) {
        fprintf(stderr, "Cannot record the file position before reading a line.\n");
        return false;
    }

    for (;;) {
        char* line = calloc(capacity, sizeof(char));
        if (line == NULL) {
            fprintf(stderr, "Allocation failure at line %d. Rolling back.\n", __LINE__);
            fsetpos(file, &checkpoint);
            return false;
        }

        if (fgets(line, (int)capacity, file) == NULL) {
            if (!feof(file)) {
                /* Read error: the buffer contents are indeterminate. */
                fprintf(stderr, "Read failure at line %d.\n", __LINE__);
                free(line);
                return false;
            }
            line[0] = '\0'; /* EOF with nothing read */
        }

        size_t len = strlen(line);
        bool at_eof = feof(file) != 0;
        bool has_newline = len > 0 && line[len - 1] == '\n';

        /*
         * fgets() filled the buffer completely without reaching the end of
         * the line: rewind, enlarge, and read the line again from its start.
         */
        if (!at_eof && !has_newline && len + 1 == capacity) {
            free(line);
            capacity += growth;
            if (fsetpos(file, &checkpoint) != 0) {
                fprintf(stderr, "Cannot rewind to the start of the line.\n");
                return false;
            }
            continue;
        }

        if (at_eof) {
            eof_reached = true;
        }
        if (has_newline) {
            line[--len] = '\0';
            /* The file is opened in binary mode, so CRLF endings leave a '\r'. */
            if (len > 0 && line[len - 1] == '\r') {
                line[--len] = '\0';
            }
        }

        /* Shrinking is optional; on failure the larger buffer stays valid. */
        if (len + 1 < capacity) {
            char* fitted = realloc(line, len + 1);
            if (fitted != NULL) {
                line = fitted;
            }
        }

        *buffer = line;
        return true;
    }
}
The following header file provides utilities for tracking dynamically allocated memory so it can be released cleanly at program exit:
#ifndef MEMORY_TRACKER_H
#define MEMORY_TRACKER_H
#include <stdlib.h>
#include <stdio.h>
/* One entry of the singly linked list of tracked allocations. */
typedef struct MemoryNode MemoryNode;

struct MemoryNode {
    void** address;    /* location of the pointer variable that owns the allocation */
    MemoryNode* next;  /* following entry, or NULL at the tail */
};

/* Head of the list; NULL while nothing is tracked. */
static MemoryNode* memory_list = NULL;
/* Number of entries registered since the list was last emptied. */
static unsigned long tracked_count = 0UL;
/**
 * Appends `ptr` to the list of tracked allocations.
 *
 * `ptr` is the address of the pointer variable that owns the allocation; the
 * variable is read (and reset to NULL) later by free_tracked_memory(), so it
 * must still exist at that point.
 *
 * @param ptr  Address of the owning pointer variable.
 * @return     The number of tracked entries after the insertion (always >= 1),
 *             or 0 if `ptr` is NULL or the list node could not be allocated,
 *             in which case nothing was registered.
 */
unsigned long register_allocation(void** ptr) {
    if (ptr == NULL) {
        return 0;
    }

    /* calloc() zeroes the node, so `next` starts out as the list terminator. */
    MemoryNode* new_node = calloc(1, sizeof *new_node);
    if (new_node == NULL) {
        return 0;
    }
    new_node->address = ptr;
    new_node->next = NULL;

    /* Walk to the link that terminates the list; this also covers the empty list. */
    MemoryNode** link = &memory_list;
    while (*link != NULL) {
        link = &(*link)->next;
    }
    *link = new_node;

    return ++tracked_count;
}
/**
 * Releases every tracked allocation and empties the tracking list.
 *
 * For each entry, in registration order, the memory referenced by the
 * registered pointer variable is freed, that variable is set to NULL, and
 * the list node itself is freed. The entry counter is reset to 0 afterwards.
 */
void free_tracked_memory(void) {
    while (memory_list != NULL) {
        MemoryNode* node = memory_list;
        void** slot = node->address;

        /* Unlink first so the head always refers to a live node. */
        memory_list = node->next;

        free(*slot);
        *slot = NULL;
        free(node);
    }
    tracked_count = 0;
}
#endif