Multithreaded C Implementation with Regex and File I/O for Extracting Numeric Data

This article demonstrates a C program that reads a source file line by line, extracts numeric segments from each line using regular expressions, and writes the results to a target file. The implementation uses two threads that alternate execution, with a mutex protecting shared data.

The collector thread is responsible for reading lines from the source file and applying a regex to extract numbers. The writer thread takes the extracted data and writes it into the target file. Thread alternation is controlled by a shared state variable, and a mutex ensures thread-safe access to the shared data buffer.

Below is the main implementation file:

#define __STDC_WANT_LIB_EXT1__ 1
#include "memory_tracker.h"
#include <regex.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <string.h>
#include <threads.h>
#include <time.h>

/* Per-thread context, passed to a thread entry point through its void* argument. */
typedef struct ThreadContext {
    FILE* fileHandle;    /* stream the thread works on; opened by main() */
    char* filePath;      /* path main() uses to open fileHandle */
    char** sharedBuffer; /* slot holding the current line; main() gives both contexts the same slot */
} ThreadContext;

/* Thread entry points; `arg` is cast to a ThreadContext* by each function. */
int collector_thread(void* arg);
int writer_thread(void* arg);

/* Reads the next line of `file` into `*buffer`; returns false once no line can be produced. */
bool fetch_next_line(FILE* file, char** buffer);

/* Returns a heap-allocated copy of the next match of `pattern` in `str`, or NULL if none. */
char* extract_next_number(regex_t* pattern, char* str, size_t length);

/* Mutex protecting the shared line buffer, collector_done and fresh_line_ready. */
mtx_t shared_mutex;
thrd_t collector_id;
thrd_t writer_id;

/* Set by the collector once no further line can be fetched; accessed under shared_mutex. */
bool collector_done = false;
/* Tells extract_next_number() to restart its scan offset; accessed under shared_mutex. */
bool fresh_line_ready = false;

/*
 * Whose turn it is. Both threads poll this flag in their wait loops WITHOUT
 * holding shared_mutex while the other thread may be writing it, so it must be
 * atomic; a plain bool would be a data race (undefined behavior).
 */
atomic_bool active_role = true;

/* The two values active_role can take. */
const bool role_collector = false;
const bool role_writer = true;

/* Sleep interval used by the polling loops: 1000 ns. */
struct timespec short_delay = {.tv_nsec = 1000};

/*
 * Opens source.bin for reading and target.bin for writing, runs the collector
 * and writer threads until both have finished, then releases all resources.
 *
 * Returns EXIT_SUCCESS when both threads ran to completion and the target file
 * was closed cleanly, EXIT_FAILURE when any setup step fails.
 */
