From b20bafdedd03dd264ad296855101348d996b4111 Mon Sep 17 00:00:00 2001 From: reugn Date: Fri, 31 Jan 2025 11:30:43 +0200 Subject: [PATCH] remove renamed files --- examples/fs/.gitignore | 2 - examples/fs/in.txt | 12 --- examples/fs/main.go | 30 ------ extension/fs.go | 137 -------------------------- extension/net.go | 213 ----------------------------------------- extension/std.go | 85 ---------------- 6 files changed, 479 deletions(-) delete mode 100644 examples/fs/.gitignore delete mode 100644 examples/fs/in.txt delete mode 100644 examples/fs/main.go delete mode 100644 extension/fs.go delete mode 100644 extension/net.go delete mode 100644 extension/std.go diff --git a/examples/fs/.gitignore b/examples/fs/.gitignore deleted file mode 100644 index fc6ab12..0000000 --- a/examples/fs/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -fs -out.txt \ No newline at end of file diff --git a/examples/fs/in.txt b/examples/fs/in.txt deleted file mode 100644 index 09c4565..0000000 --- a/examples/fs/in.txt +++ /dev/null @@ -1,12 +0,0 @@ -January -February -March -April -May -June -July -August -September -October -November -December \ No newline at end of file diff --git a/examples/fs/main.go b/examples/fs/main.go deleted file mode 100644 index 5395bea..0000000 --- a/examples/fs/main.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -import ( - ext "github.com/reugn/go-streams/extension" - "github.com/reugn/go-streams/flow" -) - -func main() { - source := ext.NewFileSource("in.txt") - reverseMapFlow := flow.NewMap(reverseString, 1) - newLineMapFlow := flow.NewMap(addNewLine, 1) - sink := ext.NewFileSink("out.txt") - - source. - Via(reverseMapFlow). - Via(newLineMapFlow). - To(sink) -} - -func reverseString(s string) string { - runes := []rune(s) - for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 { - runes[i], runes[j] = runes[j], runes[i] - } - return string(runes) -} - -func addNewLine(s string) string { - return s + "\n" -} diff --git a/extension/fs.go b/extension/fs.go deleted file mode 100644 index 0ff011c..0000000 --- a/extension/fs.go +++ /dev/null @@ -1,137 +0,0 @@ -package extension - -import ( - "bufio" - "fmt" - "log" - "os" - - "github.com/reugn/go-streams" - "github.com/reugn/go-streams/flow" -) - -// FileSource represents an inbound connector that creates a stream of -// elements from a file. The streaming element is a new line in the file. -type FileSource struct { - fileName string - in chan any -} - -var _ streams.Source = (*FileSource)(nil) - -// NewFileSource returns a new FileSource connector. -func NewFileSource(fileName string) *FileSource { - fileSource := &FileSource{ - fileName: fileName, - in: make(chan any), - } - - // asynchronously send file data downstream - go fileSource.process() - - return fileSource -} - -func (fs *FileSource) process() { - file, err := os.Open(fs.fileName) - if err != nil { - log.Fatalf("FileSource failed to open the file %s: %v", fs.fileName, err) - } - defer func() { - if err := file.Close(); err != nil { - log.Printf("FileSource failed to close the file %s: %v", fs.fileName, err) - } - }() - - scanner := bufio.NewScanner(file) - for scanner.Scan() { - // send the file line downstream - fs.in <- scanner.Text() - } - - // check for errors that occurred during scanning - if err := scanner.Err(); err != nil { - log.Printf("FileSource scanner error: %v", err) - } - - close(fs.in) -} - -// Via asynchronously streams data to the given Flow and returns it. -func (fs *FileSource) Via(operator streams.Flow) streams.Flow { - flow.DoStream(fs, operator) - return operator -} - -// Out returns the output channel of the FileSource connector. -func (fs *FileSource) Out() <-chan any { - return fs.in -} - -// FileSink represents an outbound connector that writes streaming data -// to a file. -type FileSink struct { - fileName string - in chan any - done chan struct{} -} - -var _ streams.Sink = (*FileSink)(nil) - -// NewFileSink returns a new FileSink connector. -func NewFileSink(fileName string) *FileSink { - fileSink := &FileSink{ - fileName: fileName, - in: make(chan any), - done: make(chan struct{}), - } - - // asynchronously process stream data - go fileSink.process() - - return fileSink -} - -func (fs *FileSink) process() { - defer close(fs.done) - - file, err := os.Create(fs.fileName) - if err != nil { - log.Fatalf("FileSink failed to open the file %s: %v", fs.fileName, err) - } - defer func() { - if err := file.Close(); err != nil { - log.Printf("FileSink failed to close the file %s: %v", fs.fileName, err) - } - }() - - for element := range fs.in { - var stringElement string - switch v := element.(type) { - case string: - stringElement = v - case fmt.Stringer: - stringElement = v.String() - default: - log.Printf("FileSink received an unsupported type %T, discarding", v) - continue - } - - // Write the processed string element to the file. If an error occurs, - // terminate the sink. - if _, err := file.WriteString(stringElement); err != nil { - log.Fatalf("FileSink failed to write to the file %s: %v", fs.fileName, err) - } - } -} - -// In returns the input channel of the FileSink connector. -func (fs *FileSink) In() chan<- any { - return fs.in -} - -// AwaitCompletion blocks until the FileSink has completed processing and -// flushing all data to the file. -func (fs *FileSink) AwaitCompletion() { - <-fs.done -} diff --git a/extension/net.go b/extension/net.go deleted file mode 100644 index b67a494..0000000 --- a/extension/net.go +++ /dev/null @@ -1,213 +0,0 @@ -package extension - -import ( - "bufio" - "context" - "fmt" - "log" - "net" - "time" - - "github.com/reugn/go-streams" - "github.com/reugn/go-streams/flow" -) - -// ConnType represents a network connection type. -type ConnType string - -const ( - // TCP connection type. - TCP ConnType = "tcp" - // UDP connection type. - UDP ConnType = "udp" -) - -// NetSource represents an inbound network socket connector. -type NetSource struct { - ctx context.Context - conn net.Conn - listener net.Listener - connType ConnType - out chan any -} - -var _ streams.Source = (*NetSource)(nil) - -// NewNetSource returns a new NetSource connector. -func NewNetSource(ctx context.Context, connType ConnType, address string) (*NetSource, error) { - var ( - conn net.Conn - listener net.Listener - ) - - out := make(chan any) - switch connType { - case TCP: - addr, err := net.ResolveTCPAddr(string(connType), address) - if err != nil { - return nil, fmt.Errorf("failed to ResolveTCPAddr: %w", err) - } - - listener, err = net.ListenTCP(string(connType), addr) - if err != nil { - return nil, fmt.Errorf("failed to ListenTCP: %w", err) - } - - go acceptConnections(listener, out) - case UDP: - addr, err := net.ResolveUDPAddr(string(connType), address) - if err != nil { - return nil, fmt.Errorf("failed to ResolveUDPAddr: %w", err) - } - - conn, err = net.ListenUDP(string(connType), addr) - if err != nil { - return nil, fmt.Errorf("failed to ListenUDP: %w", err) - } - - go handleConnection(conn, out) - default: - return nil, fmt.Errorf("invalid connection type: %s", connType) - } - - netSource := &NetSource{ - ctx: ctx, - conn: conn, - listener: listener, - connType: connType, - out: out, - } - - // start a goroutine to await the context cancellation and then - // shut down the network source - go netSource.awaitShutdown() - - return netSource, nil -} - -func (ns *NetSource) awaitShutdown() { - <-ns.ctx.Done() - - if ns.conn != nil { - _ = ns.conn.Close() - } - - if ns.listener != nil { - _ = ns.listener.Close() - } - - close(ns.out) -} - -// acceptConnections accepts new TCP connections. -func acceptConnections(listener net.Listener, out chan<- any) { - for { - // block and return the next connection to the listener - conn, err := listener.Accept() - if err != nil { - log.Printf("listener.Accept() failed with: %s", err) - return - } - - // handle the new connection - go handleConnection(conn, out) - } -} - -// handleConnection manages a single network connection, reading newline-delimited data -// from it and sending it to the provided output channel. -func handleConnection(conn net.Conn, out chan<- any) { - log.Printf("NetSource connected on: %v", conn.LocalAddr()) - reader := bufio.NewReader(conn) - - for { - bufferBytes, err := reader.ReadBytes('\n') - if len(bufferBytes) > 0 { - out <- string(bufferBytes) - } - - if err != nil { - log.Printf("handleConnection failed with: %s", err) - break - } - } - - log.Printf("Closing the NetSource connection %v", conn.LocalAddr()) - if err := conn.Close(); err != nil { - log.Printf("Failed to close connection %v", conn.LocalAddr()) - } -} - -// Via asynchronously streams data to the given Flow and returns it. -func (ns *NetSource) Via(operator streams.Flow) streams.Flow { - flow.DoStream(ns, operator) - return operator -} - -// Out returns the output channel of the NetSource connector. -func (ns *NetSource) Out() <-chan any { - return ns.out -} - -// NetSink represents an outbound network socket connector. -type NetSink struct { - conn net.Conn - connType ConnType - in chan any - done chan struct{} -} - -var _ streams.Sink = (*NetSink)(nil) - -// NewNetSink returns a new NetSink connector. -func NewNetSink(connType ConnType, address string) (*NetSink, error) { - conn, err := net.DialTimeout(string(connType), address, 10*time.Second) - if err != nil { - return nil, fmt.Errorf("failed to connect: %w", err) - } - log.Printf("NetSink connected on: %v", conn.LocalAddr()) - - netSink := &NetSink{ - conn: conn, - connType: connType, - in: make(chan any), - done: make(chan struct{}), - } - - // asynchronously process stream data - go netSink.process() - - return netSink, nil -} - -func (ns *NetSink) process() { - defer close(ns.done) - - for msg := range ns.in { - switch message := msg.(type) { - case string: - if _, err := ns.conn.Write([]byte(message)); err != nil { - log.Printf("NetSink failed to write to connection %v: %v", - ns.conn.LocalAddr(), err) - } - default: - log.Printf("NetSink unsupported message type: %T", message) - } - } - - log.Printf("Closing the NetSink connection %v", ns.conn.LocalAddr()) - if err := ns.conn.Close(); err != nil { - log.Printf("Failed to close connection %v", ns.conn.LocalAddr()) - } -} - -// In returns the input channel of the NetSink connector. -func (ns *NetSink) In() chan<- any { - return ns.in -} - -// AwaitCompletion blocks until the NetSink has processed all received data, -// closed the connection, and released all resources. -func (ns *NetSink) AwaitCompletion() { - <-ns.done -} diff --git a/extension/std.go b/extension/std.go deleted file mode 100644 index adb6679..0000000 --- a/extension/std.go +++ /dev/null @@ -1,85 +0,0 @@ -package extension - -import ( - "fmt" - - "github.com/reugn/go-streams" -) - -// StdoutSink represents a simple outbound connector that writes -// streaming data to standard output. -type StdoutSink struct { - in chan any - done chan struct{} -} - -var _ streams.Sink = (*StdoutSink)(nil) - -// NewStdoutSink returns a new StdoutSink connector. -func NewStdoutSink() *StdoutSink { - stdoutSink := &StdoutSink{ - in: make(chan any), - done: make(chan struct{}), - } - - // asynchronously process stream data - go stdoutSink.process() - - return stdoutSink -} - -func (stdout *StdoutSink) process() { - defer close(stdout.done) - for elem := range stdout.in { - fmt.Println(elem) - } -} - -// In returns the input channel of the StdoutSink connector. -func (stdout *StdoutSink) In() chan<- any { - return stdout.in -} - -// AwaitCompletion blocks until the StdoutSink has processed all received data. -func (stdout *StdoutSink) AwaitCompletion() { - <-stdout.done -} - -// IgnoreSink represents a simple outbound connector that discards -// all elements of a stream. -type IgnoreSink struct { - in chan any -} - -var _ streams.Sink = (*IgnoreSink)(nil) - -// NewIgnoreSink returns a new IgnoreSink connector. -func NewIgnoreSink() *IgnoreSink { - ignoreSink := &IgnoreSink{ - in: make(chan any), - } - - // asynchronously process stream data - go ignoreSink.process() - - return ignoreSink -} - -func (ignore *IgnoreSink) process() { - for { - _, ok := <-ignore.in - if !ok { - break - } - } -} - -// In returns the input channel of the IgnoreSink connector. -func (ignore *IgnoreSink) In() chan<- any { - return ignore.in -} - -// AwaitCompletion is a no-op for the IgnoreSink. -func (ignore *IgnoreSink) AwaitCompletion() { - // no-op -}