full duplex communication working
authorNils Forssén <forssennils@gmail.com>
Sat, 15 Jun 2024 22:00:23 +0000 (00:00 +0200)
committerNils Forssén <forssennils@gmail.com>
Sat, 15 Jun 2024 22:00:23 +0000 (00:00 +0200)
17 files changed:
build.sh
processes/CMakeLists.txt
processes/click_reader/CMakeLists.txt
processes/click_reader/main.cpp
processes/controller/CMakeLists.txt [new file with mode: 0755]
processes/controller/main.cpp [new file with mode: 0644]
processes/include/pipe_def.h [new file with mode: 0644]
processes/include/util.h [new file with mode: 0644]
processes/inter_proc/CMakeLists.txt [new file with mode: 0755]
processes/inter_proc/main.cpp [new file with mode: 0644]
processes/udp_server/CMakeLists.txt
processes/udp_server/include/mavlink_handler.h [new file with mode: 0644]
processes/udp_server/include/udp_server.h [new file with mode: 0644]
processes/udp_server/main.cpp
processes/udp_server/mavlink_handler.cpp [new file with mode: 0644]
processes/udp_server/udp_server.cpp [new file with mode: 0644]
run.sh

index 823eb898143685a62367291ef3c7a944e1a0cd68..ac8ad99bb8591398a9d583841e5a259a3fae6778 100755 (executable)
--- a/build.sh
+++ b/build.sh
@@ -2,4 +2,5 @@
 
 cd $(dirname "$0")/build
 cmake -G Ninja ..
+
 ninja
\ No newline at end of file
index 3e7fc10e715163bf709b23e9fd5a1bdc3e0f53a1..2da4dd508cf2cace1823908515182903e6720f20 100644 (file)
@@ -1,2 +1,4 @@
 add_subdirectory(udp_server)
