Skip to content

Commit

Permalink
chore: reorganize the extension package
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Jan 31, 2025
1 parent 209586b commit 0bc86e9
Show file tree
Hide file tree
Showing 7 changed files with 482 additions and 0 deletions.
2 changes: 2 additions & 0 deletions examples/file/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
file
out.txt
12 changes: 12 additions & 0 deletions examples/file/in.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
January
February
March
April
May
June
July
August
September
October
November
December
30 changes: 30 additions & 0 deletions examples/file/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
)

func main() {
source := ext.NewFileSource("in.txt")
reverseMapFlow := flow.NewMap(reverseString, 1)
newLineMapFlow := flow.NewMap(addNewLine, 1)
sink := ext.NewFileSink("out.txt")

source.
Via(reverseMapFlow).
Via(newLineMapFlow).
To(sink)
}

func reverseString(s string) string {
runes := []rune(s)
for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
runes[i], runes[j] = runes[j], runes[i]
}
return string(runes)
}

func addNewLine(s string) string {
return s + "\n"
}
137 changes: 137 additions & 0 deletions extension/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package extension

import (
"bufio"
"fmt"
"log"
"os"

"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// FileSource represents an inbound connector that creates a stream of
// elements from a file. The streaming element is a new line in the file.
type FileSource struct {
fileName string
in chan any
}

var _ streams.Source = (*FileSource)(nil)

// NewFileSource returns a new FileSource connector.
func NewFileSource(fileName string) *FileSource {
fileSource := &FileSource{
fileName: fileName,
in: make(chan any),
}

// asynchronously send file data downstream
go fileSource.process()

return fileSource
}

func (fs *FileSource) process() {
file, err := os.Open(fs.fileName)
if err != nil {
log.Fatalf("FileSource failed to open the file %s: %v", fs.fileName, err)
}
defer func() {
if err := file.Close(); err != nil {
log.Printf("FileSource failed to close the file %s: %v", fs.fileName, err)
}
}()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
// send the file line downstream
fs.in <- scanner.Text()
}

// check for errors that occurred during scanning
if err := scanner.Err(); err != nil {
log.Printf("FileSource scanner error: %v", err)
}

close(fs.in)
}

// Via asynchronously streams data to the given Flow and returns it.
func (fs *FileSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(fs, operator)
return operator
}

// Out returns the output channel of the FileSource connector.
func (fs *FileSource) Out() <-chan any {
return fs.in
}

// FileSink represents an outbound connector that writes streaming data
// to a file.
type FileSink struct {
fileName string
in chan any
done chan struct{}
}

var _ streams.Sink = (*FileSink)(nil)

// NewFileSink returns a new FileSink connector.
func NewFileSink(fileName string) *FileSink {
fileSink := &FileSink{
fileName: fileName,
in: make(chan any),
done: make(chan struct{}),
}

// asynchronously process stream data
go fileSink.process()

return fileSink
}

func (fs *FileSink) process() {
defer close(fs.done)

file, err := os.Create(fs.fileName)
if err != nil {
log.Fatalf("FileSink failed to open the file %s: %v", fs.fileName, err)
}
defer func() {
if err := file.Close(); err != nil {
log.Printf("FileSink failed to close the file %s: %v", fs.fileName, err)
}
}()

for element := range fs.in {
var stringElement string
switch v := element.(type) {
case string:
stringElement = v
case fmt.Stringer:
stringElement = v.String()
default:
log.Printf("FileSink received an unsupported type %T, discarding", v)
continue
}

// Write the processed string element to the file. If an error occurs,
// terminate the sink.
if _, err := file.WriteString(stringElement); err != nil {
log.Fatalf("FileSink failed to write to the file %s: %v", fs.fileName, err)
}
}
}

// In returns the input channel of the FileSink connector.
func (fs *FileSink) In() chan<- any {
return fs.in
}

// AwaitCompletion blocks until the FileSink has completed processing and
// flushing all data to the file.
func (fs *FileSink) AwaitCompletion() {
<-fs.done
}
Loading

0 comments on commit 0bc86e9

Please sign in to comment.