Socket 服务端与线程管理的结合-Deepseek

以下是一个综合示例,展示了如何在 Socket 服务端中结合线程创建与销毁,根据客户端下发的 flag 动态管理线程,并确保线程中的结构体和动态内存被正确释放。

方案设计

  • Socket 服务端:

    • 使用 socket、bind、listen 和 accept 创建 TCP 服务端。
    • 在 while 循环中持续接收客户端连接和消息。
  • 线程管理:

    • 当客户端发送 flag=1 时,创建线程。
    • 当客户端发送 flag=2 时,销毁线程。
    • 使用全局变量和互斥锁(pthread_mutex_t)管理线程状态。
  • 资源释放:

    • 使用 pthread_cleanup_push 和 pthread_cleanup_pop 确保线程退出时释放结构体和动态内存。
  • 线程函数:

    • 线程函数模拟工作逻辑,持续运行直到被取消。

完整代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <arpa/inet.h>

#define PORT 8080
#define BUFFER_SIZE 1024

// 全局变量
pthread_t worker_thread = 0; // 工作线程 ID
volatile int thread_running = 0; // 线程运行状态
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁

// 线程数据结构体
typedef struct {
    int id;
    char* message;
} ThreadData;

// 清理函数
void cleanup_handler(void* arg) {
    ThreadData* data = (ThreadData*)arg;
    printf("Cleanup handler: Freeing resources for thread %d\n", data->id);
    free(data->message); // 释放动态内存
    free(data);          // 释放结构体
}

// 线程函数
void* thread_func(void* arg) {
    ThreadData* data = (ThreadData*)arg;

    // 注册清理函数
    pthread_cleanup_push(cleanup_handler, data);

    // 允许线程被取消
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    printf("Thread %d: %s\n", data->id, data->message);

    // 模拟线程工作
    while (1) {
        printf("Thread %d: Working...\n", data->id);
        sleep(1);

        // 显式检查取消请求
        pthread_testcancel();
    }

    // 弹出清理函数(不会执行,因为线程会被取消)
    pthread_cleanup_pop(0);

    pthread_exit(NULL);
}

// 创建线程
void create_thread() {
    pthread_mutex_lock(&mutex);
    if (!thread_running) {
        ThreadData* data = (ThreadData*)malloc(sizeof(ThreadData));
        if (!data) {
            perror("malloc");
            exit(EXIT_FAILURE);
        }
        data->id = 1;
        data->message = (char*)malloc(50 * sizeof(char));
        if (!data->message) {
            perror("malloc");
            free(data);
            exit(EXIT_FAILURE);
        }
        snprintf(data->message, 50, "Hello from thread %d", data->id);

        int ret = pthread_create(&worker_thread, NULL, thread_func, data);
        if (ret != 0) {
            perror("pthread_create");
            free(data->message);
            free(data);
            exit(EXIT_FAILURE);
        }

        thread_running = 1;
        printf("Main: Created thread %d\n", data->id);
    } else {
        printf("Main: Thread is already running\n");
    }
    pthread_mutex_unlock(&mutex);
}

// 销毁线程
void destroy_thread() {
    pthread_mutex_lock(&mutex);
    if (thread_running) {
        printf("Main: Stopping thread...\n");
        int ret = pthread_cancel(worker_thread);
        if (ret != 0) {
            perror("pthread_cancel");
            exit(EXIT_FAILURE);
        }

        // 等待线程退出并回收资源
        ret = pthread_join(worker_thread, NULL);
        if (ret != 0) {
            perror("pthread_join");
            exit(EXIT_FAILURE);
        }

        thread_running = 0;
        printf("Main: Thread has exited.\n");
    } else {
        printf("Main: No thread is running\n");
    }
    pthread_mutex_unlock(&mutex);
}

