Skip to content

Commit 66f5b57

Browse files
committed
feat(supervisor/posix): run non-detached
Signed-off-by: Marco Casaroli <marco.casaroli@gmail.com>
1 parent cab039d commit 66f5b57

4 files changed

Lines changed: 210 additions & 3 deletions

File tree

src/samples/supervisor/posix/server/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ add_executable(ocred
1212
ocred.c
1313
handlers.c
1414
waiters.c
15+
runners.c
1516
../zcbor_helpers.c
1617
../zcbor/src/zcbor_encode.c
1718
../zcbor/src/zcbor_decode.c

src/samples/supervisor/posix/server/handlers.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525

2626
#include "../ipc.h"
2727
#include "../zcbor_helpers.h"
28-
#include "../server/waiters.h"
28+
#include "ocre/container.h"
29+
#include "waiters.h"
30+
#include "runners.h"
2931

3032
#define DEFAULT_PID_FILE "ocre.pid"
3133

@@ -339,7 +341,7 @@ static int handle_context_get_working_directory(struct ocre_context *ctx, zcbor_
339341
return enc_state->payload - tx_buf;
340342
}
341343

342-
static int handle_container_start(struct ocre_context *ctx, zcbor_state_t *dec_state, uint8_t *tx_buf,
344+
static int handle_container_start(struct ocre_context *ctx, int socket, zcbor_state_t *dec_state, uint8_t *tx_buf,
343345
size_t tx_buf_size)
344346
{
345347
char id[STRING_BUFFER_SIZE];
@@ -358,6 +360,13 @@ static int handle_container_start(struct ocre_context *ctx, zcbor_state_t *dec_s
358360
return -1;
359361
}
360362

363+
/* If container is not detached, we need to dispatch it to the runners */
364+
if (!ocre_container_is_detached(container)) {
365+
return container_runner_dispatch(container, socket);
366+
}
367+
368+
/* Otherwise we handle here */
369+
361370
/* Call the actual function */
362371
int result = ocre_container_start(container);
363372

@@ -680,7 +689,7 @@ int process_request(struct ocre_context *ctx, int socket, uint8_t *rx_buf, int r
680689
break;
681690

682691
case OCRE_IPC_CONTAINER_START_REQ:
683-
response_len = handle_container_start(ctx, decoding_state, tx_buf, tx_buf_size);
692+
response_len = handle_container_start(ctx, socket, decoding_state, tx_buf, tx_buf_size);
684693
break;
685694

686695
case OCRE_IPC_CONTAINER_GET_STATUS_REQ:
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/**
2+
* @copyright Copyright (c) contributors to Project Ocre,
3+
* which has been established as Project Ocre a Series of LF Projects, LLC
4+
*
5+
* SPDX-License-Identifier: Apache-2.0
6+
*/
7+
8+
#include <pthread.h>
9+
#include <stdlib.h>
10+
#include <stdint.h>
11+
#include <stdbool.h>
12+
#include <unistd.h>
13+
#include <stdio.h>
14+
#include <sys/socket.h>
15+
16+
#include <ocre/ocre.h>
17+
#include <uthash/utlist.h>
18+
#include <zcbor_encode.h>
19+
20+
#include "../ipc.h"
21+
#include "ocre/container.h"
22+
23+
#define TX_BUFFER_SIZE 32
24+
#define RX_BUFFER_SIZE 32
25+
26+
struct runner {
27+
pthread_mutex_t mutex;
28+
int socket;
29+
bool finished;
30+
struct ocre_container *container;
31+
pthread_t run_thread;
32+
pthread_t socket_thread;
33+
};
34+
35+
static void *run_thread(void *arg)
36+
{
37+
uint8_t tx_buf[TX_BUFFER_SIZE];
38+
struct runner *runner = (struct runner *)arg;
39+
40+
/* Call the actual function */
41+
int result = ocre_container_start(runner->container);
42+
43+
pthread_mutex_lock(&runner->mutex);
44+
45+
if (runner->finished) {
46+
pthread_mutex_unlock(&runner->mutex);
47+
return NULL;
48+
}
49+
50+
pthread_mutex_unlock(&runner->mutex);
51+
52+
/* Encode response */
53+
ZCBOR_STATE_E(enc_state, 0, tx_buf, TX_BUFFER_SIZE, 0);
54+
55+
bool success = zcbor_uint32_put(enc_state, OCRE_IPC_CONTAINER_START_RSP);
56+
if (!success) {
57+
fprintf(stderr, "Encoding response failed\n");
58+
return NULL;
59+
}
60+
61+
success = zcbor_int32_put(enc_state, result);
62+
if (!success) {
63+
fprintf(stderr, "Encoding result failed\n");
64+
return NULL;
65+
}
66+
67+
int response_len = enc_state->payload - tx_buf;
68+
69+
if (send(runner->socket, tx_buf, response_len, 0) < 0) {
70+
perror("send");
71+
}
72+
73+
if (close(runner->socket) < 0) {
74+
perror("close");
75+
}
76+
77+
pthread_mutex_lock(&runner->mutex);
78+
79+
runner->finished = true;
80+
81+
pthread_mutex_unlock(&runner->mutex);
82+
83+
/* Wait for the socket thread to finish */
84+
pthread_join(runner->socket_thread, NULL);
85+
86+
return NULL;
87+
}
88+
89+
static void *socket_thread(void *arg)
90+
{
91+
struct runner *runner = (struct runner *)arg;
92+
uint8_t rx_buf[RX_BUFFER_SIZE];
93+
94+
while (true) {
95+
pthread_mutex_lock(&runner->mutex);
96+
97+
if (runner->finished) {
98+
pthread_mutex_unlock(&runner->mutex);
99+
return NULL;
100+
};
101+
102+
pthread_mutex_unlock(&runner->mutex);
103+
104+
ssize_t n = recv(runner->socket, rx_buf, sizeof(rx_buf), 0);
105+
106+
if (n <= 0) {
107+
if (n < 0) {
108+
perror("recv");
109+
} else {
110+
fprintf(stderr, "Client disconnected\n");
111+
}
112+
113+
pthread_mutex_lock(&runner->mutex);
114+
115+
runner->finished = true;
116+
117+
close(runner->socket);
118+
pthread_mutex_unlock(&runner->mutex);
119+
120+
ocre_container_kill(runner->container);
121+
122+
return NULL;
123+
}
124+
}
125+
126+
return NULL;
127+
}
128+
129+
int container_runner_dispatch(struct ocre_container *container, int socket)
130+
{
131+
struct runner *runner = NULL;
132+
133+
runner = malloc(sizeof(struct runner));
134+
if (!runner) {
135+
fprintf(stderr, "Failed to allocate memory for runner\n");
136+
return -1;
137+
}
138+
139+
memset(runner, 0, sizeof(struct runner));
140+
141+
runner->container = container;
142+
runner->socket = socket;
143+
144+
int rc = pthread_mutex_init(&runner->mutex, NULL);
145+
if (rc) {
146+
fprintf(stderr, "Failed to initialize mutex: rc=%d", rc);
147+
goto error_runner;
148+
}
149+
150+
rc = pthread_create(&runner->socket_thread, NULL, socket_thread, runner);
151+
if (rc) {
152+
fprintf(stderr, "Failed to create socket thread: rc=%d", rc);
153+
goto error_mutex;
154+
}
155+
156+
rc = pthread_create(&runner->run_thread, NULL, run_thread, runner);
157+
if (rc) {
158+
fprintf(stderr, "Failed to create runner thread: rc=%d", rc);
159+
goto error_thread;
160+
}
161+
162+
return 0;
163+
164+
error_thread:
165+
rc = pthread_mutex_lock(&runner->mutex);
166+
if (rc) {
167+
fprintf(stderr, "Failed to lock mutex: rc=%d", rc);
168+
}
169+
170+
runner->finished = true;
171+
172+
rc = pthread_mutex_unlock(&runner->mutex);
173+
if (rc) {
174+
fprintf(stderr, "Failed to unlock mutex: rc=%d", rc);
175+
}
176+
177+
pthread_join(runner->socket_thread, NULL);
178+
179+
error_mutex:
180+
pthread_mutex_destroy(&runner->mutex);
181+
182+
error_runner:
183+
free(runner);
184+
return -1;
185+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/**
2+
* @copyright Copyright (c) contributors to Project Ocre,
3+
* which has been established as Project Ocre a Series of LF Projects, LLC
4+
*
5+
* SPDX-License-Identifier: Apache-2.0
6+
*/
7+
8+
#include <ocre/ocre.h>
9+
10+
struct waiter;
11+
12+
int container_runner_dispatch(struct ocre_container *container, int socket);

0 commit comments

Comments
 (0)