From 2d86d3535f5c651418ed6c66e6bebe55a22c8d5e Mon Sep 17 00:00:00 2001 From: Daan Steenbergen Date: Sat, 17 Feb 2024 14:26:27 +0100 Subject: [PATCH] wip --- .../nmea2000/fast_packet_protocol/main.cpp | 6 +- isobus/CMakeLists.txt | 2 - .../isobus/isobus/can_network_manager.hpp | 8 +- isobus/include/isobus/isobus/can_protocol.hpp | 95 --- .../isobus/isobus/can_transport_protocol.hpp | 2 +- .../isobus/isobus_diagnostic_protocol.hpp | 1 - .../isobus/isobus/isobus_functionalities.hpp | 1 - .../isobus_shortcut_button_interface.hpp | 1 - .../isobus/nmea2000_fast_packet_protocol.hpp | 269 +++---- isobus/src/can_network_manager.cpp | 46 +- isobus/src/can_protocol.cpp | 60 -- isobus/src/can_transport_protocol.cpp | 1 - isobus/src/nmea2000_fast_packet_protocol.cpp | 708 ++++++++---------- 13 files changed, 470 insertions(+), 730 deletions(-) delete mode 100644 isobus/include/isobus/isobus/can_protocol.hpp delete mode 100644 isobus/src/can_protocol.cpp diff --git a/examples/nmea2000/fast_packet_protocol/main.cpp b/examples/nmea2000/fast_packet_protocol/main.cpp index 1aab94c6f..d11fa43e1 100644 --- a/examples/nmea2000/fast_packet_protocol/main.cpp +++ b/examples/nmea2000/fast_packet_protocol/main.cpp @@ -88,7 +88,7 @@ int main() auto TestInternalECU = isobus::InternalControlFunction::create(TestDeviceNAME, 0x1C, 0); - isobus::CANNetworkManager::CANNetwork.get_fast_packet_protocol().register_multipacket_message_callback(0x1F001, nmea2k_callback, nullptr); + isobus::CANNetworkManager::CANNetwork.get_fast_packet_protocol(0).register_multipacket_message_callback(0x1F001, nmea2k_callback, nullptr); // Wait to make sure address claiming is done. The time is arbitrary. //! @todo Check this instead of assuming it is done @@ -106,13 +106,13 @@ int main() while (running) { // Send a fast packet message - isobus::CANNetworkManager::CANNetwork.get_fast_packet_protocol().send_multipacket_message(0x1F001, testMessageData, TEST_MESSAGE_LENGTH, TestInternalECU, nullptr, isobus::CANIdentifier::CANPriority::PriorityLowest7, nmea2k_transmit_complete_callback); + isobus::CANNetworkManager::CANNetwork.get_fast_packet_protocol(0).send_multipacket_message(0x1F001, testMessageData, TEST_MESSAGE_LENGTH, TestInternalECU, nullptr, isobus::CANIdentifier::CANPriority::PriorityLowest7, nmea2k_transmit_complete_callback); // Sleep for a while std::this_thread::sleep_for(std::chrono::milliseconds(2000)); } isobus::CANHardwareInterface::stop(); - isobus::CANNetworkManager::CANNetwork.get_fast_packet_protocol().remove_multipacket_message_callback(0x1F001, nmea2k_callback, nullptr); + isobus::CANNetworkManager::CANNetwork.get_fast_packet_protocol(0).remove_multipacket_message_callback(0x1F001, nmea2k_callback, nullptr); return 0; } diff --git a/isobus/CMakeLists.txt b/isobus/CMakeLists.txt index 0954a6871..9552a7722 100644 --- a/isobus/CMakeLists.txt +++ b/isobus/CMakeLists.txt @@ -10,7 +10,6 @@ set(ISOBUS_INCLUDE_DIR "include/isobus/isobus") # Set source files set(ISOBUS_SRC "can_NAME.cpp" - "can_protocol.cpp" "can_identifier.cpp" "can_control_function.cpp" "can_message.cpp" @@ -53,7 +52,6 @@ prepend(ISOBUS_SRC ${ISOBUS_SRC_DIR} ${ISOBUS_SRC}) # Set the include files set(ISOBUS_INCLUDE "can_NAME.hpp" - "can_protocol.hpp" "can_badge.hpp" "can_identifier.hpp" "can_control_function.hpp" diff --git a/isobus/include/isobus/isobus/can_network_manager.hpp b/isobus/include/isobus/isobus/can_network_manager.hpp index abf797b1b..97ebc267d 100644 --- a/isobus/include/isobus/isobus/can_network_manager.hpp +++ b/isobus/include/isobus/isobus/can_network_manager.hpp @@ -187,8 +187,9 @@ namespace isobus /// @brief Returns the class instance of the NMEA2k fast packet protocol. /// Use this to register for FP multipacket messages + /// @param[in] canPortIndex The CAN channel index to get the fast packet protocol for /// @returns The class instance of the NMEA2k fast packet protocol. - FastPacketProtocol &get_fast_packet_protocol(); + std::unique_ptr &get_fast_packet_protocol(std::uint8_t canPortIndex); /// @brief Returns the configuration of this network manager /// @returns The configuration class for this network manager @@ -207,7 +208,6 @@ namespace isobus friend class DiagnosticProtocol; ///< Allows the diagnostic protocol to access the protected functions on the network manager friend class ParameterGroupNumberRequestProtocol; ///< Allows the PGN request protocol to access the network manager protected functions friend class FastPacketProtocol; ///< Allows the FP protocol to access the network manager protected functions - friend class CANLibProtocol; ///< Allows the CANLib protocol base class functions to access the network manager protected functions /// @brief Adds a PGN callback for a protocol class /// @param[in] parameterGroupNumber The PGN to register for @@ -245,8 +245,6 @@ namespace isobus /// @param[in] message The completed protocol message void protocol_message_callback(const CANMessage &message); - std::vector protocolList; ///< A list of all created protocol classes - private: /// @brief Constructor for the network manager. Sets default values for members CANNetworkManager(); @@ -380,7 +378,7 @@ namespace isobus CANNetworkConfiguration configuration; ///< The configuration for this network manager std::array, CAN_PORT_MAXIMUM> transportProtocols; ///< One instance of the transport protocol manager for each channel std::array, CAN_PORT_MAXIMUM> extendedTransportProtocols; ///< One instance of the extended transport protocol manager for each channel - FastPacketProtocol fastPacketProtocol; ///< Instance of the fast packet protocol + std::array, CAN_PORT_MAXIMUM> fastPacketProtocol; ///< One instance of the fast packet protocol for each channel std::array, CAN_PORT_MAXIMUM> busloadMessageBitsHistory; ///< Stores the approximate number of bits processed on each channel over multiple previous time windows std::array currentBusloadBitAccumulator; ///< Accumulates the approximate number of bits processed on each channel during the current time window diff --git a/isobus/include/isobus/isobus/can_protocol.hpp b/isobus/include/isobus/isobus/can_protocol.hpp deleted file mode 100644 index 7742e1f51..000000000 --- a/isobus/include/isobus/isobus/can_protocol.hpp +++ /dev/null @@ -1,95 +0,0 @@ -//================================================================================================ -/// @file can_protocol.hpp -/// -/// @brief A base class for all protocol classes. Allows the network manager to update them -/// in a generic, dynamic way. -/// @author Adrian Del Grosso -/// -/// @copyright 2022 Adrian Del Grosso -//================================================================================================ - -#ifndef CAN_PROTOCOL_HPP -#define CAN_PROTOCOL_HPP - -#include "isobus/isobus/can_badge.hpp" -#include "isobus/isobus/can_callbacks.hpp" -#include "isobus/isobus/can_control_function.hpp" -#include "isobus/isobus/can_message.hpp" - -#include - -namespace isobus -{ - class CANNetworkManager; - - //================================================================================================ - /// @class CANLibProtocol - /// - /// @brief A base class for a CAN protocol - /// @details CANLibProtocols are objects that manage different statful CAN protocols defined by - /// ISO11783 and/or J1939. They could also be used for abitrary processing inside the CAN stack. - //================================================================================================ - class CANLibProtocol - { - public: - /// @brief The base class constructor for a CANLibProtocol - CANLibProtocol(); - - /// @brief Deleted copy constructor for a CANLibProtocol - CANLibProtocol(CANLibProtocol &) = delete; - - /// @brief The base class destructor for a CANLibProtocol - virtual ~CANLibProtocol(); - - /// @brief Returns whether or not the protocol has been initialized by the network manager - /// @returns true if the protocol has been initialized by the network manager - bool get_is_initialized() const; - - /// @brief Gets a CAN protocol by index from the list of all protocols - /// @param[in] index The index of the protocol to get from the list of protocols - /// @param[out] returnedProtocol The returned protocol - /// @returns true if a protocol was successfully returned, false if index was out of range - static bool get_protocol(std::uint32_t index, CANLibProtocol *&returnedProtocol); - - /// @brief Returns the number of all created protocols - /// @returns The number of all created protocols - static std::uint32_t get_number_protocols(); - - /// @brief A generic way to initialize a protocol - /// @details The network manager will call a protocol's initialize function - /// when it is first updated, if it has yet to be initialized. - virtual void initialize(CANLibBadge); - - /// @brief A generic way for a protocol to process a received message - /// @param[in] message A received CAN message - virtual void process_message(const CANMessage &message) = 0; - - /// @brief The network manager calls this to see if the protocol can accept a non-raw CAN message for processing - /// @param[in] parameterGroupNumber The PGN of the message - /// @param[in] data The data to be sent - /// @param[in] messageLength The length of the data to be sent - /// @param[in] source The source control function - /// @param[in] destination The destination control function - /// @param[in] transmitCompleteCallback A callback for when the protocol completes its work - /// @param[in] parentPointer A generic context object for the tx complete and chunk callbacks - /// @param[in] frameChunkCallback A callback to get some data to send - /// @returns true if the message was accepted by the protocol for processing - virtual bool protocol_transmit_message(std::uint32_t parameterGroupNumber, - const std::uint8_t *data, - std::uint32_t messageLength, - std::shared_ptr source, - std::shared_ptr destination, - TransmitCompleteCallback transmitCompleteCallback, - void *parentPointer, - DataChunkCallback frameChunkCallback) = 0; - - /// @brief This will be called by the network manager on every cyclic update of the stack - virtual void update(CANLibBadge) = 0; - - protected: - bool initialized; ///< Keeps track of if the protocol has been initialized by the network manager - }; - -} // namespace isobus - -#endif // CAN_PROTOCOL_HPP diff --git a/isobus/include/isobus/isobus/can_transport_protocol.hpp b/isobus/include/isobus/isobus/can_transport_protocol.hpp index f07c8d628..ca6152b9b 100644 --- a/isobus/include/isobus/isobus/can_transport_protocol.hpp +++ b/isobus/include/isobus/isobus/can_transport_protocol.hpp @@ -304,7 +304,7 @@ namespace isobus /// @param[in] message The CAN message to be processed. void process_data_transfer_message(const CANMessage &message); - /// @brief Gets a TP session from the passed in source and destination and PGN combination + /// @brief Gets a TP session from the passed in source and destination combination /// @param[in] source The source control function for the session /// @param[in] destination The destination control function for the session /// @returns a matching session, or nullptr if no session matched the supplied parameters diff --git a/isobus/include/isobus/isobus/isobus_diagnostic_protocol.hpp b/isobus/include/isobus/isobus/isobus_diagnostic_protocol.hpp index e9490c2ae..214ce3802 100644 --- a/isobus/include/isobus/isobus/isobus_diagnostic_protocol.hpp +++ b/isobus/include/isobus/isobus/isobus_diagnostic_protocol.hpp @@ -37,7 +37,6 @@ #define ISOBUS_DIAGNOSTIC_PROTOCOL_HPP #include "isobus/isobus/can_internal_control_function.hpp" -#include "isobus/isobus/can_protocol.hpp" #include "isobus/isobus/isobus_functionalities.hpp" #include "isobus/utility/processing_flags.hpp" diff --git a/isobus/include/isobus/isobus/isobus_functionalities.hpp b/isobus/include/isobus/isobus/isobus_functionalities.hpp index 1090d770a..cb3499157 100644 --- a/isobus/include/isobus/isobus/isobus_functionalities.hpp +++ b/isobus/include/isobus/isobus/isobus_functionalities.hpp @@ -15,7 +15,6 @@ #include "isobus/isobus/can_internal_control_function.hpp" #include "isobus/isobus/can_parameter_group_number_request_protocol.hpp" -#include "isobus/isobus/can_protocol.hpp" #include "isobus/utility/processing_flags.hpp" #include "isobus/utility/thread_synchronization.hpp" diff --git a/isobus/include/isobus/isobus/isobus_shortcut_button_interface.hpp b/isobus/include/isobus/isobus/isobus_shortcut_button_interface.hpp index 8088c51ed..70306a755 100644 --- a/isobus/include/isobus/isobus/isobus_shortcut_button_interface.hpp +++ b/isobus/include/isobus/isobus/isobus_shortcut_button_interface.hpp @@ -26,7 +26,6 @@ #include "isobus/isobus/can_NAME.hpp" #include "isobus/isobus/can_internal_control_function.hpp" #include "isobus/isobus/can_message.hpp" -#include "isobus/isobus/can_protocol.hpp" #include "isobus/utility/event_dispatcher.hpp" #include "isobus/utility/processing_flags.hpp" diff --git a/isobus/include/isobus/isobus/nmea2000_fast_packet_protocol.hpp b/isobus/include/isobus/isobus/nmea2000_fast_packet_protocol.hpp index 8f3b793fd..a55135532 100644 --- a/isobus/include/isobus/isobus/nmea2000_fast_packet_protocol.hpp +++ b/isobus/include/isobus/isobus/nmea2000_fast_packet_protocol.hpp @@ -19,42 +19,110 @@ /// Electronics Association in any way. /// /// @author Adrian Del Grosso +/// @author Daan Steenbergen /// -/// @copyright 2022 Adrian Del Grosso +/// @copyright 2024 The Open-Agriculture Developers //================================================================================================ #ifndef NMEA2000_FAST_PACKET_PROTOCOL_HPP #define NMEA2000_FAST_PACKET_PROTOCOL_HPP -#include "isobus/isobus/can_internal_control_function.hpp" -#include "isobus/isobus/can_protocol.hpp" +#include "isobus/isobus/can_transport_protocol_base.hpp" +#include "isobus/utility/event_dispatcher.hpp" #include "isobus/utility/thread_synchronization.hpp" namespace isobus { /// @brief A protocol that handles the NMEA 2000 fast packet protocol. - class FastPacketProtocol : public CANLibProtocol + class FastPacketProtocol { public: - /// @brief A generic way to initialize a protocol - /// @details The network manager will call a protocol's initialize function - /// when it is first updated, if it has yet to be initialized. - void initialize(CANLibBadge) override; - - /// @brief Similar to add_parameter_group_number_callback but tells the stack to parse those PGNs as Fast Packet - /// @param[in] parameterGroupNumber The PGN to parse as fast packet - /// @param[in] callback The callback that the stack will call when a matching message is received - /// @param[in] parent Generic context variable - /// @param[in] internalControlFunction An internal control function to use as an additional filter for the callback. - /// Only messages destined for the specified ICF will generate a callback. Use nullptr to receive all messages. - void register_multipacket_message_callback(std::uint32_t parameterGroupNumber, CANLibCallback callback, void *parent, std::shared_ptr internalControlFunction = nullptr); - - // @brief Removes a callback previously added with register_multipacket_message_callback - /// @param[in] parameterGroupNumber The PGN to parse as fast packet - /// @param[in] callback The callback that the stack will call when a matching message is received - /// @param[in] parent Generic context variable - /// @param[in] internalControlFunction An internal control function to use as an additional filter for the callback - void remove_multipacket_message_callback(std::uint32_t parameterGroupNumber, CANLibCallback callback, void *parent, std::shared_ptr internalControlFunction = nullptr); + /// @brief An object for tracking fast packet session state + class FastPacketProtocolSession : public TransportProtocolSessionBase + { + public: + /// @brief The constructor for a session, for advanced use only. + /// In most cases, you should use the CANNetworkManager::get_fast_packet_protocol().send_message() function to transmit messages. + /// @param[in] direction The direction of the session + /// @param[in] data Data buffer (will be moved into the session) + /// @param[in] parameterGroupNumber The PGN of the message + /// @param[in] totalMessageSize The total size of the message in bytes + /// @param[in] sequenceNumber The sequence number for this PGN + /// @param[in] priority The priority to encode in the IDs of the component CAN messages + /// @param[in] source The source control function + /// @param[in] destination The destination control function + /// @param[in] sessionCompleteCallback A callback for when the session completes + /// @param[in] parentPointer A generic context object for the tx complete and chunk callbacks + FastPacketProtocolSession(TransportProtocolSessionBase::Direction direction, + std::unique_ptr data, + std::uint32_t parameterGroupNumber, + std::uint16_t totalMessageSize, + std::uint8_t sequenceNumber, + CANIdentifier::CANPriority priority, + std::shared_ptr source, + std::shared_ptr destination, + TransmitCompleteCallback sessionCompleteCallback, + void *parentPointer); + + /// @brief Get the total number of bytes that will be sent or received in this session + /// @note The maximum number of bytes that can be sent in a single session is 6 + 31 * 7 = 223 + /// @return The length of the message in number of bytes + std::uint8_t get_message_length() const; + + /// @brief Get the number of bytes that have been sent or received in this session + /// @return The number of bytes that have been sent or received + std::uint32_t get_total_bytes_transferred() const override; + + /// @brief Get whether or not this session is a broadcast session (BAM) + /// @return True if this session is a broadcast session, false if not + bool is_broadcast() const; + + protected: + friend class FastPacketProtocol; ///< Allows the TP manager full access + + /// @brief Get the last packet number that was sent or received in this session + /// @return The last packet number that was sent or received in this session + std::uint8_t get_last_packet_number() const; + + /// @brief Get the number of packets that remain to be sent or received in this session + /// @return The number of packets that remain to be sent or received in this session + std::uint8_t get_number_of_remaining_packets() const; + + /// @brief Get the total number of packets that will be sent or received in this session + /// @return The total number of packets that will be sent or received in this session + std::uint8_t get_total_number_of_packets() const; + + private: + std::uint8_t numberOfBytesTransferred = 0; ///< The total number of bytes that have been processed in this session + std::uint8_t sequenceNumber; ///< The sequence number for this PGN + CANIdentifier::CANPriority priority; ///< The priority to encode in the IDs of the component CAN messages + }; + + /// @brief A structure for keeping track of past sessions so we can resume with the right session number + struct FastPacketHistory + { + NAME isoName; ///< The ISO name of the internal control function used in a session + std::uint32_t parameterGroupNumber; ///< The PGN of the session being saved + std::uint8_t sequenceNumber; ///< The sequence number to use in the next matching session + }; + + /// @brief The constructor for the FastPacketProtocol, for advanced use only. + /// In most cases, you should use the CANNetworkManager::get_fast_packet_protocol().send_message() function to transmit messages. + /// @param[in] sendCANFrameCallback A callback for sending a CAN frame to hardware + explicit FastPacketProtocol(const CANMessageFrameCallback &sendCANFrameCallback); + + /// @brief The event dispatcher for messages received by the Fast Packet protocol + /// All messages received by the network manager are parsed by this protocol, and therefore + /// this function is called for any (internal) control function that is registered with the network manager. + /// @note You can also sniff all messages by allowing messages for non-internal control functions to be parsed by this protocol. + /// use the "allow_any_control_function()" function to enable this. + /// @return The event dispatcher for messages received by the Fast Packet protocol + EventDispatcher &get_message_received_event_dispatcher(); + + /// @brief The event dispatcher for messages sent by the Fast Packet protocol + /// The second parameter is a boolean that indicates if the message was sent successfully (true) or not (false). + /// @return The event dispatcher for messages sent by the Fast Packet protocol + EventDispatcher &get_message_sent_event_dispatcher(); /// @brief Used to send CAN messages using fast packet /// @details You have to use this function instead of the network manager @@ -80,119 +148,55 @@ namespace isobus void *parentPointer = nullptr, DataChunkCallback frameChunkCallback = nullptr); - /// @brief This will be called by the network manager on every cyclic update of the stack - void update(CANLibBadge) override; - - private: - /// @brief An object for tracking fast packet session state - class FastPacketProtocolSession - { - public: - /// @brief Enumerates the possible session directions, Rx or Tx - enum class Direction - { - Transmit, ///< We are transmitting a message - Receive ///< We are receiving a message - }; - - /// @brief A useful way to compare session objects to each other for equality - /// @param[in] obj The object to compare against - /// @returns true if the objects are equal, false otherwise - bool operator==(const FastPacketProtocolSession &obj); - - /// @brief Get the total number of bytes that will be sent or received in this session - /// @return The length of the message in number of bytes - std::uint32_t get_message_data_length() const; + /// @brief Set whether or not to allow messages for non-internal control functions to be parsed by this protocol + /// @param[in] allow Denotes if messages for non-internal control functions should be parsed by this protocol + void allow_any_control_function(bool allow); - private: - friend class FastPacketProtocol; ///< Allows the TP manager full access + /// @brief Updates all sessions managed by this protocol manager instance. + void update(); - /// @brief The constructor for a TP session - /// @param[in] sessionDirection Tx or Rx - explicit FastPacketProtocolSession(Direction sessionDirection); - - /// @brief The destructor for a TP session - ~FastPacketProtocolSession(); - - CANMessage sessionMessage; ///< A CAN message is used in the session to represent and store data like PGN - TransmitCompleteCallback sessionCompleteCallback; ///< A callback that is to be called when the session is completed - DataChunkCallback frameChunkCallback; ///< A callback that might be used to get chunks of data to send - std::uint32_t frameChunkCallbackMessageLength; ///< The length of the message that is being sent in chunks - void *parent; ///< A generic context variable that helps identify what object callbacks are destined for. Can be nullptr - std::uint32_t timestamp_ms; ///< A timestamp used to track session timeouts - std::uint16_t lastPacketNumber; ///< The last processed sequence number for this set of packets - std::uint8_t packetCount; ///< The total number of packets to receive or send in this session - std::uint8_t processedPacketsThisSession; ///< The total processed packet count for the whole session so far - std::uint8_t sequenceNumber; ///< The sequence number for this PGN - const Direction sessionDirection; ///< Represents Tx or Rx session - }; + /// @brief A generic way for a protocol to process a received message + /// @param[in] message A received CAN message + void process_message(const CANMessage &message); - /// @brief A structure for keeping track of past sessions so we can resume with the right session number - struct FastPacketHistory - { - NAME isoName; ///< The ISO name of the internal control function used in a session - std::uint32_t parameterGroupNumber; ///< The PGN of the session being saved - std::uint8_t sequenceNumber; ///< The sequence number to use in the next matching session - }; + /// @brief Calculates the number of frames needed for a message + /// @param[in] messageLength The length of the message in bytes + /// @returns The number of frames needed for the message + static std::uint8_t calculate_number_of_frames(std::uint8_t messageLength); + private: /// @brief Adds a session's info to the history so that we can continue the sequence number later /// @param[in] session The session to add to the history - void add_session_history(FastPacketProtocolSession *session); + void add_session_history(const std::shared_ptr &session); - /// @brief Ends a session and cleans up the memory associated with its metadata + /// @brief Gracefully closes a session to prepare for a new session /// @param[in] session The session to close - /// @param[in] successful `true` if the session was closed successfully, otherwise `false` - void close_session(FastPacketProtocolSession *session, bool successful); - - /// @brief Gets the sequence number to use for a new session based on the history - /// @param[in] session The new session we're starting - /// @returns The new sequence number to use - std::uint8_t get_new_sequence_number(FastPacketProtocolSession *session); - - /// @brief Returns a session that matches the parameters, if one exists - /// @param[in,out] returnedSession The returned session - /// @param[in] parameterGroupNumber The PGN - /// @param[in] source The session source control function - /// @param[in] destination The session destination control function - /// @returns `true` if a session was found that matches, otherwise `false` - bool get_session(FastPacketProtocolSession *&returnedSession, std::uint32_t parameterGroupNumber, std::shared_ptr source, std::shared_ptr destination); - - /// @brief A generic way for a protocol to process a received message - /// @param[in] message A received CAN message - void process_message(const CANMessage &message) override; - - /// @brief A generic way for a protocol to process a received message - /// @param[in] message A received CAN message - /// @param[in] parent Provides the context to the actual FP manager object - static void process_message(const CANMessage &message, void *parent); - - /// @brief Processes end of session callbacks - /// @param[in] session The session we've just completed - /// @param[in] success Denotes if the session was successful - void process_session_complete_callback(FastPacketProtocolSession *session, bool success); - - /// @brief The network manager calls this to see if the protocol can accept a non-raw CAN message for processing - /// @param[in] parameterGroupNumber The PGN of the message - /// @param[in] data The data to be sent - /// @param[in] messageLength The length of the data to be sent - /// @param[in] source The source control function - /// @param[in] destination The destination control function - /// @param[in] transmitCompleteCallback A callback for when the protocol completes its work - /// @param[in] parentPointer A generic context object for the tx complete and chunk callbacks - /// @param[in] frameChunkCallback A callback to get some data to send - /// @returns true if the message was accepted by the protocol for processing - bool protocol_transmit_message(std::uint32_t parameterGroupNumber, - const std::uint8_t *data, - std::uint32_t messageLength, - std::shared_ptr source, - std::shared_ptr destination, - TransmitCompleteCallback transmitCompleteCallback, - void *parentPointer, - DataChunkCallback frameChunkCallback) override; - - /// @brief Updates in-progress sessions - /// @param[in] session The session to process - void update_state_machine(FastPacketProtocolSession *session); + /// @param[in] successful Denotes if the session was successful + void close_session(std::shared_ptr session, bool successful); + + /// @brief Get the sequence number to use for a new session based on the history of past sessions + /// @param[in] name The ISO name of the internal control function used in a session + /// @param[in] parameterGroupNumber The PGN of the session being started + /// @returns The sequence number to use for the new session + std::uint8_t get_new_sequence_number(NAME name, std::uint32_t parameterGroupNumber); + + /// @brief Gets a FP session from the passed in source and destination and PGN combination + /// @param[in] parameterGroupNumber The PGN of the session + /// @param[in] source The source control function for the session + /// @param[in] destination The destination control function for the session + /// @returns a matching session, or nullptr if no session matched the supplied parameters + std::shared_ptr get_session(std::uint32_t parameterGroupNumber, std::shared_ptr source, std::shared_ptr destination); + + /// @brief Checks if a session by the passed in source and destination and PGN combination exists + /// @param[in] parameterGroupNumber The PGN of the session + /// @param[in] source The source control function for the session + /// @param[in] destination The destination control function for the session + /// @returns true if a matching session exists, false if not + bool has_session(std::uint32_t parameterGroupNumber, std::shared_ptr source, std::shared_ptr destination); + + /// @brief Update a single session + /// @param[in] session The session to update + void update_session(const std::shared_ptr &session); static constexpr std::uint32_t FP_MIN_PARAMETER_GROUP_NUMBER = 0x1F000; ///< Start of PGNs that can be received via Fast Packet static constexpr std::uint32_t FP_MAX_PARAMETER_GROUP_NUMBER = 0x1FFFF; ///< End of PGNs that can be received via Fast Packet @@ -203,10 +207,13 @@ namespace isobus static constexpr std::uint8_t SEQUENCE_NUMBER_BIT_OFFSET = 0x05; ///< The bit offset into the first byte of data to get the seq number static constexpr std::uint8_t PROTOCOL_BYTES_PER_FRAME = 7; ///< The number of payload bytes per frame for all but the first message, which has 6 - std::vector activeSessions; ///< A list of all active TP sessions - std::vector sessionHistory; ///< Used to keep track of sequence numbers for future sessions - std::vector parameterGroupNumberCallbacks; ///< A list of all parameter group number callbacks that will be parsed as fast packet messages + std::vector> activeSessions; ///< A list of all active TP sessions Mutex sessionMutex; ///< A mutex to lock the sessions list in case someone starts a Tx while the stack is processing sessions + std::vector sessionHistory; ///< Used to keep track of sequence numbers for future sessions + EventDispatcher messageReceivedEventDispatcher; ///< The event dispatcher for messages received by the Fast Packet protocol + EventDispatcher messageSentEventDispatcher; ///< The event dispatcher for messages sent by the Fast Packet protocol + bool allowAnyControlFunction = false; ///< Denotes if messages for non-internal control functions should be parsed by this protocol + const CANMessageFrameCallback sendCANFrameCallback; ///< A callback for sending a CAN frame }; } // namespace isobus diff --git a/isobus/src/can_network_manager.cpp b/isobus/src/can_network_manager.cpp index bf0de053f..0cc068c92 100644 --- a/isobus/src/can_network_manager.cpp +++ b/isobus/src/can_network_manager.cpp @@ -15,7 +15,6 @@ #include "isobus/isobus/can_hardware_abstraction.hpp" #include "isobus/isobus/can_message.hpp" #include "isobus/isobus/can_partnered_control_function.hpp" -#include "isobus/isobus/can_protocol.hpp" #include "isobus/isobus/can_stack_logger.hpp" #include "isobus/utility/system_timing.hpp" #include "isobus/utility/to_string.hpp" @@ -166,31 +165,6 @@ namespace isobus // Successfully sent via the extended transport protocol retVal = true; } - else - { - //! @todo convert the other protocols to stop using the abstract protocollib class - CANLibProtocol *currentProtocol; - // See if any transport layer protocol can handle this message - for (std::uint32_t i = 0; i < CANLibProtocol::get_number_protocols(); i++) - { - if (CANLibProtocol::get_protocol(i, currentProtocol)) - { - retVal = currentProtocol->protocol_transmit_message(parameterGroupNumber, - dataBuffer, - dataLength, - sourceControlFunction, - destinationControlFunction, - transmitCompleteCallback, - parentPointer, - frameChunkCallback); - - if (retVal) - { - break; - } - } - } - } //! @todo Allow sending 8 byte message with the frameChunkCallback if ((!retVal) && @@ -241,20 +215,7 @@ namespace isobus { transportProtocols[i]->update(); extendedTransportProtocols[i]->update(); - } - - for (std::size_t i = 0; i < CANLibProtocol::get_number_protocols(); i++) - { - CANLibProtocol *currentProtocol = nullptr; - - if (CANLibProtocol::get_protocol(i, currentProtocol)) - { - if (!currentProtocol->get_is_initialized()) - { - currentProtocol->initialize({}); - } - currentProtocol->update({}); - } + fastPacketProtocol[i]->update(); } update_busload_history(); updateTimestamp_ms = SystemTiming::get_timestamp_ms(); @@ -469,9 +430,9 @@ namespace isobus return retVal; } - FastPacketProtocol &CANNetworkManager::get_fast_packet_protocol() + std::unique_ptr &CANNetworkManager::get_fast_packet_protocol(std::uint8_t canPortIndex) { - return fastPacketProtocol; + return fastPacketProtocol[canPortIndex]; } CANNetworkConfiguration &CANNetworkManager::get_configuration() @@ -542,6 +503,7 @@ namespace isobus }; transportProtocols[i].reset(new TransportProtocolManager(send_frame_callback, receive_message_callback, &configuration)); extendedTransportProtocols[i].reset(new ExtendedTransportProtocolManager(send_frame_callback, receive_message_callback, &configuration)); + fastPacketProtocol[i].reset(new FastPacketProtocol(send_frame_callback)); } } diff --git a/isobus/src/can_protocol.cpp b/isobus/src/can_protocol.cpp deleted file mode 100644 index 8df98e03e..000000000 --- a/isobus/src/can_protocol.cpp +++ /dev/null @@ -1,60 +0,0 @@ -//================================================================================================ -/// @file can_protocol.cpp -/// -/// @brief A base class for all protocol classes. Allows the network manager to update them -/// in a generic, dynamic way. -/// @author Adrian Del Grosso -/// -/// @copyright 2022 Adrian Del Grosso -//================================================================================================ -#include "isobus/isobus/can_protocol.hpp" - -#include "isobus/isobus/can_network_manager.hpp" - -#include - -namespace isobus -{ - CANLibProtocol::CANLibProtocol() : - initialized(false) - { - CANNetworkManager::CANNetwork.protocolList.push_back(this); - } - - CANLibProtocol::~CANLibProtocol() - { - auto protocolLocation = find(CANNetworkManager::CANNetwork.protocolList.begin(), CANNetworkManager::CANNetwork.protocolList.end(), this); - - if (CANNetworkManager::CANNetwork.protocolList.end() != protocolLocation) - { - CANNetworkManager::CANNetwork.protocolList.erase(protocolLocation); - } - } - - bool CANLibProtocol::get_is_initialized() const - { - return initialized; - } - - bool CANLibProtocol::get_protocol(std::uint32_t index, CANLibProtocol *&returnedProtocol) - { - returnedProtocol = nullptr; - - if (index < CANNetworkManager::CANNetwork.protocolList.size()) - { - returnedProtocol = CANNetworkManager::CANNetwork.protocolList[index]; - } - return (nullptr != returnedProtocol); - } - - std::uint32_t CANLibProtocol::get_number_protocols() - { - return CANNetworkManager::CANNetwork.protocolList.size(); - } - - void CANLibProtocol::initialize(CANLibBadge) - { - initialized = true; - } - -} // namespace isobus diff --git a/isobus/src/can_transport_protocol.cpp b/isobus/src/can_transport_protocol.cpp index 74a575a56..c9d81128a 100644 --- a/isobus/src/can_transport_protocol.cpp +++ b/isobus/src/can_transport_protocol.cpp @@ -1003,7 +1003,6 @@ namespace isobus auto result = std::find_if(activeSessions.begin(), activeSessions.end(), [&](const std::shared_ptr &session) { return session->matches(source, destination); }); - // Instead of returning a pointer, we return by reference to indicate it should not be deleted or stored return (activeSessions.end() != result) ? (*result) : nullptr; } diff --git a/isobus/src/nmea2000_fast_packet_protocol.cpp b/isobus/src/nmea2000_fast_packet_protocol.cpp index bc17181ef..cff7ccd7b 100644 --- a/isobus/src/nmea2000_fast_packet_protocol.cpp +++ b/isobus/src/nmea2000_fast_packet_protocol.cpp @@ -4,8 +4,9 @@ /// @brief A protocol that handles the NMEA 2000 fast packet protocol. /// /// @author Adrian Del Grosso +/// @author Daan Steenbergen /// -/// @copyright 2022 Adrian Del Grosso +/// @copyright 2024 The Open-Agriculture Developers //================================================================================================ #include "isobus/isobus/nmea2000_fast_packet_protocol.hpp" @@ -19,67 +20,104 @@ namespace isobus { - FastPacketProtocol::FastPacketProtocolSession::FastPacketProtocolSession(Direction sessionDirection) : - sessionMessage(CANMessage::create_invalid_message()), - sessionCompleteCallback(nullptr), - frameChunkCallback(nullptr), - parent(nullptr), - timestamp_ms(0), - lastPacketNumber(0), - packetCount(0), - processedPacketsThisSession(0), - sequenceNumber(0), - sessionDirection(sessionDirection) + FastPacketProtocol::FastPacketProtocolSession::FastPacketProtocolSession(TransportProtocolSessionBase::Direction direction, + std::unique_ptr data, + std::uint32_t parameterGroupNumber, + std::uint16_t totalMessageSize, + std::uint8_t sequenceNumber, + CANIdentifier::CANPriority priority, + std::shared_ptr source, + std::shared_ptr destination, + TransmitCompleteCallback sessionCompleteCallback, + void *parentPointer) : + TransportProtocolSessionBase(direction, std::move(data), parameterGroupNumber, totalMessageSize, source, destination, sessionCompleteCallback, parentPointer), + sequenceNumber(sequenceNumber), + priority(priority) { } - bool FastPacketProtocol::FastPacketProtocolSession::operator==(const FastPacketProtocolSession &obj) + std::uint8_t FastPacketProtocol::FastPacketProtocolSession::get_message_length() const { - return ((sessionMessage.get_source_control_function() == obj.sessionMessage.get_source_control_function()) && - (sessionMessage.get_destination_control_function() == obj.sessionMessage.get_destination_control_function()) && - (sessionMessage.get_identifier().get_parameter_group_number() == obj.sessionMessage.get_identifier().get_parameter_group_number())); + // We know that this session can only be used to transfer 223 bytes of data, so we can safely cast to a uint8_t + return static_cast(TransportProtocolSessionBase::get_message_length()); } - std::uint32_t FastPacketProtocol::FastPacketProtocolSession::get_message_data_length() const + bool FastPacketProtocol::FastPacketProtocolSession::is_broadcast() const { - if (nullptr != frameChunkCallback) - { - return frameChunkCallbackMessageLength; - } - return sessionMessage.get_data_length(); + return (nullptr == get_destination()); } - FastPacketProtocol::FastPacketProtocolSession::~FastPacketProtocolSession() + std::uint32_t FastPacketProtocol::FastPacketProtocolSession::get_total_bytes_transferred() const { + return numberOfBytesTransferred; } - void FastPacketProtocol::initialize(CANLibBadge) + std::uint8_t FastPacketProtocol::FastPacketProtocolSession::get_last_packet_number() const { - if (!initialized) + // We know that this session can only be used to transfer 223 bytes of data, so we can safely cast to a uint8_t + std::uint8_t numberOfFrames = calculate_number_of_frames(static_cast(get_total_bytes_transferred())); + if (numberOfFrames > 0) { - initialized = true; + return numberOfFrames - 1; } + else + { + return 0; + } + } + + std::uint8_t FastPacketProtocol::FastPacketProtocolSession::get_number_of_remaining_packets() const + { + return get_total_number_of_packets() - get_last_packet_number(); } - void FastPacketProtocol::register_multipacket_message_callback(std::uint32_t parameterGroupNumber, CANLibCallback callback, void *parent, std::shared_ptr internalControlFunction) + std::uint8_t FastPacketProtocol::FastPacketProtocolSession::get_total_number_of_packets() const { - parameterGroupNumberCallbacks.push_back(ParameterGroupNumberCallbackData(parameterGroupNumber, callback, parent, internalControlFunction)); - CANNetworkManager::CANNetwork.add_protocol_parameter_group_number_callback(parameterGroupNumber, process_message, this); + return calculate_number_of_frames(get_message_length()); } - void FastPacketProtocol::remove_multipacket_message_callback(std::uint32_t parameterGroupNumber, CANLibCallback callback, void *parent, std::shared_ptr internalControlFunction) + std::uint8_t FastPacketProtocol::calculate_number_of_frames(std::uint8_t messageLength) { - ParameterGroupNumberCallbackData tempObject(parameterGroupNumber, callback, parent, internalControlFunction); - auto callbackLocation = std::find(parameterGroupNumberCallbacks.begin(), parameterGroupNumberCallbacks.end(), tempObject); - if (parameterGroupNumberCallbacks.end() != callbackLocation) + std::uint8_t numberOfFrames = 0; + // Account for the 6 bytes of data in the first frame + if (messageLength > 6) { - parameterGroupNumberCallbacks.erase(callbackLocation); + messageLength -= 6; + numberOfFrames = (messageLength / PROTOCOL_BYTES_PER_FRAME); + if (0 != (messageLength % PROTOCOL_BYTES_PER_FRAME)) + { + numberOfFrames++; + } + } + else + { + numberOfFrames = 1; } - CANNetworkManager::CANNetwork.remove_protocol_parameter_group_number_callback(parameterGroupNumber, process_message, this); + return numberOfFrames; + } + + FastPacketProtocol::FastPacketProtocol(const CANMessageFrameCallback &sendCANFrameCallback) : + sendCANFrameCallback(sendCANFrameCallback) + { + } + + EventDispatcher &FastPacketProtocol::get_message_received_event_dispatcher() + { + return messageReceivedEventDispatcher; + } + + EventDispatcher &FastPacketProtocol::get_message_sent_event_dispatcher() + { + return messageSentEventDispatcher; + } + + void FastPacketProtocol::allow_any_control_function(bool allow) + { + allowAnyControlFunction = allow; } bool FastPacketProtocol::send_multipacket_message(std::uint32_t parameterGroupNumber, - const std::uint8_t *data, + const std::uint8_t *messageData, std::uint8_t messageLength, std::shared_ptr source, std::shared_ptr destination, @@ -88,82 +126,88 @@ namespace isobus void *parentPointer, DataChunkCallback frameChunkCallback) { - bool retVal = false; - - if ((nullptr != source) && - (source->get_address_valid()) && - (parameterGroupNumber >= FP_MIN_PARAMETER_GROUP_NUMBER) && - (parameterGroupNumber <= FP_MAX_PARAMETER_GROUP_NUMBER) && - (messageLength <= MAX_PROTOCOL_MESSAGE_LENGTH) && - ((nullptr != data) || - (nullptr != frameChunkCallback))) + std::unique_ptr data; + if (nullptr != frameChunkCallback) { - FastPacketProtocolSession *tempSession = nullptr; - - if (!get_session(tempSession, parameterGroupNumber, source, destination)) - { - tempSession = new FastPacketProtocolSession(FastPacketProtocolSession::Direction::Transmit); - CANIdentifier identifier(CANIdentifier::Type::Extended, parameterGroupNumber, priority, (destination == nullptr ? 0xFF : destination->get_address()), source->get_address()); - tempSession->sessionMessage = CANMessage(CANMessage::Type::Transmit, - identifier, - data, - messageLength, - source, - destination, - source->get_can_port()); - - tempSession->parent = parentPointer; - tempSession->packetCount = ((messageLength - 6) / PROTOCOL_BYTES_PER_FRAME); - tempSession->timestamp_ms = SystemTiming::get_timestamp_ms(); - tempSession->processedPacketsThisSession = 0; - tempSession->sessionCompleteCallback = txCompleteCallback; - tempSession->sequenceNumber = get_new_sequence_number(tempSession); - - if ((messageLength > 6) && - (0 != ((messageLength - 6) % PROTOCOL_BYTES_PER_FRAME))) - { - tempSession->packetCount++; - } - LOCK_GUARD(Mutex, sessionMutex); + data.reset(new CANMessageDataCallback(messageLength, frameChunkCallback, parentPointer)); + } + else if (nullptr != messageData) + { + data.reset(new CANMessageDataView(messageData, messageLength)); + } - activeSessions.push_back(tempSession); - retVal = true; - } - else - { - // Already in a matching session, can't start another. - LOG_WARNING("[FP]: Can't send fast packet message, already in matching session."); - } + // Return false early if we can't send the message + if ((nullptr == data) || (data->size() <= CAN_DATA_LENGTH) || (data->size() > MAX_PROTOCOL_MESSAGE_LENGTH)) + { + LOG_ERROR("[FP]: Unable to send multipacket message, data is invalid or has invalid length."); + return false; } - else + else if ((nullptr == source) || (!source->get_address_valid()) || has_session(parameterGroupNumber, source, destination)) + { + LOG_ERROR("[FP]: Unable to send multipacket message, source is invalid or already in a session for the PGN."); + return false; + } + else if ((parameterGroupNumber < FP_MIN_PARAMETER_GROUP_NUMBER) || (parameterGroupNumber > FP_MAX_PARAMETER_GROUP_NUMBER)) + { + LOG_ERROR("[FP]: Unable to send multipacket message, PGN is unsupported by this protocol."); + return false; + } + else if ((nullptr != destination) && (!destination->get_address_valid())) { - LOG_ERROR("[FP]: Can't send fast packet message, bad parameters or ICF is invalid"); + LOG_ERROR("[FP]: Unable to send multipacket message, destination is invalid."); + return false; } - return retVal; + + std::uint8_t sequenceNumber = get_new_sequence_number(source->get_NAME(), parameterGroupNumber); + auto session = std::make_shared(FastPacketProtocolSession::Direction::Transmit, + std::move(data), + parameterGroupNumber, + messageLength, + sequenceNumber, + priority, + source, + destination, + txCompleteCallback, + parentPointer); + + LOCK_GUARD(Mutex, sessionMutex); + activeSessions.push_back(session); + return true; } - void FastPacketProtocol::update(CANLibBadge) + void FastPacketProtocol::update() { LOCK_GUARD(Mutex, sessionMutex); - - for (auto i : activeSessions) + // We use a fancy for loop here to allow us to remove sessions from the list while iterating + for (std::size_t i = activeSessions.size(); i > 0; i--) { - update_state_machine(i); + auto session = activeSessions.at(i - 1); + if (!session->get_source()->get_address_valid()) + { + LOG_WARNING("[FP]: Closing active session as the source control function is no longer valid"); + close_session(session, false); + } + else if (!session->is_broadcast() && !session->get_destination()->get_address_valid()) + { + LOG_WARNING("[FP]: Closing active session as the destination control function is no longer valid"); + close_session(session, false); + } + update_session(session); } } - void FastPacketProtocol::add_session_history(FastPacketProtocolSession *session) + void FastPacketProtocol::add_session_history(const std::shared_ptr &session) { if (nullptr != session) { bool formerSessionMatched = false; - for (std::size_t i = 0; i < sessionHistory.size(); i++) + for (auto &formerSessions : sessionHistory) { - if ((sessionHistory[i].isoName == session->sessionMessage.get_source_control_function()->get_NAME()) && - (sessionHistory[i].parameterGroupNumber == session->sessionMessage.get_identifier().get_parameter_group_number())) + if ((formerSessions.isoName == session->get_source()->get_NAME()) && + (formerSessions.parameterGroupNumber == session->get_parameter_group_number())) { - sessionHistory[i].sequenceNumber++; + formerSessions.sequenceNumber = session->sequenceNumber; formerSessionMatched = true; break; } @@ -172,364 +216,254 @@ namespace isobus if (!formerSessionMatched) { FastPacketHistory history{ - session->sessionMessage.get_source_control_function()->get_NAME(), - session->sessionMessage.get_identifier().get_parameter_group_number(), + session->get_source()->get_NAME(), + session->get_parameter_group_number(), session->sequenceNumber }; - history.sequenceNumber++; sessionHistory.push_back(history); } } } - void FastPacketProtocol::close_session(FastPacketProtocolSession *session, bool successful) + void FastPacketProtocol::close_session(std::shared_ptr session, bool successful) { if (nullptr != session) { - process_session_complete_callback(session, successful); - for (auto currentSession = activeSessions.begin(); currentSession != activeSessions.end(); currentSession++) + session->complete(successful); + add_session_history(session); + + auto sessionLocation = std::find(activeSessions.begin(), activeSessions.end(), session); + if (activeSessions.end() != sessionLocation) { - if (session == *currentSession) - { - activeSessions.erase(currentSession); - delete session; - break; - } + activeSessions.erase(sessionLocation); + LOG_DEBUG("[FP]: Session Closed"); } } } - std::uint8_t FastPacketProtocol::get_new_sequence_number(FastPacketProtocolSession *session) + std::uint8_t FastPacketProtocol::get_new_sequence_number(NAME name, std::uint32_t parameterGroupNumber) { - std::uint8_t retVal = 0; - - if (nullptr != session) + std::uint8_t sequenceNumber = 0; + for (const auto &formerSessions : sessionHistory) { - for (auto &formerSessions : sessionHistory) + if ((formerSessions.isoName == name) && (formerSessions.parameterGroupNumber == parameterGroupNumber)) { - if ((formerSessions.isoName == session->sessionMessage.get_source_control_function()->get_NAME()) && - (formerSessions.parameterGroupNumber == session->sessionMessage.get_identifier().get_parameter_group_number())) - { - retVal = formerSessions.sequenceNumber; - break; - } + sequenceNumber = formerSessions.sequenceNumber + 1; + break; } } - return retVal; + return sequenceNumber; } - bool FastPacketProtocol::get_session(FastPacketProtocolSession *&returnedSession, std::uint32_t parameterGroupNumber, std::shared_ptr source, std::shared_ptr destination) + void FastPacketProtocol::process_message(const CANMessage &message) { - returnedSession = nullptr; - LOCK_GUARD(Mutex, sessionMutex); + if ((CAN_DATA_LENGTH != message.get_data_length()) || + (message.get_identifier().get_parameter_group_number() < FP_MIN_PARAMETER_GROUP_NUMBER) || + (message.get_identifier().get_parameter_group_number() > FP_MAX_PARAMETER_GROUP_NUMBER)) + { + // Not a valid message for this protocol + return; + } - for (auto session : activeSessions) + if (messageReceivedEventDispatcher.get_listener_count() == 0) { - if ((session->sessionMessage.get_identifier().get_parameter_group_number() == parameterGroupNumber) && - (session->sessionMessage.get_source_control_function() == source) && - (session->sessionMessage.get_destination_control_function() == destination)) - { - returnedSession = session; - break; - } + // No listeners, no need to process the message + return; } - return (nullptr != returnedSession); - } - void FastPacketProtocol::process_message(const CANMessage &message, void *parent) - { - if (nullptr != parent) + if ((!message.is_destination_our_device()) && (!allowAnyControlFunction)) { - static_cast(parent)->process_message(message); + // We only want to process messages that are destined for us + return; } - } - void FastPacketProtocol::process_message(const CANMessage &message) - { - if ((CAN_DATA_LENGTH == message.get_data_length()) && - (message.get_identifier().get_parameter_group_number() >= FP_MIN_PARAMETER_GROUP_NUMBER) && - (message.get_identifier().get_parameter_group_number() <= FP_MAX_PARAMETER_GROUP_NUMBER)) + std::shared_ptr session = get_session(message.get_identifier().get_parameter_group_number(), + message.get_source_control_function(), + message.get_destination_control_function()); + std::uint8_t actualFrameCount = (message.get_uint8_at(0) & FRAME_COUNTER_BIT_MASK); + + if (nullptr != session) { - // See if we care about parsing this message - if (parameterGroupNumberCallbacks.size() > 0) + // We have a matching active session + if (0 == actualFrameCount) { - bool pgnNeedsParsing = false; - - for (auto &callback : parameterGroupNumberCallbacks) + // This is the beginning of a new message, but we already have a session + LOG_ERROR("[FP]: Existing session matched new frame counter, aborting the matching session."); + close_session(session, false); + } + else + { + // Correct sequence number, copy the data + // Convert data type to a vector to allow for manipulation + auto &data = static_cast(session->get_data()); + for (std::uint8_t i = 0; i < PROTOCOL_BYTES_PER_FRAME; i++) { - if ((callback.get_parameter_group_number() == message.get_identifier().get_parameter_group_number()) && - ((nullptr == callback.get_internal_control_function()) || - (callback.get_internal_control_function()->get_address() == message.get_destination_control_function()->get_address()))) + if (session->numberOfBytesTransferred < session->get_message_length()) + { + data.set_byte(session->numberOfBytesTransferred, message.get_uint8_at(1 + i)); + session->numberOfBytesTransferred++; + } + else { - pgnNeedsParsing = true; + // Reached the end of the message, no need to copy any more data break; } } - if (pgnNeedsParsing) + if (session->numberOfBytesTransferred >= session->get_message_length()) + { + // Complete + // Find the appropriate callback and let them know + messageReceivedEventDispatcher.invoke({ CANMessage::Type::Receive, + message.get_identifier(), + std::move(data), + message.get_source_control_function(), + message.get_destination_control_function(), + message.get_can_port_index() }); + close_session(session, true); + } + } + } + else + { + // No matching session. See if we need to start a new session + if (0 != actualFrameCount) + { + // This is the middle of some message that we have no context for. + // Ignore the message for now until we receive it with a fresh packet counter. + LOG_WARNING("[FP]: Ignoring FP message with PGN %u, no context available. The message may be processed when packet count returns to zero.", + message.get_identifier().get_parameter_group_number()); + } + else + { + // This is the beginning of a new message + std::uint8_t messageLength = message.get_uint8_at(1); + if (messageLength > MAX_PROTOCOL_MESSAGE_LENGTH) { - FastPacketProtocolSession *currentSession = nullptr; - std::vector messageData = message.get_data(); - std::uint8_t frameCount = (messageData[0] & FRAME_COUNTER_BIT_MASK); + LOG_WARNING("[FP]: Ignoring possible new FP session with advertised length > 233."); + return; + } - // Check for a valid session - if (get_session(currentSession, message.get_identifier().get_parameter_group_number(), message.get_source_control_function(), message.get_destination_control_function())) - { - // Matched a session - if (0 != frameCount) - { - // Continue processing the message - for (std::uint8_t i = 0; i < PROTOCOL_BYTES_PER_FRAME; i++) - { - if (static_cast(i + (currentSession->processedPacketsThisSession * PROTOCOL_BYTES_PER_FRAME) - 1) < currentSession->sessionMessage.get_data_length()) - { - currentSession->sessionMessage.set_data(messageData[1 + i], i + (currentSession->processedPacketsThisSession * PROTOCOL_BYTES_PER_FRAME) - 1); - } - else - { - break; - } - } - currentSession->processedPacketsThisSession++; - - if (static_cast((currentSession->processedPacketsThisSession * PROTOCOL_BYTES_PER_FRAME) - 1) >= currentSession->sessionMessage.get_data_length()) - { - // Complete - // Find the appropriate callback and let them know - for (auto &callback : parameterGroupNumberCallbacks) - { - if (callback.get_parameter_group_number() == currentSession->sessionMessage.get_identifier().get_parameter_group_number()) - { - callback.get_callback()(currentSession->sessionMessage, callback.get_parent()); - } - } - close_session(currentSession, true); // All done - } - } - else - { - LOG_ERROR("[FP]: Existing session matched new frame counter, aborting the matching session."); - close_session(currentSession, false); - } - } - else - { - // No matching session. See if we need to start a new session - if (0 == frameCount) - { - if (messageData[1] <= MAX_PROTOCOL_MESSAGE_LENGTH) - { - // This is the beginning of a new message - currentSession = new FastPacketProtocolSession(FastPacketProtocolSession::Direction::Receive); - currentSession->frameChunkCallback = nullptr; - if (messageData[1] >= PROTOCOL_BYTES_PER_FRAME - 1) - { - currentSession->packetCount = ((messageData[1] - 6) / PROTOCOL_BYTES_PER_FRAME); - } - else - { - currentSession->packetCount = 1; - } - currentSession->lastPacketNumber = ((messageData[0] >> SEQUENCE_NUMBER_BIT_OFFSET) & SEQUENCE_NUMBER_BIT_MASK); - currentSession->processedPacketsThisSession = 1; - - currentSession->sessionMessage = CANMessage(CANMessage::Type::Receive, - message.get_identifier(), - nullptr, - 0, - message.get_source_control_function(), - message.get_destination_control_function(), - message.get_can_port_index()); - currentSession->sessionMessage.set_data_size(messageData[1]); - currentSession->timestamp_ms = SystemTiming::get_timestamp_ms(); - - if (0 != (messageData[1] % PROTOCOL_BYTES_PER_FRAME)) - { - currentSession->packetCount++; - } - - // Save the 6 bytes of payload in this first message - for (std::uint8_t i = 0; i < (PROTOCOL_BYTES_PER_FRAME - 1); i++) - { - currentSession->sessionMessage.set_data(messageData[2 + i], i); - } - LOCK_GUARD(Mutex, sessionMutex); - - activeSessions.push_back(currentSession); - } - else - { - LOG_WARNING("[FP]: Ignoring possible new FP session with advertised length > 233."); - } - } - else - { - // This is the middle of some message that we have no context for. - // Ignore the message for now until we receive it with a fresh packet counter. - LOG_WARNING("[FP]: Ignoring FP message with PGN %u, no context available. The message may be processed when packet count returns to zero.", - message.get_identifier().get_parameter_group_number()); - } - } + // Create a new session + session = std::make_shared(FastPacketProtocolSession::Direction::Receive, + std::unique_ptr(new CANMessageDataVector(messageLength)), + message.get_identifier().get_parameter_group_number(), + messageLength, + message.get_source_control_function(), + message.get_destination_control_function(), + nullptr, // No callback + nullptr); + session->sequenceNumber = (message.get_uint8_at(0) >> SEQUENCE_NUMBER_BIT_OFFSET); + session->priority = message.get_identifier().get_priority(); + + // Save the 6 bytes of payload in this first message + // Convert data type to a vector to allow for manipulation + auto &data = static_cast(session->get_data()); + for (std::uint8_t i = 0; i < (PROTOCOL_BYTES_PER_FRAME - 1); i++) + { + data.set_byte(session->numberOfBytesTransferred, message.get_uint8_at(2 + i)); + session->numberOfBytesTransferred++; } + + LOCK_GUARD(Mutex, sessionMutex); + activeSessions.push_back(session); } } } - void FastPacketProtocol::process_session_complete_callback(FastPacketProtocolSession *session, bool success) + void FastPacketProtocol::update_session(const std::shared_ptr &session) { - if ((nullptr != session) && - (nullptr != session->sessionCompleteCallback) && - (nullptr != session->sessionMessage.get_source_control_function()) && - (ControlFunction::Type::Internal == session->sessionMessage.get_source_control_function()->get_type())) + if (nullptr == session) { - session->sessionCompleteCallback(session->sessionMessage.get_identifier().get_parameter_group_number(), - session->get_message_data_length(), - std::static_pointer_cast(session->sessionMessage.get_source_control_function()), - session->sessionMessage.get_destination_control_function(), - success, - session->parent); + return; } - } - bool FastPacketProtocol::protocol_transmit_message(std::uint32_t, - const std::uint8_t *, - std::uint32_t, - std::shared_ptr, - std::shared_ptr, - TransmitCompleteCallback, - void *, - DataChunkCallback) - { - return false; - } - - void FastPacketProtocol::update_state_machine(FastPacketProtocolSession *session) - { - if (nullptr != session) + if (session->get_direction() == FastPacketProtocolSession::Direction::Receive) { - switch (session->sessionDirection) + // We are receiving a message, only need to check for timeouts + if (session->get_time_since_last_update() > FP_TIMEOUT_MS) { - case FastPacketProtocolSession::Direction::Receive: + LOG_ERROR("[FP]: Rx session timed out."); + close_session(session, false); + } + } + else + { + std::array buffer; + // We are transmitting a message, let's try and send remaining packets + for (std::uint8_t i = 0; i < session->get_number_of_remaining_packets(); i++) + { + buffer[0] = session->get_last_packet_number(); + buffer[0] |= (session->sequenceNumber << SEQUENCE_NUMBER_BIT_OFFSET); + + std::uint8_t startIndex = 1; + std::uint8_t bytesThisFrame = PROTOCOL_BYTES_PER_FRAME; + if (0 == session->get_total_bytes_transferred()) { - if (SystemTiming::time_expired_ms(session->timestamp_ms, FP_TIMEOUT_MS)) - { - LOG_ERROR("[FP]: Rx session timed out."); - close_session(session, false); - } + // This is the first frame, so we need to send the message length + buffer[1] = session->get_message_length(); + startIndex++; + bytesThisFrame--; } - break; - case FastPacketProtocolSession::Direction::Transmit: + std::uint16_t dataOffset = session->get_last_packet_number() * PROTOCOL_BYTES_PER_FRAME; + for (std::uint8_t j = 0; j < bytesThisFrame; j++) { - std::array dataBuffer; - std::vector messageData; - bool txSessionCancelled = false; - - for (std::uint8_t i = session->processedPacketsThisSession; i <= session->packetCount; i++) + std::uint16_t index = dataOffset + j; + if (index < session->get_message_length()) { - std::uint8_t bytesProcessedSoFar = (session->processedPacketsThisSession > 0 ? 6 : 0); - - if (0 != bytesProcessedSoFar) - { - bytesProcessedSoFar += (PROTOCOL_BYTES_PER_FRAME * (session->processedPacketsThisSession - 1)); - } - - std::uint16_t numberBytesLeft = (session->get_message_data_length() - bytesProcessedSoFar); - - if (numberBytesLeft > PROTOCOL_BYTES_PER_FRAME) - { - numberBytesLeft = PROTOCOL_BYTES_PER_FRAME; - } - - if (nullptr != session->frameChunkCallback) - { - std::uint8_t callbackBuffer[CAN_DATA_LENGTH] = { 0 }; // Only need 7 but give them 8 in case they make a mistake - bool callbackSuccessful = session->frameChunkCallback(dataBuffer[0], (PROTOCOL_BYTES_PER_FRAME * session->processedPacketsThisSession), numberBytesLeft, callbackBuffer, session->parent); - - if (callbackSuccessful) - { - for (std::uint8_t j = 0; j < PROTOCOL_BYTES_PER_FRAME; j++) - { - dataBuffer[1 + j] = callbackBuffer[j]; - } - } - else - { - close_session(session, false); - break; - } - } - else - { - messageData = session->sessionMessage.get_data(); - if (0 == session->processedPacketsThisSession) - { - dataBuffer[0] = session->processedPacketsThisSession; - dataBuffer[0] |= (session->sequenceNumber << SEQUENCE_NUMBER_BIT_OFFSET); - dataBuffer[1] = session->get_message_data_length(); - dataBuffer[2] = messageData[0]; - dataBuffer[3] = messageData[1]; - dataBuffer[4] = messageData[2]; - dataBuffer[5] = messageData[3]; - dataBuffer[6] = messageData[4]; - dataBuffer[7] = messageData[5]; - } - else - { - dataBuffer[0] = session->processedPacketsThisSession; - dataBuffer[0] |= (session->sequenceNumber << SEQUENCE_NUMBER_BIT_OFFSET); - - if (numberBytesLeft < PROTOCOL_BYTES_PER_FRAME) - { - dataBuffer[1] = 0xFF; - dataBuffer[2] = 0xFF; - dataBuffer[3] = 0xFF; - dataBuffer[4] = 0xFF; - dataBuffer[5] = 0xFF; - dataBuffer[6] = 0xFF; - dataBuffer[7] = 0xFF; - } - - for (std::uint8_t j = 0; j < numberBytesLeft; j++) - { - dataBuffer[1 + j] = messageData[6 + ((i - 1) * PROTOCOL_BYTES_PER_FRAME) + j]; - } - } - } - if (CANNetworkManager::CANNetwork.send_can_message(session->sessionMessage.get_identifier().get_parameter_group_number(), - dataBuffer.data(), - CAN_DATA_LENGTH, - std::static_pointer_cast(session->sessionMessage.get_source_control_function()), - session->sessionMessage.get_destination_control_function(), - session->sessionMessage.get_identifier().get_priority(), - nullptr, - nullptr)) - { - session->processedPacketsThisSession++; - session->timestamp_ms = SystemTiming::get_timestamp_ms(); - } - else - { - if (SystemTiming::time_expired_ms(session->timestamp_ms, FP_TIMEOUT_MS)) - { - LOG_ERROR("[FP]: Tx session timed out."); - close_session(session, false); - txSessionCancelled = true; - } - break; - } + buffer[startIndex + j] = session->get_data().get_byte(index); } + else + { + buffer[startIndex + j] = 0xFF; + } + } - if ((!txSessionCancelled) && - (session->processedPacketsThisSession >= session->packetCount)) + if (sendCANFrameCallback(session->get_parameter_group_number(), + CANDataSpan(buffer.data(), buffer.size()), + std::static_pointer_cast(session->get_source()), + session->get_destination(), + session->priority)) + { + session->numberOfBytesTransferred += bytesThisFrame; + session->update_timestamp(); + } + else + { + if (session->get_time_since_last_update() > FP_TIMEOUT_MS) { - add_session_history(session); - close_session(session, true); // Session is done! + LOG_ERROR("[FP]: Tx session timed out."); + close_session(session, false); } + break; } - break; + } + + if (session->get_number_of_remaining_packets() == 0) + { + close_session(session, true); } } } + bool FastPacketProtocol::has_session(std::uint32_t parameterGroupNumber, std::shared_ptr source, std::shared_ptr destination) + { + LOCK_GUARD(Mutex, sessionMutex); + return std::any_of(activeSessions.begin(), activeSessions.end(), [&](const std::shared_ptr &session) { + return session->matches(source, destination) && (session->get_parameter_group_number() == parameterGroupNumber); + }); + } + + std::shared_ptr FastPacketProtocol::get_session(std::uint32_t parameterGroupNumber, + std::shared_ptr source, + std::shared_ptr destination) + { + LOCK_GUARD(Mutex, sessionMutex); + auto result = std::find_if(activeSessions.begin(), activeSessions.end(), [&](const std::shared_ptr &session) { + return session->matches(source, destination) && (session->get_parameter_group_number() == parameterGroupNumber); + }); + return (activeSessions.end() != result) ? (*result) : nullptr; + } + } // namespace isobus