int main(void) {
    int exit_code = EXIT_FAILURE;
    errno_t status;
    ThreadContext source_ctx = {.fileHandle = NULL, .filePath = "source.bin", .sharedBuffer = calloc(1, sizeof(char*))};
    ThreadContext target_ctx = {.fileHandle = NULL, .filePath = "target.bin", .sharedBuffer = source_ctx.sharedBuffer};

    if (source_ctx.sharedBuffer == NULL) {
        fprintf(stderr, "Shared buffer allocation failed.\n");
        return EXIT_FAILURE;
    }

    /* Hand the slot to the tracker so free_tracked_memory() releases it. */
    if (register_allocation((void**)&(source_ctx.sharedBuffer)) == 0) {
        fprintf(stderr, "Could not register the shared buffer.\n");
        free(source_ctx.sharedBuffer);
        return EXIT_FAILURE;
    }

    /*
     * Failure paths below return EXIT_FAILURE instead of calling thrd_exit():
     * when the main thread ends via thrd_exit() and no other thread is left,
     * the program terminates as if exit(EXIT_SUCCESS) had been called, which
     * would report success for a failed run.
     */
    if (mtx_init(&shared_mutex, mtx_plain) != thrd_success) {
        fprintf(stderr, "Mutex initialization failed.\n");
        goto release_memory;
    }

    status = fopen_s(&source_ctx.fileHandle, source_ctx.filePath, "rb");
    if (status) {
        fprintf(stderr, "Cannot open %s. Reason: %s\n", source_ctx.filePath, strerror(status));
        goto release_memory;
    }

    status = fopen_s(&target_ctx.fileHandle, target_ctx.filePath, "wb");
    if (status) {
        fprintf(stderr, "Cannot open %s. Reason: %s\n", target_ctx.filePath, strerror(status));
        goto close_source;
    }

    if (thrd_create(&collector_id, collector_thread, &source_ctx) != thrd_success) {
        fprintf(stderr, "Failed to spawn collector thread.\n");
        goto close_target;
    }
    printf("main > Collector thread started.\n");

    if (thrd_create(&writer_id, writer_thread, &target_ctx) != thrd_success) {
        fprintf(stderr, "Failed to spawn writer thread.\n");
        /*
         * The collector is already running and would wait for a writer that
         * never comes; exit() terminates the whole process including it.
         */
        exit(EXIT_FAILURE);
    }
    printf("main > Writer thread started.\n");

    /* Join BOTH threads: the contexts they use live on this stack frame. */
    thrd_join(writer_id, NULL);
    thrd_join(collector_id, NULL);
    exit_code = EXIT_SUCCESS;

    /* shared_mutex is intentionally left alone; it lives until process exit. */

close_target:
    /* fclose() flushes buffered output, so a failure here means lost data. */
    if (fclose(target_ctx.fileHandle) != 0) {
        fprintf(stderr, "Failed to close %s cleanly.\n", target_ctx.filePath);
        exit_code = EXIT_FAILURE;
    }
close_source:
    fclose(source_ctx.fileHandle);
release_memory:
    free_tracked_memory();
    return exit_code;
}

/*
 * Collector thread entry point.
 *
 * arg: pointer to the ThreadContext describing the input stream and the
 *      shared line slot.
 *
 * Repeatedly fetches one line from the input under shared_mutex, replaces it
 * in the shared slot with the concatenation (no separator) of every digit run
 * found in it, hands the turn to the writer and waits for the turn to come
 * back. Lines of length zero are passed on unchanged. When no further line
 * can be fetched it sets collector_done, hands the turn to the writer one last
 * time and returns 0.
 *
 * Terminates the process with EXIT_FAILURE if the regex cannot be compiled or
 * memory runs out.
 */
int collector_thread(void* arg) {
    ThreadContext* ctx = (ThreadContext*)arg;
    FILE* input = ctx->fileHandle;
    char** shared = ctx->sharedBuffer;
    regex_t compiled_pattern;
    const char* expression = "[[:digit:]]+";

    /* Keep the error code: regerror() needs the code of the FAILED call. */
    int compile_status = regcomp(&compiled_pattern, expression, REG_EXTENDED);
    if (compile_status != 0) {
        char error_msg[100];
        regerror(compile_status, &compiled_pattern, error_msg, sizeof error_msg);
        fprintf(stderr, "Regex compilation error: %s\n", error_msg);
        exit(EXIT_FAILURE);
    }

    while (true) {
        mtx_lock(&shared_mutex);
        if (!fetch_next_line(input, shared)) {
            collector_done = true;
            regfree(&compiled_pattern);
            active_role = role_writer;
            mtx_unlock(&shared_mutex);
            break;
        }

        size_t line_len = strlen(*shared);
        if (line_len) {
            /* The digits can never outgrow the line; +1 for the terminator. */
            char* digits = calloc(line_len + 1, sizeof(char));
            if (!digits) {
                fprintf(stderr, "Out of memory while extracting numbers.\n");
                exit(EXIT_FAILURE);
            }

            /* Make extract_next_number() restart its scan for this line. */
            fresh_line_ready = true;

            size_t digits_len = 0;
            char* token = NULL;
            while ((token = extract_next_number(&compiled_pattern, *shared, line_len)) != NULL) {
                size_t token_len = strlen(token);
                memcpy(digits + digits_len, token, token_len);
                digits_len += token_len;
                free(token);
            }

            /* Trim to fit, keeping the terminator; a failed shrink is harmless. */
            char* trimmed = realloc(digits, digits_len + 1);
            if (trimmed) {
                digits = trimmed;
            }

            free(*shared);
            *shared = digits;
        }
        active_role = role_writer;
        mtx_unlock(&shared_mutex);

        /*
         * Wait for the writer to hand the turn back. active_role is polled
         * outside the mutex, which is only race-free if it is an atomic object.
         */
        while (active_role == role_writer) {
            thrd_sleep(&short_delay, NULL);
        }
    }
    return 0;
}

