// Command sflow-abuse reads CSV records from a named pipe (FIFO) and counts
// events per source IP and destination port for every source address that
// lies inside a configured subnet and outside every ignored subnet.
package main

import (
	"bufio"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"strings"
	"syscall"
	"time"

	"sflow-abuse/src"
)

// Zero-based CSV column positions used by the read loop.
// NOTE(review): the layout looks like sflowtool's line-oriented FLOW output,
// where these fields are the source IP and destination port — confirm against
// the actual producer writing to the pipe.
const (
	sourceIPColumn        = 9
	destinationPortColumn = 15
	// minColumns is the smallest row length for which both columns exist.
	minColumns = destinationPortColumn + 1
)

// parseSubnets converts CIDR strings into parsed networks. Entries that fail
// to parse are reported on stdout and left out of the result.
func parseSubnets(cidrs []string) []*net.IPNet {
	nets := make([]*net.IPNet, 0, len(cidrs))
	for _, cidr := range cidrs {
		_, subnet, err := net.ParseCIDR(cidr)
		if err != nil {
			fmt.Printf("Error parsing subnet %s: %v\n", cidr, err)
			continue
		}
		nets = append(nets, subnet)
	}
	return nets
}

// containsIP reports whether addr belongs to at least one of nets.
func containsIP(nets []*net.IPNet, addr net.IP) bool {
	for _, n := range nets {
		if n.Contains(addr) {
			return true
		}
	}
	return false
}

// isIPInSubnets reports whether ip is inside any of subnets and inside none of
// the ignored subnets. It returns false when ip is not a valid IP address.
// Unparsable CIDR strings are reported on stdout and skipped.
//
// The CIDR strings are parsed on every call; callers on a hot path should
// parse once with parseSubnets and test membership with containsIP instead.
func isIPInSubnets(ip string, subnets []string, ignored []string) bool {
	addr := net.ParseIP(ip)
	if addr == nil {
		return false
	}
	return containsIP(parseSubnets(subnets), addr) &&
		!containsIP(parseSubnets(ignored), addr)
}

// readLines returns the non-blank lines of the file at path with surrounding
// whitespace removed. The file is closed before returning.
func readLines(path string) ([]string, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var lines []string
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		if line := strings.TrimSpace(scanner.Text()); line != "" {
			lines = append(lines, line)
		}
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("reading %s: %w", path, err)
	}
	return lines, nil
}

// main creates the FIFO if needed, loads the subnet lists from subnets.txt and
// ignored.txt, then reads CSV rows from the FIFO until EOF or a read error,
// counting an event for each row whose source IP passes the subnet filter.
func main() {
	// Path of the named pipe (FIFO) the CSV producer writes to.
	pipePath := "/tmp/sflow-abuse"

	// Create the named pipe (FIFO) if it doesn't exist.
	if _, err := os.Stat(pipePath); os.IsNotExist(err) {
		if err := syscall.Mkfifo(pipePath, 0666); err != nil {
			fmt.Printf("Error creating named pipe: %v\n", err)
			return
		}
	}

	fmt.Printf("Listening on named pipe (FIFO): %s\n", pipePath)

	// Load both lists (one CIDR per line) and parse them once up front so the
	// per-row filter below does no parsing of its own.
	subnetLines, err := readLines("subnets.txt")
	if err != nil {
		fmt.Printf("Error loading subnets file: %v\n", err)
		return
	}
	ignoredLines, err := readLines("ignored.txt")
	if err != nil {
		fmt.Printf("Error loading ignored file: %v\n", err)
		return
	}
	subnets := parseSubnets(subnetLines)
	ignored := parseSubnets(ignoredLines)

	// Print the subnets that will actually be used for filtering.
	fmt.Printf("Loaded %d subnets: %v\n", len(subnets), subnets)
	fmt.Printf("Loaded %d ignored subnets: %v\n", len(ignored), ignored)

	// The counter is built with a 10-second duration and src.HandleAlert as
	// its callback; their exact semantics are defined in package src.
	eventCounter := src.NewEventCounter(10*time.Second, src.HandleAlert)
	go eventCounter.StartMonitoring()

	// Open the named pipe for reading.
	pipeFile, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
	if err != nil {
		fmt.Printf("Error opening named pipe: %v\n", err)
		return
	}
	defer pipeFile.Close()

	csvReader := csv.NewReader(pipeFile)
	// Rows may have differing numbers of fields. Without this the reader locks
	// onto the first row's field count and rejects every row that differs.
	csvReader.FieldsPerRecord = -1

	// Process each row of CSV data.
	for {
		row, err := csvReader.Read()
		if errors.Is(err, io.EOF) {
			fmt.Println("Reached EOF, exiting.")
			break
		}
		if err != nil {
			// A malformed row is skipped; the reader can continue with the
			// next one.
			var parseErr *csv.ParseError
			if errors.As(err, &parseErr) {
				continue
			}
			// Any other error comes from the underlying pipe and would be
			// returned again on every call, so stop instead of spinning.
			fmt.Printf("Error reading from named pipe: %v\n", err)
			break
		}

		// Rows too short to hold both the source IP and the destination port
		// are ignored.
		if len(row) < minColumns {
			continue
		}

		sourceIP := row[sourceIPColumn]
		addr := net.ParseIP(sourceIP)
		if addr == nil {
			continue
		}

		// Count the event when the source is in a monitored subnet and not in
		// an ignored one.
		if containsIP(subnets, addr) && !containsIP(ignored, addr) {
			eventCounter.CountEvent(sourceIP, row[destinationPortColumn])
		}
	}

	// Stop the event monitoring when finished.
	eventCounter.StopMonitoring()
}