-add_subdirectory(click_reader)
\ No newline at end of file
+add_subdirectory(click_reader)
+add_subdirectory(inter_proc)
+add_subdirectory(controller)
\ No newline at end of file
index 7ba0848d56e4c5bdd05fc62c04916099b7e5b828..3ab575d8a43d9be0f66b9e9502b8016378bc9d63 100644 (file)
@@ -4,4 +4,7 @@ add_executable(click_reader
 
 target_include_directories(click_reader PRIVATE include)
 
-target_link_libraries(click_reader PRIVATE MAVLink)
\ No newline at end of file
+target_link_libraries(click_reader PRIVATE MAVLink)
+target_link_libraries(click_reader PRIVATE spdlog::spdlog)
+
+target_include_directories(click_reader PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include)
index 17edc53646383eab406aec0273eeb04dce1d03c7..55603631ab4c5b28177ffddf3b11cd78c5e3e018 100644 (file)
@@ -1,7 +1,62 @@
-#include <cstdio>
+// Standard
+#include <memory>
+#include <thread>
+
+// Pipes
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+// Libs
+#include <spdlog/spdlog.h>
+#include <spdlog/sinks/stdout_color_sinks.h>
+#include <common/mavlink.h>
+
+// Local
+#include "pipe_def.h"
+#include "util.h"
+
+
+static std::shared_ptr<spdlog::logger> logger;
 
 int main(int argc, char* argv[])
 {
-    printf("Hello, World!\n");
+    logger = spdlog::stdout_color_mt("click_reader");
+    logger->set_level(CLICK_LOGLEVEL);
+
+    logger->debug("Opening sendpipe for writing");
+    pipe_fd_t sendpipe_fd = open(SEND_PIPE, O_WRONLY);
+
+    mavlink_message_t message;
+    const uint8_t system_id = 42;
+    const uint8_t base_mode = 0;
+    const uint8_t custom_mode = 0;
+    mavlink_msg_global_position_int_pack_chan(
+        system_id,
+        MAV_COMP_ID_PERIPHERAL,
+        MAVLINK_COMM_0,
+        &message,
+        1111111,
+        2222222,
+        3333333,
+        4444444,
+        5555555,
+        6666666,
+        7777777,
+        8888888,
+        9999999);
+
+    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
+    const int len = mavlink_msg_to_send_buffer(buffer, &message);
+
+    while(true)
+    {
+        write(sendpipe_fd, buffer, len);
+        THREAD_SLEEP(3s);
+    }
+
+    logger->error("here");
+    close(sendpipe_fd);
     return 0;
 }
\ No newline at end of file
diff --git a/processes/controller/CMakeLists.txt b/processes/controller/CMakeLists.txt
new file mode 100755 (executable)
index 0000000..1bd5bd9
--- /dev/null
@@ -0,0 +1,10 @@
+add_executable(controller
+    main.cpp
+)
+
+target_include_directories(controller PRIVATE include)
+
+target_link_libraries(controller PRIVATE MAVLink)
+target_link_libraries(controller PRIVATE spdlog::spdlog)
+
+target_include_directories(controller PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include)
diff --git a/processes/controller/main.cpp b/processes/controller/main.cpp
new file mode 100644 (file)
index 0000000..24d5e5d
--- /dev/null
@@ -0,0 +1,53 @@
+// Standard
+#include <memory>
+#include <thread>
+
+// Pipes
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+// Libs
+#include <spdlog/spdlog.h>
+#include <spdlog/sinks/stdout_color_sinks.h>
+#include <common/mavlink.h>
+
+// Local
+#include "pipe_def.h"
+#include "util.h"
+
+static std::shared_ptr<spdlog::logger> logger;
+
+int main(int argc, char* argv[])
+{
+    logger = spdlog::stdout_color_mt("controller");
+    logger->set_level(CONTROLLER_LOGLEVEL);
+/*
+    logger->debug("Opening sendpipe for writing");
+    pipe_fd_t sendpipe_fd = open(SEND_PIPE, O_WRONLY);
+*/
+    logger->debug("Opening recvpipe for reading");
+    pipe_fd_t recvpipe_fd = open(RECV_PIPE, O_RDONLY);
+
+    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
+    while(true)
+    {
+        int ret = read(recvpipe_fd, buffer, MAVLINK_MAX_PACKET_LEN);
+
+        if (ret == 0) {
+            logger->error("No writing process on recvpipe. Exiting.");
+            break;
+        }
+        else if (ret < 0)
+            logger->error("Error while reading message. {}", strerror(errno));
+        else {
+            // handle mavlink message
+        }
+    }
+
+    logger->error("here");
+    //close(sendpipe_fd);
+    close(recvpipe_fd);
+    return 0;
+}
\ No newline at end of file
diff --git a/processes/include/pipe_def.h b/processes/include/pipe_def.h
new file mode 100644 (file)
index 0000000..23cca7b
--- /dev/null
@@ -0,0 +1,13 @@
+#ifndef PIPE_DEF_H
+#define PIPE_DEF_H
+
+#define UDP_LOGLEVEL spdlog::level::debug
+#define CLICK_LOGLEVEL spdlog::level::debug
+#define CONTROLLER_LOGLEVEL spdlog::level::debug
+
+#define RECV_PIPE "/var/lock/recv_pipe"
+#define SEND_PIPE "/var/lock/send_pipe"
+
+typedef int pipe_fd_t;
+
+#endif
\ No newline at end of file
diff --git a/processes/include/util.h b/processes/include/util.h
new file mode 100644 (file)
index 0000000..c9c85f3
--- /dev/null
@@ -0,0 +1,15 @@
+#ifndef UTIL_H
+#define UTIL_H
+
+#include <chrono>
+#include <thread>
+
+#define THREAD_SLEEP(time) std::this_thread::sleep_for(time)
+using namespace std::chrono_literals;
+
+typedef enum {
+    ERR = -1,
+    OK = 0
+} err_t;
+
+#endif
\ No newline at end of file
diff --git a/processes/inter_proc/CMakeLists.txt b/processes/inter_proc/CMakeLists.txt
new file mode 100755 (executable)
index 0000000..6e9965d
--- /dev/null
@@ -0,0 +1,11 @@
+add_executable(inter_proc
+    main.cpp
+)
+
+target_include_directories(inter_proc PRIVATE include)
+
+target_link_libraries(inter_proc PRIVATE spdlog::spdlog)
+
+target_include_directories(inter_proc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include)
+
+
diff --git a/processes/inter_proc/main.cpp b/processes/inter_proc/main.cpp
new file mode 100644 (file)
index 0000000..a81bdfb
--- /dev/null
@@ -0,0 +1,32 @@
+#include <memory>
+
+#include <spdlog/spdlog.h>
+#include <spdlog/sinks/stdout_color_sinks.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+
+#include "pipe_def.h"
+
+static std::shared_ptr<spdlog::logger> logger;
+
+int main(int argc, char* argv[])
+{
+    logger = spdlog::stdout_color_mt("inter_proc");
+
+    unlink(RECV_PIPE);
+    unlink(SEND_PIPE);
+
+    if (mkfifo(RECV_PIPE, 0666) != 0) {
+        logger->warn("mkfifo warning (RECV_PIPE): {}", strerror(errno));
+    }
+
+    if (mkfifo(SEND_PIPE, 0666) != 0) {
+        logger->warn("mkfifo warning (SEND_PIPE): {}", strerror(errno));
+    }
+
+    logger->info("Pipes created");
+    return 0;
+}
index 070e2cb08af82a395f7b3bfd2f5ab7d8867417c1..6060c8b4ba46b5d35aadf22c6f58cc3f30add6f1 100755 (executable)
@@ -1,8 +1,12 @@
-add_executable(upd_server
+add_executable(udp_server
     main.cpp
+    udp_server.cpp
+    mavlink_handler.cpp
 )
 
-target_include_directories(upd_server PRIVATE include)
+target_include_directories(udp_server PRIVATE include)
 
-target_link_libraries(upd_server PRIVATE MAVLink)
-target_link_libraries(upd_server PRIVATE spdlog::spdlog)
+target_link_libraries(udp_server PRIVATE MAVLink)
+target_link_libraries(udp_server PRIVATE spdlog::spdlog)
+
+target_include_directories(udp_server PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include)
diff --git a/processes/udp_server/include/mavlink_handler.h b/processes/udp_server/include/mavlink_handler.h
new file mode 100644 (file)
index 0000000..7f73f3c
--- /dev/null
@@ -0,0 +1,15 @@
+#ifndef MAVLINK_HANDLER_H
+#define MAVLINK_HANDLER_H
+
+// Standard
+#include <memory>
+
+// Libs
+#include <spdlog/spdlog.h>
+#include <common/mavlink.h>
+
+void init_mavlink_handler(std::shared_ptr<spdlog::logger> logg);
+
+void handle_heartbeat(const mavlink_message_t* message);
+
+#endif
\ No newline at end of file
diff --git a/processes/udp_server/include/udp_server.h b/processes/udp_server/include/udp_server.h
new file mode 100644 (file)
index 0000000..65830ee
--- /dev/null
@@ -0,0 +1,42 @@
+#ifndef UDP_SERVER_H
+#define UDP_SERVER_H
+
+// Standard
+#include <memory>
+#include <thread>
+
+// Socket
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+
+// Pipes
+#include <unistd.h>
+
+// Libs
+#include <spdlog/spdlog.h>
+#include <common/mavlink.h>
+
+// Local
+#include "util.h"
+#include "mavlink_handler.h"
+#include "pipe_def.h"
+#include "mavlink_handler.h"
+
+#define UDP_PORT 14552
+#define UDP_RECV_BUF_SIZE 2048 // enough for MTU 1500 bytes
+
+typedef int socket_fd_t;
+
+typedef struct {
+    socket_fd_t socket_fd;
+    struct sockaddr_in addr;
+    socklen_t addr_len;
+    bool addr_set;
+} gcs_t;
+
+err_t udp_server_init(std::shared_ptr<spdlog::logger> logg);
+
+err_t udp_server_run(pipe_fd_t recvpipe_fd, pipe_fd_t sendpipe_fd);
+
+#endif
\ No newline at end of file
index 032e04cc0da10b69394c803ddda504f1e8f5af6b..8f39fb4952d77d4d9099ca37424b98810c77d5c0 100755 (executable)
-// Simple example receiving and sending MAVLink v2 over UDP
-// based on POSIX APIs (e.g. Linux, BSD, macOS).
+// Standard
+#include <memory>
+#include <thread>
 
-#include <errno.h>
-#include <stdio.h>
-#include <string.h>
-#include <stdbool.h>
-#include <stdint.h>
-
-#include <arpa/inet.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
-#include <time.h>
+// Pipes
+#include <fcntl.h>
+#include <unistd.h>
 
+// Libs
 #include <spdlog/spdlog.h>
-
+#include <spdlog/sinks/stdout_color_sinks.h>
 #include <common/mavlink.h>
 
+// Local
+#include "pipe_def.h"
+#include "util.h"
+#include "udp_server.h"
 
-void receive_some(int socket_fd, struct sockaddr_in* src_addr, socklen_t* src_addr_len, bool* src_addr_set);
-void handle_heartbeat(const mavlink_message_t* message);
-
-void send_some(int socket_fd, const struct sockaddr_in* src_addr, socklen_t src_addr_len);
-void send_heartbeat(int socket_fd, const struct sockaddr_in* src_addr, socklen_t src_addr_len);
+static std::shared_ptr<spdlog::logger> logger;
 
+void receive_thread(gcs_t* src);
+void handle_heartbeat(const mavlink_message_t* message);
+void send_some(gcs_t* src);
+void send_heartbeat(gcs_t* src);
 
+// Main
 int main(int argc, char* argv[])
 {
-    // Open UDP socket
-    const int socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
-
-    if (socket_fd < 0) {
-        printf("socket error: %s\n", strerror(errno));
-        return -1;
-    }
-
-    // Bind to port
-    struct sockaddr_in addr = {};
-    memset(&addr, 0, sizeof(addr));
-    addr.sin_family = AF_INET;
-    inet_pton(AF_INET, "0.0.0.0", &(addr.sin_addr)); // listen on all network interfaces
-    addr.sin_port = htons(14552); // default port on the ground
+    logger = spdlog::stdout_color_mt("udp_server");
+    logger->set_level(UDP_LOGLEVEL);
 
-    if (bind(socket_fd, (struct sockaddr*)(&addr), sizeof(addr)) != 0) {
-        printf("bind error: %s\n", strerror(errno));
-        return -2;
-    }
+    udp_server_init(logger);
 
-    // We set a timeout at 100ms to prevent being stuck in recvfrom for too
-    // long and missing our chance to send some stuff.
-    struct timeval tv;
-    tv.tv_sec = 0;
-    tv.tv_usec = 100000;
-    if (setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
-        printf("setsockopt error: %s\n", strerror(errno));
-        return -3;
-    }
+    logger->debug("Opening pipes, waiting for writing process to start");
+    pipe_fd_t sendpipe_fd = open(SEND_PIPE, O_RDONLY);
+    pipe_fd_t recvpipe_fd = open(RECV_PIPE, O_WRONLY);
+    
+    logger->debug("Pipes opened");
 
-    struct sockaddr_in src_addr = {};
-    socklen_t src_addr_len = sizeof(src_addr);
-    bool src_addr_set = false;
+    // Server mainloop, only exits on error
+    udp_server_run(recvpipe_fd, sendpipe_fd);
 
-    while (true) {
-        // For illustration purposes we don't bother with threads or async here
-        // and just interleave receiving and sending.
-        // This only works  if receive_some returns every now and then.
-        receive_some(socket_fd, &src_addr, &src_addr_len, &src_addr_set);
+    logger->debug("Closing sendpipe");
+    close(sendpipe_fd);
 
-        if (src_addr_set) {
-            send_some(socket_fd, &src_addr, src_addr_len);
-            printf("sent some\n");
-        }
-    }
+    logger->debug("Closing recvpipe");
+    close(recvpipe_fd);
 
-    return 0;
+    return -1;
 }
 
-void receive_some(int socket_fd, struct sockaddr_in* src_addr, socklen_t* src_addr_len, bool* src_addr_set)
-{
-    // We just receive one UDP datagram and then return again.
-    char buffer[2048]; // enough for MTU 1500 bytes
-
-    const int ret = recvfrom(
-            socket_fd, buffer, sizeof(buffer), 0, (struct sockaddr*)(src_addr), src_addr_len);
-
-    if (ret < 0) {
-        printf("recvfrom error: %s\n", strerror(errno));
-    } else if (ret == 0) {
-        // peer has done an orderly shutdown
-        return;
-    }
-
-    *src_addr_set = true;
-
-    mavlink_message_t message;
-    mavlink_status_t status;
-    for (int i = 0; i < ret; ++i) {
-        if (mavlink_parse_char(MAVLINK_COMM_0, buffer[i], &message, &status) == 1) {
-
-            // printf(
-            //     "Received message %d from %d/%d\n",
-            //     message.msgid, message.sysid, message.compid);
-
-            switch (message.msgid) {
-            case MAVLINK_MSG_ID_HEARTBEAT:
-                handle_heartbeat(&message);
-                break;
-            default:
-                printf("Received message %d from %d/%d\n", message.msgid, message.sysid, message.compid);
-                break;
-            }
-        }
-    }
-}
-
-void handle_heartbeat(const mavlink_message_t* message)
-{
-    mavlink_heartbeat_t heartbeat;
-    mavlink_msg_heartbeat_decode(message, &heartbeat);
-
-    printf("Got heartbeat from ");
-    switch (heartbeat.autopilot) {
-        case MAV_AUTOPILOT_GENERIC:
-            printf("generic");
-            break;
-        case MAV_AUTOPILOT_ARDUPILOTMEGA:
-            printf("ArduPilot");
-            break;
-        case MAV_AUTOPILOT_PX4:
-            printf("PX4");
-            break;
-        default:
-            printf("other");
-            break;
-    }
-    printf(" autopilot\n");
-}
-
-void send_some(int socket_fd, const struct sockaddr_in* src_addr, socklen_t src_addr_len)
-{
-    // Whenever a second has passed, we send a heartbeat.
-    static time_t last_time = 0;
-    time_t current_time = time(NULL);
-    if (current_time - last_time >= 1) {
-        send_heartbeat(socket_fd, src_addr, src_addr_len);
-        last_time = current_time;
-    }
-}
-
-void send_heartbeat(int socket_fd, const struct sockaddr_in* src_addr, socklen_t src_addr_len)
-{
-    mavlink_message_t message;
-
-    const uint8_t system_id = 42;
-    const uint8_t base_mode = 0;
-    const uint8_t custom_mode = 0;
-    mavlink_msg_heartbeat_pack_chan(
-        system_id,
-        MAV_COMP_ID_PERIPHERAL,
-        MAVLINK_COMM_0,
-        &message,
-        MAV_TYPE_GENERIC,
-        MAV_AUTOPILOT_GENERIC,
-        base_mode,
-        custom_mode,
-        MAV_STATE_STANDBY);
-
-    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
-    const int len = mavlink_msg_to_send_buffer(buffer, &message);
-
-    int ret = sendto(socket_fd, buffer, len, 0, (const struct sockaddr*)src_addr, src_addr_len);
-    if (ret != len) {
-        printf("sendto error: %s\n", strerror(errno));
-    } else {
-        printf("Sent heartbeat\n");
-    }
-}
\ No newline at end of file
diff --git a/processes/udp_server/mavlink_handler.cpp b/processes/udp_server/mavlink_handler.cpp
new file mode 100644 (file)
index 0000000..626d9a5
--- /dev/null
@@ -0,0 +1,27 @@
+#include "mavlink_handler.h"
+
+// Libs
+#include <memory>
+#include <spdlog/spdlog.h>
+
+static std::shared_ptr<spdlog::logger> logger;
+
+void init_mavlink_handler(std::shared_ptr<spdlog::logger> logg)
+{
+    logger = logg;
+}
+
+void handle_heartbeat(const mavlink_message_t* message)
+{
+    mavlink_heartbeat_t heartbeat;
+    mavlink_msg_heartbeat_decode(message, &heartbeat);
+
+    switch (heartbeat.type) {
+        case MAV_TYPE_GCS:
+            logger->debug("Heartbeat from GCS!");
+            break;
+        default:
+            logger->warn("Heartbeat from unknown type: {}", heartbeat.type);
+            break;
+    }
+}
\ No newline at end of file
diff --git a/processes/udp_server/udp_server.cpp b/processes/udp_server/udp_server.cpp
new file mode 100644 (file)
index 0000000..4e1ade5
--- /dev/null
@@ -0,0 +1,188 @@
+#include "udp_server.h"
+
+static std::shared_ptr<spdlog::logger> logger;
+
+static bool recv_thread_running = true;
+static gcs_t gcs = {};
+
+// Declarations
+static void recv_thread(pipe_fd_t sendpipe_fd);
+static void heartbeat_thread();
+static void handle_mavlink(mavlink_message_t& message);
+
+// Definitions
+
+static void recv_thread(pipe_fd_t recvpipe_fd)
+{
+    char buffer[UDP_RECV_BUF_SIZE]; // enough for MTU 1500 bytes
+
+    while (recv_thread_running)
+    {
+        int ret = recvfrom(
+                gcs.socket_fd, buffer, sizeof(buffer), 0, (struct sockaddr*)(&(gcs.addr)), &(gcs.addr_len));
+
+        if (ret < 0) {
+            logger->error("recvfrom error: {}", strerror(errno));
+            break;
+        } else if (ret == 0) {
+            // peer has done an orderly shutdown
+            logger->info("Peer has done an orderly shutdown");
+            break;
+        }
+
+        gcs.addr_set = true;
+
+        // Handle the received MAVLink message here
+        mavlink_message_t message;
+        mavlink_status_t status;
+        for (int i = 0; i < ret; ++i) {
+            if (mavlink_parse_char(MAVLINK_COMM_0, buffer[i], &message, &status) == 1) {
+                handle_mavlink(message);
+            }
+        }
+        logger->debug("Socket -> recvpipe: {} bytes", ret);
+        ret = write(recvpipe_fd, buffer, ret);
+        if (ret < 0) {
+            logger->error("Error while writing message. {}", strerror(errno));
+        }
+    }
+}
+
+static void heartbeat_thread()
+{
+
+    mavlink_message_t message;
+
+    const uint8_t system_id = 42;
+    const uint8_t base_mode = 0;
+    const uint8_t custom_mode = 0;
+
+    mavlink_msg_heartbeat_pack_chan(
+        system_id,
+        MAV_COMP_ID_PERIPHERAL,
+        MAVLINK_COMM_0,
+        &message,
+        MAV_TYPE_GENERIC,
+        MAV_AUTOPILOT_GENERIC,
+        base_mode,
+        custom_mode,
+        MAV_STATE_STANDBY);
+
+    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
+    const int len = mavlink_msg_to_send_buffer(buffer, &message);
+
+    while (true) {
+
+        int ret = sendto(gcs.socket_fd, buffer, len, 0, (struct sockaddr*)(&(gcs.addr)), gcs.addr_len);
+        if (ret != len) {
+            logger->error("Heartbeat sendto error: {}", strerror(errno));
+        } else {
+            logger->debug("Sent Heartbeat!");
+        }
+
+        THREAD_SLEEP(1s);
+    }
+}
+
+static void handle_mavlink(mavlink_message_t& message)
+{
+    switch (message.msgid) {
+    case MAVLINK_MSG_ID_HEARTBEAT:
+        handle_heartbeat(&message);
+        break;
+    default:
+        logger->debug("Received message {} from {}/{}", (int) message.msgid, message.sysid, message.compid);
+        break;
+    }
+}
+
+err_t udp_server_init(std::shared_ptr<spdlog::logger> logg)
+{
+    logger = logg;
+
+    init_mavlink_handler(logger);
+
+    socket_fd_t socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
+
+    if (socket_fd < 0) {
+        logger->error("socket error: {}", strerror(errno));
+        return ERR;
+    }
+
+    // Bind to port
+    struct sockaddr_in addr = {};
+    memset(&addr, 0, sizeof(addr));
+    addr.sin_family = AF_INET;
+    inet_pton(AF_INET, "0.0.0.0", &(addr.sin_addr)); // listen on all network interfaces
+    addr.sin_port = htons(UDP_PORT); // default port on the ground
+
+    if (bind(socket_fd, (struct sockaddr*)(&addr), sizeof(addr)) != 0) {
+        logger->error("bind error: {}", strerror(errno));
+        return ERR;
+    }
+
+    logger->info("UDP server started!", UDP_PORT);
+
+    // Set up GCS
+    gcs = {
+        .socket_fd = socket_fd,
+        .addr = addr, 
+        .addr_len = sizeof(addr),
+        .addr_set = false
+    };
+
+    return OK;
+}
+
+err_t udp_server_run(pipe_fd_t recvpipe_fd, pipe_fd_t sendpipe_fd)
+{
+    std::thread r_thread(recv_thread, recvpipe_fd);
+
+    while (!(gcs.addr_set)) {
+        logger->info("Waiting for GCS to connect...");
+        THREAD_SLEEP(1s);
+    }
+    
+    logger->info("Connected to GCS!");
+
+    std::thread h_thread(heartbeat_thread);
+
+    int ret = 0;
+    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
+    mavlink_message_t message;
+    mavlink_status_t status;
+
+    while (true) {
+        ret = read(sendpipe_fd, &buffer, sizeof(buffer));
+
+        if (ret == 0) {
+            logger->error("No writing process on sendpipe. Exiting.");
+            break;
+        }
+        else if (ret < 0)
+            logger->error("Error while reading message. {}", strerror(errno));
+        else {
+            /*
+            for (int i = 0; i < ret; ++i) {
+                if (mavlink_parse_char(MAVLINK_COMM_0, buffer[i], &message, &status) == 1) {
+                    logger->debug("sendpipe -> socket: mavlink message id={},", (int) message.msgid);
+                }
+            }
+            */
+            logger->debug("sendpipe -> socket: {} bytes", ret);
+            ret = sendto(gcs.socket_fd, buffer, ret, 0, (struct sockaddr*)(&(gcs.addr)), gcs.addr_len);
+            if (ret < 0) {
+                logger->error("Sending mavlink message to GCS sendto error {}", strerror(errno));
+            }
+
+        }
+    }
+
+    logger->debug("Closing recv_thread");
+    recv_thread_running = false;
+
+    logger->debug("Closing socket");
+    close(gcs.socket_fd);
+
+    return ERR;
+}
\ No newline at end of file
diff --git a/run.sh b/run.sh
index fee9765f862a861d3326b4a7a84fe398a70f07e8..87b94da3aea06b418c5b15b8cb87d521526c2018 100755 (executable)
--- a/run.sh
+++ b/run.sh
@@ -1,4 +1,20 @@
 #!/usr/bin/bash
 
 cd $(dirname "$0")/bin
-./udp_server
\ No newline at end of file
+
+# create pipes for inter processes communication
+./inter_proc
+
+# start udp server
+./udp_server &
+
+# start controller
+./controller &
+
+# start click reader
+./click_reader
+
+echo "Closing all processes"
+
+trap 'kill $(jobs -p)' EXIT
+