int writer_thread(void* arg) {
    ThreadContext* ctx = (ThreadContext*)arg;
    while (true) {
        mtx_lock(&shared_mutex);
        if (collector_done) {
            free(*(ctx->sharedBuffer));
            break;
        }
        if (*(ctx->sharedBuffer)) {
            fprintf(ctx->fileHandle, *(ctx->sharedBuffer));
        }
        active_role = role_collector;
        mtx_unlock(&shared_mutex);

        while (active_role == role_collector) {
            thrd_sleep(&short_delay, NULL);
        }
    }
    return 0;
}

/*
 * Returns the next match of `pattern` in `str`, continuing after the match
 * returned by the previous call.
 *
 * pattern: compiled regular expression.
 * str:     NUL-terminated text to scan.
 * length:  length of `str`; scanning stops once the offset reaches it.
 *
 * The scan position is kept in a function-local static, so the function is
 * not reentrant. Setting the global fresh_line_ready to true before a call
 * restarts the scan at the beginning of `str`; the flag is cleared here.
 *
 * Returns a heap-allocated, NUL-terminated copy of the match that the caller
 * must free(), or NULL when there is no further match, the regex engine
 * reports an error, or memory cannot be allocated.
 */
char* extract_next_number(regex_t* pattern, char* str, size_t length) {
    static size_t offset = 0;
    if (fresh_line_ready) {
        offset = 0;
        fresh_line_ready = false;
    }
    if (offset >= length) {
        return NULL;
    }

    /* Only the whole match (index 0) is used, so one slot is enough. */
    regmatch_t match;
    if (regexec(pattern, str + offset, 1, &match, 0) != 0) {
        /* REG_NOMATCH or an engine error: `match` holds nothing usable. */
        return NULL;
    }

    const char* match_start = str + offset + match.rm_so;
    size_t match_len = (size_t)(match.rm_eo - match.rm_so);

    /* Resume after this match; always move forward, even on an empty match at
     * the current position, so that callers looping until NULL terminate. */
    offset += (match.rm_eo > 0) ? (size_t)match.rm_eo : 1;

    char* result = calloc(match_len + 1, sizeof(char));
    if (!result) {
        return NULL;
    }
    memcpy(result, match_start, match_len);
    return result;
}

/*
 * Reads the next line of `file` into a freshly allocated string.
 *
 * file:   input stream.
 * buffer: in/out slot. Whatever it points to on entry is freed; on success it
 *         receives the new line (owned by the slot, released by the next call
 *         or by the caller); on failure it is left NULL.
 *
 * The line terminator is removed: a trailing "\n" and, if present, the "\r"
 * in front of it. Lines of any length are supported; the buffer grows as
 * needed. The text read when end-of-file is hit is returned as one last line
 * (it is empty when the file ends with a newline); every later call returns
 * false. The end-of-file state is kept in a function-local static, so the
 * function serves a single stream per process and is not reentrant.
 *
 * Returns true when a line was stored in *buffer, false at end of input, on a
 * read error, or when memory runs out. If the buffer cannot be grown the
 * stream is moved back to the start of the line (when its position could be
 * recorded).
 */
