From: Nils Forssén Date: Sat, 15 Jun 2024 22:00:23 +0000 (+0200) Subject: full duplex communication working X-Git-Url: https://gitweb.forssennils.se/?a=commitdiff_plain;h=0bdd8d7a6bc877ccf8b4d6c520cbf0c065f67886;p=flygplan.git full duplex communication working --- diff --git a/build.sh b/build.sh index 823eb89..ac8ad99 100755 --- a/build.sh +++ b/build.sh @@ -2,4 +2,5 @@ cd $(dirname "$0")/build cmake -G Ninja .. + ninja \ No newline at end of file diff --git a/processes/CMakeLists.txt b/processes/CMakeLists.txt index 3e7fc10..2da4dd5 100644 --- a/processes/CMakeLists.txt +++ b/processes/CMakeLists.txt @@ -1,2 +1,4 @@ add_subdirectory(udp_server) -add_subdirectory(click_reader) \ No newline at end of file +add_subdirectory(click_reader) +add_subdirectory(inter_proc) +add_subdirectory(controller) \ No newline at end of file diff --git a/processes/click_reader/CMakeLists.txt b/processes/click_reader/CMakeLists.txt index 7ba0848..3ab575d 100644 --- a/processes/click_reader/CMakeLists.txt +++ b/processes/click_reader/CMakeLists.txt @@ -4,4 +4,7 @@ add_executable(click_reader target_include_directories(click_reader PRIVATE include) -target_link_libraries(click_reader PRIVATE MAVLink) \ No newline at end of file +target_link_libraries(click_reader PRIVATE MAVLink) +target_link_libraries(click_reader PRIVATE spdlog::spdlog) + +target_include_directories(click_reader PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include) diff --git a/processes/click_reader/main.cpp b/processes/click_reader/main.cpp index 17edc53..5560363 100644 --- a/processes/click_reader/main.cpp +++ b/processes/click_reader/main.cpp @@ -1,7 +1,62 @@ -#include +// Standard +#include +#include + +// Pipes +#include +#include +#include +#include + +// Libs +#include +#include +#include + +// Local +#include "pipe_def.h" +#include "util.h" + + +static std::shared_ptr logger; int main(int argc, char* argv[]) { - printf("Hello, World!\n"); + logger = spdlog::stdout_color_mt("click_reader"); + logger->set_level(CLICK_LOGLEVEL); + + logger->debug("Opening sendpipe for writing"); + pipe_fd_t sendpipe_fd = open(SEND_PIPE, O_WRONLY); + + mavlink_message_t message; + const uint8_t system_id = 42; + const uint8_t base_mode = 0; + const uint8_t custom_mode = 0; + mavlink_msg_global_position_int_pack_chan( + system_id, + MAV_COMP_ID_PERIPHERAL, + MAVLINK_COMM_0, + &message, + 1111111, + 2222222, + 3333333, + 4444444, + 5555555, + 6666666, + 7777777, + 8888888, + 9999999); + + uint8_t buffer[MAVLINK_MAX_PACKET_LEN]; + const int len = mavlink_msg_to_send_buffer(buffer, &message); + + while(true) + { + write(sendpipe_fd, buffer, len); + THREAD_SLEEP(3s); + } + + logger->error("here"); + close(sendpipe_fd); return 0; } \ No newline at end of file diff --git a/processes/controller/CMakeLists.txt b/processes/controller/CMakeLists.txt new file mode 100755 index 0000000..1bd5bd9 --- /dev/null +++ b/processes/controller/CMakeLists.txt @@ -0,0 +1,10 @@ +add_executable(controller + main.cpp +) + +target_include_directories(controller PRIVATE include) + +target_link_libraries(controller PRIVATE MAVLink) +target_link_libraries(controller PRIVATE spdlog::spdlog) + +target_include_directories(controller PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include) diff --git a/processes/controller/main.cpp b/processes/controller/main.cpp new file mode 100644 index 0000000..24d5e5d --- /dev/null +++ b/processes/controller/main.cpp @@ -0,0 +1,53 @@ +// Standard +#include +#include + +// Pipes +#include +#include +#include +#include + +// Libs +#include +#include +#include + +// Local +#include "pipe_def.h" +#include "util.h" + +static std::shared_ptr logger; + +int main(int argc, char* argv[]) +{ + logger = spdlog::stdout_color_mt("controller"); + logger->set_level(CONTROLLER_LOGLEVEL); +/* + logger->debug("Opening sendpipe for writing"); + pipe_fd_t sendpipe_fd = open(SEND_PIPE, O_WRONLY); +*/ + logger->debug("Opening recvpipe for reading"); + pipe_fd_t recvpipe_fd = open(RECV_PIPE, O_RDONLY); + + uint8_t buffer[MAVLINK_MAX_PACKET_LEN]; + while(true) + { + int ret = read(recvpipe_fd, buffer, MAVLINK_MAX_PACKET_LEN); + + if (ret == 0) { + logger->error("No writing process on recvpipe. Exiting."); + break; + } + else if (ret < 0) + logger->error("Error while reading message. {}", strerror(errno)); + else { + // handle mavlink message + } + } + + logger->error("here"); + //close(sendpipe_fd); + close(recvpipe_fd); + return 0; +} \ No newline at end of file diff --git a/processes/include/pipe_def.h b/processes/include/pipe_def.h new file mode 100644 index 0000000..23cca7b --- /dev/null +++ b/processes/include/pipe_def.h @@ -0,0 +1,13 @@ +#ifndef PIPE_DEF_H +#define PIPE_DEF_H + +#define UDP_LOGLEVEL spdlog::level::debug +#define CLICK_LOGLEVEL spdlog::level::debug +#define CONTROLLER_LOGLEVEL spdlog::level::debug + +#define RECV_PIPE "/var/lock/recv_pipe" +#define SEND_PIPE "/var/lock/send_pipe" + +typedef int pipe_fd_t; + +#endif \ No newline at end of file diff --git a/processes/include/util.h b/processes/include/util.h new file mode 100644 index 0000000..c9c85f3 --- /dev/null +++ b/processes/include/util.h @@ -0,0 +1,15 @@ +#ifndef UTIL_H +#define UTIL_H + +#include +#include + +#define THREAD_SLEEP(time) std::this_thread::sleep_for(time) +using namespace std::chrono_literals; + +typedef enum { + ERR = -1, + OK = 0 +} err_t; + +#endif \ No newline at end of file diff --git a/processes/inter_proc/CMakeLists.txt b/processes/inter_proc/CMakeLists.txt new file mode 100755 index 0000000..6e9965d --- /dev/null +++ b/processes/inter_proc/CMakeLists.txt @@ -0,0 +1,11 @@ +add_executable(inter_proc + main.cpp +) + +target_include_directories(inter_proc PRIVATE include) + +target_link_libraries(inter_proc PRIVATE spdlog::spdlog) + +target_include_directories(inter_proc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include) + + diff --git a/processes/inter_proc/main.cpp b/processes/inter_proc/main.cpp new file mode 100644 index 0000000..a81bdfb --- /dev/null +++ b/processes/inter_proc/main.cpp @@ -0,0 +1,32 @@ +#include + +#include +#include + +#include +#include +#include + + +#include "pipe_def.h" + +static std::shared_ptr logger; + +int main(int argc, char* argv[]) +{ + logger = spdlog::stdout_color_mt("inter_proc"); + + unlink(RECV_PIPE); + unlink(SEND_PIPE); + + if (mkfifo(RECV_PIPE, 0666) != 0) { + logger->warn("mkfifo warning (RECV_PIPE): {}", strerror(errno)); + } + + if (mkfifo(SEND_PIPE, 0666) != 0) { + logger->warn("mkfifo warning (SEND_PIPE): {}", strerror(errno)); + } + + logger->info("Pipes created"); + return 0; +} diff --git a/processes/udp_server/CMakeLists.txt b/processes/udp_server/CMakeLists.txt index 070e2cb..6060c8b 100755 --- a/processes/udp_server/CMakeLists.txt +++ b/processes/udp_server/CMakeLists.txt @@ -1,8 +1,12 @@ -add_executable(upd_server +add_executable(udp_server main.cpp + udp_server.cpp + mavlink_handler.cpp ) -target_include_directories(upd_server PRIVATE include) +target_include_directories(udp_server PRIVATE include) -target_link_libraries(upd_server PRIVATE MAVLink) -target_link_libraries(upd_server PRIVATE spdlog::spdlog) +target_link_libraries(udp_server PRIVATE MAVLink) +target_link_libraries(udp_server PRIVATE spdlog::spdlog) + +target_include_directories(udp_server PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include) diff --git a/processes/udp_server/include/mavlink_handler.h b/processes/udp_server/include/mavlink_handler.h new file mode 100644 index 0000000..7f73f3c --- /dev/null +++ b/processes/udp_server/include/mavlink_handler.h @@ -0,0 +1,15 @@ +#ifndef MAVLINK_HANDLER_H +#define MAVLINK_HANDLER_H + +// Standard +#include + +// Libs +#include +#include + +void init_mavlink_handler(std::shared_ptr logg); + +void handle_heartbeat(const mavlink_message_t* message); + +#endif \ No newline at end of file diff --git a/processes/udp_server/include/udp_server.h b/processes/udp_server/include/udp_server.h new file mode 100644 index 0000000..65830ee --- /dev/null +++ b/processes/udp_server/include/udp_server.h @@ -0,0 +1,42 @@ +#ifndef UDP_SERVER_H +#define UDP_SERVER_H + +// Standard +#include +#include + +// Socket +#include +#include +#include + +// Pipes +#include + +// Libs +#include +#include + +// Local +#include "util.h" +#include "mavlink_handler.h" +#include "pipe_def.h" +#include "mavlink_handler.h" + +#define UDP_PORT 14552 +#define UDP_RECV_BUF_SIZE 2048 // enough for MTU 1500 bytes + +typedef int socket_fd_t; + +typedef struct { + socket_fd_t socket_fd; + struct sockaddr_in addr; + socklen_t addr_len; + bool addr_set; +} gcs_t; + +err_t udp_server_init(std::shared_ptr logg); + +err_t udp_server_run(pipe_fd_t recvpipe_fd, pipe_fd_t sendpipe_fd); + +#endif \ No newline at end of file diff --git a/processes/udp_server/main.cpp b/processes/udp_server/main.cpp index 032e04c..8f39fb4 100755 --- a/processes/udp_server/main.cpp +++ b/processes/udp_server/main.cpp @@ -1,177 +1,51 @@ -// Simple example receiving and sending MAVLink v2 over UDP -// based on POSIX APIs (e.g. Linux, BSD, macOS). +// Standard +#include +#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include +// Pipes +#include +#include +// Libs #include - +#include #include +// Local +#include "pipe_def.h" +#include "util.h" +#include "udp_server.h" -void receive_some(int socket_fd, struct sockaddr_in* src_addr, socklen_t* src_addr_len, bool* src_addr_set); -void handle_heartbeat(const mavlink_message_t* message); - -void send_some(int socket_fd, const struct sockaddr_in* src_addr, socklen_t src_addr_len); -void send_heartbeat(int socket_fd, const struct sockaddr_in* src_addr, socklen_t src_addr_len); +static std::shared_ptr logger; +void receive_thread(gcs_t* src); +void handle_heartbeat(const mavlink_message_t* message); +void send_some(gcs_t* src); +void send_heartbeat(gcs_t* src); +// Main int main(int argc, char* argv[]) { - // Open UDP socket - const int socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - - if (socket_fd < 0) { - printf("socket error: %s\n", strerror(errno)); - return -1; - } - - // Bind to port - struct sockaddr_in addr = {}; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - inet_pton(AF_INET, "0.0.0.0", &(addr.sin_addr)); // listen on all network interfaces - addr.sin_port = htons(14552); // default port on the ground + logger = spdlog::stdout_color_mt("udp_server"); + logger->set_level(UDP_LOGLEVEL); - if (bind(socket_fd, (struct sockaddr*)(&addr), sizeof(addr)) != 0) { - printf("bind error: %s\n", strerror(errno)); - return -2; - } + udp_server_init(logger); - // We set a timeout at 100ms to prevent being stuck in recvfrom for too - // long and missing our chance to send some stuff. - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 100000; - if (setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { - printf("setsockopt error: %s\n", strerror(errno)); - return -3; - } + logger->debug("Opening pipes, waiting for writing process to start"); + pipe_fd_t sendpipe_fd = open(SEND_PIPE, O_RDONLY); + pipe_fd_t recvpipe_fd = open(RECV_PIPE, O_WRONLY); + + logger->debug("Pipes opened"); - struct sockaddr_in src_addr = {}; - socklen_t src_addr_len = sizeof(src_addr); - bool src_addr_set = false; + // Server mainloop, only exits on error + udp_server_run(recvpipe_fd, sendpipe_fd); - while (true) { - // For illustration purposes we don't bother with threads or async here - // and just interleave receiving and sending. - // This only works if receive_some returns every now and then. - receive_some(socket_fd, &src_addr, &src_addr_len, &src_addr_set); + logger->debug("Closing sendpipe"); + close(sendpipe_fd); - if (src_addr_set) { - send_some(socket_fd, &src_addr, src_addr_len); - printf("sent some\n"); - } - } + logger->debug("Closing recvpipe"); + close(recvpipe_fd); - return 0; + return -1; } -void receive_some(int socket_fd, struct sockaddr_in* src_addr, socklen_t* src_addr_len, bool* src_addr_set) -{ - // We just receive one UDP datagram and then return again. - char buffer[2048]; // enough for MTU 1500 bytes - - const int ret = recvfrom( - socket_fd, buffer, sizeof(buffer), 0, (struct sockaddr*)(src_addr), src_addr_len); - - if (ret < 0) { - printf("recvfrom error: %s\n", strerror(errno)); - } else if (ret == 0) { - // peer has done an orderly shutdown - return; - } - - *src_addr_set = true; - - mavlink_message_t message; - mavlink_status_t status; - for (int i = 0; i < ret; ++i) { - if (mavlink_parse_char(MAVLINK_COMM_0, buffer[i], &message, &status) == 1) { - - // printf( - // "Received message %d from %d/%d\n", - // message.msgid, message.sysid, message.compid); - - switch (message.msgid) { - case MAVLINK_MSG_ID_HEARTBEAT: - handle_heartbeat(&message); - break; - default: - printf("Received message %d from %d/%d\n", message.msgid, message.sysid, message.compid); - break; - } - } - } -} - -void handle_heartbeat(const mavlink_message_t* message) -{ - mavlink_heartbeat_t heartbeat; - mavlink_msg_heartbeat_decode(message, &heartbeat); - - printf("Got heartbeat from "); - switch (heartbeat.autopilot) { - case MAV_AUTOPILOT_GENERIC: - printf("generic"); - break; - case MAV_AUTOPILOT_ARDUPILOTMEGA: - printf("ArduPilot"); - break; - case MAV_AUTOPILOT_PX4: - printf("PX4"); - break; - default: - printf("other"); - break; - } - printf(" autopilot\n"); -} - -void send_some(int socket_fd, const struct sockaddr_in* src_addr, socklen_t src_addr_len) -{ - // Whenever a second has passed, we send a heartbeat. - static time_t last_time = 0; - time_t current_time = time(NULL); - if (current_time - last_time >= 1) { - send_heartbeat(socket_fd, src_addr, src_addr_len); - last_time = current_time; - } -} - -void send_heartbeat(int socket_fd, const struct sockaddr_in* src_addr, socklen_t src_addr_len) -{ - mavlink_message_t message; - - const uint8_t system_id = 42; - const uint8_t base_mode = 0; - const uint8_t custom_mode = 0; - mavlink_msg_heartbeat_pack_chan( - system_id, - MAV_COMP_ID_PERIPHERAL, - MAVLINK_COMM_0, - &message, - MAV_TYPE_GENERIC, - MAV_AUTOPILOT_GENERIC, - base_mode, - custom_mode, - MAV_STATE_STANDBY); - - uint8_t buffer[MAVLINK_MAX_PACKET_LEN]; - const int len = mavlink_msg_to_send_buffer(buffer, &message); - - int ret = sendto(socket_fd, buffer, len, 0, (const struct sockaddr*)src_addr, src_addr_len); - if (ret != len) { - printf("sendto error: %s\n", strerror(errno)); - } else { - printf("Sent heartbeat\n"); - } -} \ No newline at end of file diff --git a/processes/udp_server/mavlink_handler.cpp b/processes/udp_server/mavlink_handler.cpp new file mode 100644 index 0000000..626d9a5 --- /dev/null +++ b/processes/udp_server/mavlink_handler.cpp @@ -0,0 +1,27 @@ +#include "mavlink_handler.h" + +// Libs +#include +#include + +static std::shared_ptr logger; + +void init_mavlink_handler(std::shared_ptr logg) +{ + logger = logg; +} + +void handle_heartbeat(const mavlink_message_t* message) +{ + mavlink_heartbeat_t heartbeat; + mavlink_msg_heartbeat_decode(message, &heartbeat); + + switch (heartbeat.type) { + case MAV_TYPE_GCS: + logger->debug("Heartbeat from GCS!"); + break; + default: + logger->warn("Heartbeat from unknown type: {}", heartbeat.type); + break; + } +} \ No newline at end of file diff --git a/processes/udp_server/udp_server.cpp b/processes/udp_server/udp_server.cpp new file mode 100644 index 0000000..4e1ade5 --- /dev/null +++ b/processes/udp_server/udp_server.cpp @@ -0,0 +1,188 @@ +#include "udp_server.h" + +static std::shared_ptr logger; + +static bool recv_thread_running = true; +static gcs_t gcs = {}; + +// Declarations +static void recv_thread(pipe_fd_t sendpipe_fd); +static void heartbeat_thread(); +static void handle_mavlink(mavlink_message_t& message); + +// Definitions + +static void recv_thread(pipe_fd_t recvpipe_fd) +{ + char buffer[UDP_RECV_BUF_SIZE]; // enough for MTU 1500 bytes + + while (recv_thread_running) + { + int ret = recvfrom( + gcs.socket_fd, buffer, sizeof(buffer), 0, (struct sockaddr*)(&(gcs.addr)), &(gcs.addr_len)); + + if (ret < 0) { + logger->error("recvfrom error: {}", strerror(errno)); + break; + } else if (ret == 0) { + // peer has done an orderly shutdown + logger->info("Peer has done an orderly shutdown"); + break; + } + + gcs.addr_set = true; + + // Handle the received MAVLink message here + mavlink_message_t message; + mavlink_status_t status; + for (int i = 0; i < ret; ++i) { + if (mavlink_parse_char(MAVLINK_COMM_0, buffer[i], &message, &status) == 1) { + handle_mavlink(message); + } + } + logger->debug("Socket -> recvpipe: {} bytes", ret); + ret = write(recvpipe_fd, buffer, ret); + if (ret < 0) { + logger->error("Error while writing message. {}", strerror(errno)); + } + } +} + +static void heartbeat_thread() +{ + + mavlink_message_t message; + + const uint8_t system_id = 42; + const uint8_t base_mode = 0; + const uint8_t custom_mode = 0; + + mavlink_msg_heartbeat_pack_chan( + system_id, + MAV_COMP_ID_PERIPHERAL, + MAVLINK_COMM_0, + &message, + MAV_TYPE_GENERIC, + MAV_AUTOPILOT_GENERIC, + base_mode, + custom_mode, + MAV_STATE_STANDBY); + + uint8_t buffer[MAVLINK_MAX_PACKET_LEN]; + const int len = mavlink_msg_to_send_buffer(buffer, &message); + + while (true) { + + int ret = sendto(gcs.socket_fd, buffer, len, 0, (struct sockaddr*)(&(gcs.addr)), gcs.addr_len); + if (ret != len) { + logger->error("Heartbeat sendto error: {}", strerror(errno)); + } else { + logger->debug("Sent Heartbeat!"); + } + + THREAD_SLEEP(1s); + } +} + +static void handle_mavlink(mavlink_message_t& message) +{ + switch (message.msgid) { + case MAVLINK_MSG_ID_HEARTBEAT: + handle_heartbeat(&message); + break; + default: + logger->debug("Received message {} from {}/{}", (int) message.msgid, message.sysid, message.compid); + break; + } +} + +err_t udp_server_init(std::shared_ptr logg) +{ + logger = logg; + + init_mavlink_handler(logger); + + socket_fd_t socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + + if (socket_fd < 0) { + logger->error("socket error: {}", strerror(errno)); + return ERR; + } + + // Bind to port + struct sockaddr_in addr = {}; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + inet_pton(AF_INET, "0.0.0.0", &(addr.sin_addr)); // listen on all network interfaces + addr.sin_port = htons(UDP_PORT); // default port on the ground + + if (bind(socket_fd, (struct sockaddr*)(&addr), sizeof(addr)) != 0) { + logger->error("bind error: {}", strerror(errno)); + return ERR; + } + + logger->info("UDP server started!", UDP_PORT); + + // Set up GCS + gcs = { + .socket_fd = socket_fd, + .addr = addr, + .addr_len = sizeof(addr), + .addr_set = false + }; + + return OK; +} + +err_t udp_server_run(pipe_fd_t recvpipe_fd, pipe_fd_t sendpipe_fd) +{ + std::thread r_thread(recv_thread, recvpipe_fd); + + while (!(gcs.addr_set)) { + logger->info("Waiting for GCS to connect..."); + THREAD_SLEEP(1s); + } + + logger->info("Connected to GCS!"); + + std::thread h_thread(heartbeat_thread); + + int ret = 0; + uint8_t buffer[MAVLINK_MAX_PACKET_LEN]; + mavlink_message_t message; + mavlink_status_t status; + + while (true) { + ret = read(sendpipe_fd, &buffer, sizeof(buffer)); + + if (ret == 0) { + logger->error("No writing process on sendpipe. Exiting."); + break; + } + else if (ret < 0) + logger->error("Error while reading message. {}", strerror(errno)); + else { + /* + for (int i = 0; i < ret; ++i) { + if (mavlink_parse_char(MAVLINK_COMM_0, buffer[i], &message, &status) == 1) { + logger->debug("sendpipe -> socket: mavlink message id={},", (int) message.msgid); + } + } + */ + logger->debug("sendpipe -> socket: {} bytes", ret); + ret = sendto(gcs.socket_fd, buffer, ret, 0, (struct sockaddr*)(&(gcs.addr)), gcs.addr_len); + if (ret < 0) { + logger->error("Sending mavlink message to GCS sendto error {}", strerror(errno)); + } + + } + } + + logger->debug("Closing recv_thread"); + recv_thread_running = false; + + logger->debug("Closing socket"); + close(gcs.socket_fd); + + return ERR; +} \ No newline at end of file diff --git a/run.sh b/run.sh index fee9765..87b94da 100755 --- a/run.sh +++ b/run.sh @@ -1,4 +1,20 @@ #!/usr/bin/bash cd $(dirname "$0")/bin -./udp_server \ No newline at end of file + +# create pipes for inter processes communication +./inter_proc + +# start udp server +./udp_server & + +# start controller +./controller & + +# start click reader +./click_reader + +echo "Closing all processes" + +trap 'kill $(jobs -p)' EXIT +