// 主函数
int main() {
    int server_fd, client_fd;
    struct sockaddr_in server_addr, client_addr;
    socklen_t client_len = sizeof(client_addr);
    char buffer[BUFFER_SIZE];

    // 创建 Socket
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    // 绑定地址和端口
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("bind");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 监听连接
    if (listen(server_fd, 5) < 0) {
        perror("listen");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    printf("Server is listening on port %d...\n", PORT);

    // 主循环
    while (1) {
        // 接受客户端连接
        client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
        if (client_fd < 0) {
            perror("accept");
            continue;
        }

        // 接收客户端消息
        memset(buffer, 0, BUFFER_SIZE);
        ssize_t bytes_received = recv(client_fd, buffer, BUFFER_SIZE - 1, 0);
        if (bytes_received < 0) {
            perror("recv");
            close(client_fd);
            continue;
        }

        printf("Received from client: %s\n", buffer);

        // 解析客户端 flag
        int flag = atoi(buffer);
        if (flag == 1) {
            create_thread();
        } else if (flag == 2) {
            destroy_thread();
        } else {
            printf("Invalid flag: %d\n", flag);
        }

        // 关闭客户端连接
        close(client_fd);
    }

    // 关闭服务器 Socket
    close(server_fd);
    return 0;
}

代码解析

  • Socket 服务端:

    • 使用 socket、bind、listen 和 accept 创建 TCP 服务端。
    • 在 while 循环中持续接收客户端连接和消息。
  • 线程管理:

    • create_thread:创建线程并初始化线程数据。
    • destroy_thread:取消线程并等待线程退出。
  • 资源释放:

    • 使用 pthread_cleanup_push 和 pthread_cleanup_pop 确保线程退出时释放资源。
  • 客户端消息处理:

    • 接收客户端消息并解析 flag。
    • 根据 flag 的值创建或销毁线程。

总结

  • 该方案实现了 Socket 服务端与线程管理的结合。
  • 通过 flag 动态创建和销毁线程,确保资源被正确释放。
  • 使用互斥锁保护全局变量,避免竞态条件。
  • 适用于需要动态管理线程的服务器场景。

讲一下互斥锁

互斥锁(Mutex,全称 Mutual Exclusion Lock) 是一种用于多线程编程的同步机制,用于保护共享资源,防止多个线程同时访问或修改这些资源,从而避免竞态条件(Race Condition)和数据不一致的问题。

  1. 互斥锁的作用

在多线程环境中,如果多个线程同时访问或修改共享资源(如全局变量、文件、内存等),可能会导致数据不一致或程序行为异常。互斥锁的作用是:

  • 确保同一时间只有一个线程可以访问共享资源。
  • 当一个线程持有锁时,其他线程必须等待锁释放后才能访问共享资源。
  1. 互斥锁的基本操作

在 POSIX 线程库(pthread)中,互斥锁的操作包括:

  • 初始化互斥锁:

    • 使用 pthread_mutex_init 初始化互斥锁。
    • 也可以使用静态初始化:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER。
  • 加锁:

    • 使用 pthread_mutex_lock 加锁。如果锁已被其他线程持有,则当前线程会阻塞,直到锁被释放。
  • 尝试加锁:

    • 使用 pthread_mutex_trylock 尝试加锁。如果锁已被其他线程持有,则立即返回错误,而不是阻塞。
  • 解锁:

    • 使用 pthread_mutex_unlock 释放锁,允许其他线程获取锁。
  • 销毁互斥锁:

    • 使用 pthread_mutex_destroy 销毁互斥锁,释放相关资源。
  1. 互斥锁的使用场景
  • 保护共享资源:

    • 例如,多个线程需要修改同一个全局变量时,可以使用互斥锁确保线程安全。
  • 同步线程执行:

    • 例如,确保某个操作在所有线程完成特定任务后再执行。
  1. 互斥锁的示例

以下是一个简单的示例,展示了如何使用互斥锁保护共享资源:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

// 共享资源
int counter = 0;

// 互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// 线程函数
void* thread_func(void* arg) {
    for (int i = 0; i < 100000; i++) {
        // 加锁
        pthread_mutex_lock(&mutex);

        // 修改共享资源
        counter++;

        // 解锁
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main() {
    pthread_t thread1, thread2;

    // 创建两个线程
    pthread_create(&thread1, NULL, thread_func, NULL);
    pthread_create(&thread2, NULL, thread_func, NULL);

    // 等待线程结束
    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    // 打印最终结果
    printf("Counter value: %d\n", counter);

    // 销毁互斥锁
    pthread_mutex_destroy(&mutex);

    return 0;
}
  1. 代码解析
  • 共享资源:

    • counter 是一个全局变量,被两个线程同时修改。
  • 互斥锁保护:

    • 在修改 counter 之前,线程调用 pthread_mutex_lock 加锁。
    • 修改完成后,线程调用 pthread_mutex_unlock 解锁。
  • 线程安全:

    • 如果没有互斥锁,两个线程可能同时修改 counter,导致最终结果不正确。

    使用互斥锁后,counter 的最终值一定是 200000。

  1. 互斥锁的注意事项
  • 避免死锁:

    • 死锁是指多个线程互相等待对方释放锁,导致程序无法继续执行。
    • 例如,线程 A 持有锁 1 并请求锁 2,而线程 B 持有锁 2 并请求锁 1。
    • 解决方法:按固定顺序加锁,或使用超时机制。
  • 锁的粒度:

    • 锁的粒度不宜过大,否则会降低程序的并发性能。
    • 例如,如果锁保护了整个函数,而函数中有大量不涉及共享资源的代码,会导致其他线程长时间等待。
  • 锁的性能开销:

    • 加锁和解锁操作有一定的性能开销,尤其是在高并发场景下。

    如果共享资源的访问频率很高,可以考虑使用更高效的同步机制(如读写锁或无锁数据结构)。

  1. 互斥锁的变体
  • 读写锁(pthread_rwlock_t):

    • 允许多个线程同时读取共享资源,但只允许一个线程写入。
    • 适用于读多写少的场景。
  • 自旋锁(pthread_spinlock_t):

    • 在加锁时,线程会忙等待(busy-wait),而不是阻塞。
    • 适用于锁持有时间非常短的场景。
  • 条件变量(pthread_cond_t):

    • 用于线程间的条件同步,通常与互斥锁配合使用。
  1. 总结
  • 互斥锁是保护共享资源的基本工具,可以避免竞态条件和数据不一致。
  • 使用互斥锁时需要注意死锁、锁粒度和性能开销。
  • 在实际开发中,应根据具体场景选择合适的同步机制。

通过合理使用互斥锁,可以编写出高效且线程安全的并发程序。

前边示例为什么创建线程和销毁线程都要加锁

在前面的示例中,创建线程和销毁线程时加锁是为了解决**竞态条件(Race Condition)**问题,确保多个线程不会同时修改共享资源(如 worker_thread 和 thread_running)而导致数据不一致或程序行为异常。

为什么需要加锁?

1.1 共享资源的竞争

  • 在示例中,worker_thread 和 thread_running 是全局变量,被多个线程共享。
  • 如果没有锁保护,可能会出现以下问题:

    • 创建线程时:

      • 两个客户端同时发送 flag=1,导致两个线程同时调用 create_thread。
      • 如果没有锁,可能会创建多个线程,而 worker_thread 只能保存一个线程 ID,导致其他线程无法被正确管理。
    • 销毁线程时:

      • 一个线程正在销毁线程,而另一个线程尝试创建线程,可能导致 worker_thread 和 thread_running 的状态不一致。

1.2 数据一致性

  • 加锁可以确保对共享资源的修改是原子操作,即同一时间只有一个线程可以修改这些资源。
  • 例如,在 create_thread 中,加锁后可以确保:

    • 检查 thread_running 的状态。
    • 创建线程并更新 worker_thread 和 thread_running。
    • 这些操作不会被其他线程打断。

加锁的具体作用

2.1 创建线程时的加锁

在 create_thread 函数中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
void create_thread() {
    pthread_mutex_lock(&mutex); // 加锁
    if (!thread_running) {
        // 创建线程并更新共享资源
        ThreadData* data = (ThreadData*)malloc(sizeof(ThreadData));
        // ... 初始化 data
        int ret = pthread_create(&worker_thread, NULL, thread_func, data);
        // ... 错误处理
        thread_running = 1; // 更新状态
    }
    pthread_mutex_unlock(&mutex); // 解锁
}
  • 加锁:确保在检查 thread_running 和创建线程的过程中,不会被其他线程打断。
  • 解锁:操作完成后释放锁,允许其他线程访问共享资源。

2.2 销毁线程时的加锁

在 destroy_thread 函数中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
void destroy_thread() {
    pthread_mutex_lock(&mutex); // 加锁
    if (thread_running) {
        // 取消线程并更新共享资源
        int ret = pthread_cancel(worker_thread);
        // ... 错误处理
        ret = pthread_join(worker_thread, NULL);
        // ... 错误处理
        thread_running = 0; // 更新状态
    }
    pthread_mutex_unlock(&mutex); // 解锁
}
  • 加锁:确保在检查 thread_running 和销毁线程的过程中,不会被其他线程打断。
  • 解锁:操作完成后释放锁,允许其他线程访问共享资源。

不加锁的后果

如果不加锁,可能会出现以下问题:

3.1 创建多个线程

  • 两个客户端同时发送 flag=1,导致两个线程同时调用 create_thread。
  • 由于没有锁保护,两个线程可能同时检查 thread_running,发现其为 0,然后同时创建线程。
  • 最终会导致多个线程运行,而 worker_thread 只能保存一个线程 ID,其他线程无法被管理。

3.2 状态不一致

  • 一个线程正在销毁线程,而另一个线程尝试创建线程。
  • 由于没有锁保护,可能会导致 worker_thread 和 thread_running 的状态不一致,例如:

    • thread_running 被设置为 0,但 worker_thread 仍然指向一个无效的线程 ID。
    • 或者 thread_running 被设置为 1,但 worker_thread 尚未被正确赋值。

3.3 资源泄漏

  • 如果没有锁保护,可能会多次创建线程,导致资源(如动态内存)泄漏。
  • 例如,多个线程同时分配 ThreadData 结构体,但只有最后一个线程的 ThreadData 会被正确释放。

4. 加锁的最佳实践

  • 锁的粒度:

    • 锁的粒度不宜过大,否则会降低程序的并发性能。
    • 在示例中,锁只保护了必要的共享资源(worker_thread 和 thread_running)。
  • 避免死锁:

    • 确保加锁和解锁成对出现。
    • 在函数的所有退出路径(如 return 或错误处理)中都要解锁。
  • 锁的性能:

    • 加锁和解锁操作有一定的性能开销,尤其是在高并发场景下。
    • 如果共享资源的访问频率很高,可以考虑使用更高效的同步机制(如读写锁或无锁数据结构)。

总结

  • 加锁的目的是保护共享资源,避免竞态条件和数据不一致。
  • 在示例中,创建线程和销毁线程时加锁,是为了确保对 worker_thread 和 thread_running 的修改是原子操作。
  • 如果不加锁,可能会导致多个线程同时修改共享资源,从而引发数据不一致、资源泄漏或程序行为异常。

通过合理使用互斥锁,可以编写出线程安全的并发程序。