bool fetch_next_line(FILE* file, char** buffer) {
    free(*buffer);
    *buffer = NULL;

    static bool eof_reached = false;
    if (eof_reached) {
        return false;
    }

    const size_t growth = 10;
    size_t capacity = 5;
    size_t used = 0;
    bool saw_newline = false;

    fpos_t checkpoint;
    bool can_rollback = (fgetpos(file, &checkpoint) == 0);

    char* line = calloc(capacity, sizeof(char));
    if (!line) {
        fprintf(stderr, "Allocation failure at line %d.\n", __LINE__);
        return false;
    }

    for (;;) {
        /* Append to what has been read so far instead of re-reading the line. */
        if (fgets(line + used, (int)(capacity - used), file) == NULL) {
            if (ferror(file)) {
                fprintf(stderr, "Read failure at line %d.\n", __LINE__);
                free(line);
                return false;
            }
            /* End of file with nothing more to add: this is the last line. */
            line[used] = '\0';
            eof_reached = true;
            break;
        }

        used += strlen(line + used);
        if (used > 0 && line[used - 1] == '\n') {
            saw_newline = true;
            break;
        }
        if (feof(file)) {
            /* Final line without a terminating newline. */
            eof_reached = true;
            break;
        }

        /* The line did not fit: enlarge the buffer and keep reading. */
        char* grown = realloc(line, capacity + growth);
        if (!grown) {
            fprintf(stderr, "Reallocation failure at line %d. Rolling back.\n", __LINE__);
            if (can_rollback) {
                fsetpos(file, &checkpoint);
            }
            free(line);
            return false;
        }
        line = grown;
        capacity += growth;
    }

    if (saw_newline) {
        line[--used] = '\0';
        /* The stream is read in binary mode, so CRLF endings leave a '\r'. */
        if (used > 0 && line[used - 1] == '\r') {
            line[--used] = '\0';
        }
    }

    /* Trim to fit; if shrinking fails the larger block is still valid. */
    char* trimmed = realloc(line, used + 1);
    if (trimmed) {
        line = trimmed;
    }

    *buffer = line;
    return true;
}

The following header file provides utilities for tracking dynamically allocated memory so it can be released cleanly at program exit:

#ifndef MEMORY_TRACKER_H
#define MEMORY_TRACKER_H

#include <stdlib.h>
#include <stdio.h>

/* One entry of the singly linked list of tracked allocations. */
typedef struct MemoryNode {
    void** address;          /* location of the owning pointer; free_tracked_memory() frees *address and sets it to NULL */
    struct MemoryNode* next; /* next entry, or NULL at the tail */
} MemoryNode;

/*
 * Tracker state. Both objects are `static` and defined in this header, so
 * every translation unit that includes it gets its own private list and
 * counter.
 */
static MemoryNode* memory_list = NULL;   /* head of the tracked-allocation list */
static unsigned long tracked_count = 0;  /* number of registrations since the last free_tracked_memory() */

/*
 * Registers an owning pointer so that free_tracked_memory() later frees the
 * memory it points to and resets the pointer to NULL.
 *
 * ptr: address of the pointer variable that owns the allocation. The variable
 *      must still exist when free_tracked_memory() runs.
 *
 * Entries are appended at the tail, so they are released in registration
 * order.
 *
 * Returns the number of entries tracked after this registration (>= 1), or 0
 * when `ptr` is NULL or the list node cannot be allocated; in that case
 * nothing is registered.
 */
unsigned long register_allocation(void** ptr) {
    if (ptr == NULL) {
        return 0;
    }

    MemoryNode* new_node = calloc(1, sizeof *new_node);
    if (new_node == NULL) {
        return 0;
    }
    new_node->address = ptr;
    new_node->next = NULL;

    /* Walk to the link that terminates the list and hang the node there. */
    MemoryNode** tail_link = &memory_list;
    while (*tail_link != NULL) {
        tail_link = &(*tail_link)->next;
    }
    *tail_link = new_node;

    return ++tracked_count;
}

/*
 * Releases every tracked allocation in registration order: frees the memory
 * each registered pointer refers to, resets that pointer to NULL, and frees
 * the list node. Leaves the list empty and the counter at zero.
 */
void free_tracked_memory(void) {
    while (memory_list != NULL) {
        MemoryNode* node = memory_list;

        /* Unlink first so the list head is valid at every step. */
        memory_list = node->next;

        free(*(node->address));
        *(node->address) = NULL;
        free(node);
    }
    tracked_count = 0;
}

#endif

Tags: C multithreading regex file-io Synchronization

Posted on Tue, 26 May 2026 18:04:04 +0000 by ionik