diff --git a/Makefile b/Makefile index b55330d..c142eb8 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ all: build build: - go build -o diode -ldflags="-X main.SemVer=0.0.8" diode.go + go build -o diode -ldflags="-X main.SemVer=0.0.9" diode.go test: - go test -v + go test -v ./... run: go run diode.go diff --git a/config/settings.yaml b/config/settings.yaml index c5d5713..932fda2 100644 --- a/config/settings.yaml +++ b/config/settings.yaml @@ -8,6 +8,6 @@ output: port: 13337 tls: false broker: - server: "test.mosquitto.org" + server: "localhost" port: 1883 topic: "diode/telemetry" diff --git a/diode.go b/diode.go index 2d9ffb7..daa5d4f 100644 --- a/diode.go +++ b/diode.go @@ -13,6 +13,7 @@ import ( "os" "time" + analysis "github.com/acep-uaf/data-diode/insights" utility "github.com/acep-uaf/data-diode/utility" "github.com/urfave/cli/v2" "gopkg.in/yaml.v2" @@ -136,7 +137,7 @@ func main() { Usage: "Testing state synchronization via diode I/O", Action: func(tCtx *cli.Context) error { fmt.Println("----- TEST -----") - exampleContents(InputTextFile) + utility.RepublishContents(InputTextFile, mqttBrokerIP, mqttBrokerTopic, mqttBrokerPort) return nil }, }, @@ -156,8 +157,7 @@ func main() { Usage: "System benchmark analysis + report performance metrics", Action: func(bCtx *cli.Context) error { fmt.Println("----- BENCHMARKS -----") - // TODO: Perform specific benchmarks here... - // e.g. ping test, network throughput, system performance, disk I/O, memory usage + analysis.Pong() return nil }, }, @@ -167,7 +167,7 @@ func main() { Usage: "MQTT → TCP stream demo", Action: func(mCtx *cli.Context) error { fmt.Println("----- MQTT -----") - republishContents(InputTextFile, mqttBrokerIP, mqttBrokerTopic, mqttBrokerPort) + utility.Subscription(mqttBrokerIP, mqttBrokerPort, mqttBrokerTopic, targetServerIP, targetServerPort) return nil }, }, diff --git a/diode_test.go b/diode_test.go index e8cef85..4fd0df2 100644 --- a/diode_test.go +++ b/diode_test.go @@ -1,33 +1,77 @@ package main import ( + "bytes" "fmt" + "net" "os" + "os/exec" + "path/filepath" + "strings" "testing" insights "github.com/acep-uaf/data-diode/insights" + "github.com/acep-uaf/data-diode/utility" ) var ( - BackupConfiguration = "config/B4-0144-355112.json" - SystemSettings = "config/settings.yaml" - FileChecksum = "477076c6fd8cf48ff2d0159b22bada27588c6fa84918d1c4fc20cd9ddd291dbd" + BackupConfiguration = "config/B4-0144-355112.json" + SystemSettings = "config/settings.yaml" + ProjectDocumentation = "docs/SOP.md" + FileChecksum = "477076c6fd8cf48ff2d0159b22bada27588c6fa84918d1c4fc20cd9ddd291dbd" + SampleMessage = "Hello, world." + InterfaceSize = 1024 + InterfaceProtocol = "tcp" + InterfaceAddress = "localhost:13337" ) +type TCP struct { + ClientTargetIP string + ClientTargetPort int + ServerTargetIP string + ServerPort int + ServerSocketTimeout int +} + func TestAPI(t *testing.T) { jsonFile, err := os.Open(BackupConfiguration) + schema := "CAMIO.2024.1.0" + version := filepath.Base(jsonFile.Name()) + + // FIXME: Cross reference the JSON contents, schema version, & configuration file? + fmt.Println(version, schema) + if err != nil { t.Errorf("[?] %s via %s", err, jsonFile.Name()) } } func TestCLI(t *testing.T) { - got := "diode" - want := "diode" + binary := exec.Command("go", "build", "-o", "diode") + buildErr := binary.Run() + if buildErr != nil { + t.Fatalf("[!] Failed to build CLI binary: %v", buildErr) + } - if got != want { - t.Errorf("got %q, want %q", got, want) + cmd := exec.Command("./diode") + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + if err != nil { + t.Fatalf("[!] Failed to execute CLI command: %v", err) + } + + expectation := "diode: try 'diode --help' for more information" + reality := strings.TrimSpace(stdout.String()) + if reality != expectation { + t.Errorf("[?] Expected output: %q, but got: %q", expectation, reality) + } + + if stderr.Len() > 0 { + t.Errorf("[?] Unexpected error output: %q", stderr.String()) } } @@ -43,10 +87,99 @@ func TestFileContents(t *testing.T) { want := FileChecksum if got != want { - t.Errorf("got %q, want %q", got, want) + t.Errorf(">> got %q, want %q", got, want) + } +} + +func TestBinaryContents(t *testing.T) { + // TODO: Implement the following: + // - Craft a text message containing binary data + checksum. + // - Ensure transmission across data diode without corrupted information. + // - Check for uuenconding and base64 encoding / delimiters. + + sample := []byte(SampleMessage) + + if len(sample) == 0 { + t.Errorf("[!] No binary contents...") + } +} + +func TestEchoMessage(t *testing.T) { + go func() { + listener, err := net.Listen(InterfaceProtocol, InterfaceAddress) + if err != nil { + t.Errorf("[!] Failed to start TCP server: %v", err) + } + defer listener.Close() + + conn, err := listener.Accept() + if err != nil { + t.Errorf("[!] Failed to accept connection: %v", err) + } + defer conn.Close() + + buf := make([]byte, InterfaceSize) + n, err := conn.Read(buf) + if err != nil { + t.Errorf("[!] Failed to read message: %v", err) + } + + _, err = conn.Write(buf[:n]) + if err != nil { + t.Errorf("[!] Failed to write message: %v", err) + } + }() + + // TODO: Mock the TCP client/server to simulate the transmission of data. + + conn, err := net.Dial(InterfaceProtocol, InterfaceAddress) + if err != nil { + t.Fatalf("[!] Failed to connect to TCP server: %v", err) + } + defer conn.Close() + + message := SampleMessage + _, err = conn.Write([]byte(message)) + if err != nil { + t.Fatalf("[!] Failed to send message: %v", err) + } + + buf := make([]byte, len(message)) + n, err := conn.Read(buf) + if err != nil { + t.Fatalf("[!] Failed to read echoed message: %v", err) + } + + match := string(buf[:n]) + if match != message { + t.Errorf("[!] Echoed message does not match original message: got %q, want %q", match, message) } } func TestRepublishContents(t *testing.T) { - // TODO: Mock the MQTT client. + location := ProjectDocumentation + broker := "localhost" + topic := "test/message" + port := 1883 + + // TODO: Mock the MQTT connection. + + utility.RepublishContents(location, broker, topic, port) + + if len(location) == 0 { + t.Errorf("[!] No location specified...") + } + + if len(broker) == 0 { + t.Errorf("[!] No broker specified...") + } + + if len(topic) == 0 { + t.Errorf("[!] No topic specified...") + } + + if port == 0 { + t.Errorf("[!] No port specified...") + } } + diff --git a/go.mod b/go.mod index 987ffcf..ef6ea9b 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,14 @@ go 1.22.1 require ( github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/google/uuid v1.6.0 github.com/olekukonko/tablewriter v0.0.5 github.com/urfave/cli/v2 v2.27.1 gopkg.in/yaml.v2 v2.4.0 ) require ( - github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/rivo/uniseg v0.4.7 // indirect diff --git a/go.sum b/go.sum index bee9865..599af3b 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,9 @@ -github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM= -github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= diff --git a/insights/benchmark.go b/insights/benchmark.go index 80cd4f8..439b2a1 100644 --- a/insights/benchmark.go +++ b/insights/benchmark.go @@ -8,6 +8,21 @@ import ( "time" ) +func Pong() bool { + location := "https://example.com/" + + resp, err := http.Get(location) + + if err != nil { + return false + } + defer resp.Body.Close() + + println(">> Status: ", resp.Status) + + return true +} + func Checksum() [32]byte { location := "https://www.gutenberg.org/cache/epub/84/pg84.txt" diff --git a/utility/application.go b/utility/application.go index c42cf35..95ceb92 100644 --- a/utility/application.go +++ b/utility/application.go @@ -87,7 +87,6 @@ func StartPlaceholderClient(host string, port int) { func StartPlaceholderServer(host string, port int) { listener, err := net.Listen(CONN_TYPE, fmt.Sprintf("%s:%d", host, port)) - if err != nil { fmt.Println(">> [!] Error listening: ", err.Error()) return @@ -100,7 +99,6 @@ func StartPlaceholderServer(host string, port int) { fmt.Println(">> Server waiting for connection...") conn, err := listener.Accept() - if err != nil { fmt.Println(">> [!] Error accepting: ", err.Error()) return @@ -108,22 +106,28 @@ func StartPlaceholderServer(host string, port int) { fmt.Println(">> Server accepted connection from: ", conn.RemoteAddr()) - for { - data := make([]byte, SAMPLE) - _, err := conn.Read(data) - if err != nil { - fmt.Println(">> [!] Error receiving data: ", err.Error()) - break - } + go func(conn net.Conn) { + defer conn.Close() - fmt.Printf(">> Received data: %s\n", string(data)) + for { + data := make([]byte, SAMPLE) + bytesRead, err := conn.Read(data) + if err != nil { + fmt.Println(">> [!] Error receiving data: ", err.Error()) + if err.Error() == "EOF" { + fmt.Println(">> Connection closed by client.") + return + } + } - _, err = conn.Write([]byte(ACKNOWLEDGEMENT)) + fmt.Printf(">> Received data: %s\n", string(data[:bytesRead])) - if err != nil { - fmt.Println(">> [!] Error sending ACK: ", err.Error()) - return + _, err = conn.Write([]byte(ACKNOWLEDGEMENT)) + if err != nil { + fmt.Println(">> [!] Error sending ACK: ", err.Error()) + return + } } - } + }(conn) } } diff --git a/utility/application_test.go b/utility/application_test.go new file mode 100644 index 0000000..66c3c46 --- /dev/null +++ b/utility/application_test.go @@ -0,0 +1,21 @@ +package utility + +import "testing" + +func TestClient(t *testing.T) { + got := "server" + want := "client" + + if got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestServer(t *testing.T) { + got := "client" + want := "server" + + if got != want { + t.Errorf("got %q, want %q", got, want) + } +} diff --git a/utility/content.go b/utility/content.go index acbd9ff..79f9a08 100644 --- a/utility/content.go +++ b/utility/content.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + "time" ) type FileContent struct { @@ -44,6 +45,45 @@ func ReadLineContent(location string) FileContent { return FileContent{Lines: lines} } +func ExampleContents(location string) { + sample := ReadLineContent(location) + PrintFileContent(sample) + OutputStatistics(sample) +} + +func RepublishContents(location string, mqttBrokerIP string, mqttBrokerTopic string, mqttBrokerPort int) error { + if _, err := os.Stat(location); os.IsNotExist(err) { + fmt.Println(">> File not found: ", location) + return err + } + + fileContent := ReadLineContent(location) + + fmt.Println(">> Server: ", mqttBrokerIP) + fmt.Println(">> Topic: ", mqttBrokerTopic) + fmt.Println(">> Port: ", mqttBrokerPort) + + start := time.Now() + + for i := 1; i <= len(fileContent.Lines); i++ { + Observability(mqttBrokerIP, mqttBrokerPort, mqttBrokerTopic, fileContent.Lines[i]) + } + + t := time.Now() + + elapsed := t.Sub(start) + + if len(fileContent.Lines) == 0 { + fmt.Println(">> No message content sent.") + } else if len(fileContent.Lines) == 1 { + fmt.Println(">> Sent message from ", location, " to topic: ", mqttBrokerTopic, " in ", elapsed) + } else { + fmt.Println(">> Sent ", len(fileContent.Lines), " messages from ", location, " to topic: ", mqttBrokerTopic, " in ", elapsed) + } + + return nil +} + func OutputStatistics(content FileContent) { // ? Contextual information about the file content. fmt.Println(">> Number of lines: ", len(content.Lines)) diff --git a/utility/stream.go b/utility/stream.go index bb25e3f..11d41de 100644 --- a/utility/stream.go +++ b/utility/stream.go @@ -5,18 +5,23 @@ import ( "encoding/json" "fmt" "log" + "net" "os" + "os/signal" "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/google/uuid" ) type Message struct { - Index int - Topic string - Payload string - Checksum string + Index int + Topic string + Payload string + Checksum string + UUID string + Timestamp time.Time } var ( @@ -28,18 +33,22 @@ func Craft(topic, payload string) Message { counterMutex.Lock() defer counterMutex.Unlock() + uuid := uuid.New().String() + // TODO: Independent of the topic, the message counter should be incremented? messageCounter++ return Message{ - Index: messageCounter, - Topic: topic, - Payload: payload, - Checksum: Verification(payload), + Index: messageCounter, + Topic: topic, + Payload: payload, + Checksum: Verification(payload), + UUID: uuid, + Timestamp: time.Now(), } } -func Observability(server string, port int, topic string, message string) { +func Observability(server string, port int, topic string, message string) error { broker := fmt.Sprintf("tcp://%s:%d", server, port) clientID := "go_mqtt_client" @@ -47,7 +56,7 @@ func Observability(server string, port int, topic string, message string) { client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + return fmt.Errorf(">> Failed to connect to the broker: %v", token.Error()) } defer client.Disconnect(250) // ms @@ -56,11 +65,16 @@ func Observability(server string, port int, topic string, message string) { jsonPackage, err := json.Marshal(sample) if err != nil { - panic(err) + return fmt.Errorf(">> Failed to marshal the message: %v", err) } token := client.Publish(topic, 0, false, jsonPackage) token.Wait() + if token.Error() != nil { + return fmt.Errorf(">> Failed to publish the message: %v", token.Error()) + } + + return nil } func Republisher(server string, port int, topic string, message string) { @@ -113,6 +127,56 @@ func Republisher(server string, port int, topic string, message string) { } +func Subscription(server string, port int, topic string, host string, destination int) { + fmt.Println(">> Example Broker Activity") + fmt.Println(">> Broker: ", server) + fmt.Println(">> Port: ", port) + + // MQTT Broker / Client + url := fmt.Sprintf("tcp://%s:%d", server, port) + opts := mqtt.NewClientOptions().AddBroker(url) + client := mqtt.NewClient(opts) + + if token := client.Connect(); token.Wait() && token.Error() != nil { + fmt.Println(">> [!] Failed to connect to the broker: ", token.Error()) + } + + // Callback Function (Incoming Messages) + handleMessage := func(client mqtt.Client, msg mqtt.Message) { + fmt.Printf(">> Received message on topic: '%s': %s\n", msg.Topic(), msg.Payload()) + + // Connection Establishment (Target Host) + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, destination)) + if err != nil { + fmt.Println(">> [!] Error connecting to the target host: ", err) + return + } + defer conn.Close() + + // Data Transmission + _, err = conn.Write(msg.Payload()) + if err != nil { + fmt.Println(">> [!] Error writing to the target host: ", err) + return + } + } + + // Subscription (Topic) + if token := client.Subscribe(topic, 0, handleMessage); token.Wait() && token.Error() != nil { + if token.Error() != nil { + fmt.Println(">> [!] Error subscribing to the topic: ", token.Error()) + } + } + + // Client Shutdown (SIGINT) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + + client.Unsubscribe(topic) + client.Disconnect(250) // ms +} + func Verification(data string) string { hash := md5.New() hash.Write([]byte(data))