Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finale #54

Merged
merged 9 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Directory.json

This file was deleted.

257 changes: 119 additions & 138 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/bsach64/goback/server"
"github.com/bsach64/goback/utils"
"github.com/charmbracelet/huh"
"github.com/fsnotify/fsnotify"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -67,120 +68,154 @@ func ClientLoop(cmd *cobra.Command, args []string) {

case "Upload File":
path, err := promptForFilePath()

if err != nil {
log.Error("Could not get file path for upload", "err", err)
continue
}

stat, err := os.Stat(path)
err = uploadFile(sshC, path, worker)
if err != nil {
log.Error("Could not stat on file for upload", "err", err)
}

fileInfo := utils.FileInfo{
Filename: stat.Name(),
Size: stat.Size(),
log.Error("Could not upload file", "err", err)
continue
}

log.Infof("Sending file metadata to server: name: %v, size: %v", fileInfo.Filename, fileInfo.Size)
fileInfoDat, err := json.Marshal(&fileInfo)
case "Sync Directory":
err = watchAndUpload("./files", sshC, worker)
if err != nil {
log.Fatal("Could not marshal json", "err", err)
log.Error("Watcher with error", "err", err)
}

success, reply, err := sshC.SendRequest("start-file-upload", true, fileInfoDat)
case "Exit":
fmt.Println("Exiting client.")
_, _, err := sshC.SendRequest("close-connection", false, []byte(worker.Ip))
if err != nil {
log.Fatalf("Failed to send %s request: %v", "start-file-upload", err)
log.Error("Error while closing the connection with server")
}
sshC.Close()
return
}
}
}

if !success {
log.Fatalf("Could not start file upload: %v", string(reply))
}
func watchAndUpload(dir string, sshC *ssh.Client, worker server.Worker) error {
err := os.MkdirAll(dir, 0755)
if err != nil {
return err
}

log.Infof("Starting file upload to other clients: name: %v, size: %v", fileInfo.Filename, fileInfo.Size)
success, reply, err = sshC.SendRequest("create-backup", true, []byte("Get Worker IP"))
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}

if err != nil {
log.Fatalf("Failed to send %s request: %v", "create-backup", err)
}
log.Info("Starting to watch directory:", dir)

if !success {
log.Warn("ssh request for create-backup failed", "reply", string(reply))
continue
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
log.Error("Watcher events channel closed")
return
}
if event.Has(fsnotify.Create) {
err = uploadFile(sshC, event.Name, worker)
if err != nil {
log.Error("Could not upload newly created file", "file", event.Name, "err", err)
}
log.Info("Successfully Uploaded", "file", event.Name)
}
case err, ok := <-watcher.Errors:
if !ok {
log.Error("Watcher errors channel closed")
return
}
log.Errorf("Watcher error: %v", err)
}
}
}()

var otherWorkers []server.Worker
if err := json.Unmarshal(reply, &otherWorkers); err != nil {
log.Fatalf("failed to unmarshal response: %v", err)
}
err = watcher.Add(dir)
if err != nil {
watcher.Close() // Clean up if watcher.Add fails
return fmt.Errorf("failed to add directory to watcher: %v", err)
}

// Worker node ip and port
for _, w := range otherWorkers {
if w.Ip == worker.Ip {
continue
}
wip := fmt.Sprintf("%s:%d", w.Ip, w.Port)
// Worker node username and password for login
// Will change this to digital signature later
c := client.NewClient(w.SftpUser, w.SftpPass)
// Connect to sftp server i.e worker node
sftpClient, err := c.ConnectToServer(wip)
if err != nil {
log.Fatal("Could not connect to worker node", "err", err)
}
err = client.Upload(sftpClient, path)
select {}
}

if err != nil {
log.Fatalf("Cannot upload file to worker node %s at because %s", wip, err)
}
log.Info("Successfully Uploaded", "file", path)
func uploadFile(sshC *ssh.Client, path string, worker server.Worker) error {
stat, err := os.Stat(path)
if err != nil {
log.Error("Could not stat on file for upload", "err", err)
}

sftpClient.Close()
}
fileInfo := utils.FileInfo{
Filename: stat.Name(),
Size: stat.Size(),
}

success, reply, err = sshC.SendRequest("finish-file-upload", true, fileInfoDat)
if err != nil {
log.Fatalf("Failed to send %s request: %v", "finish-file-upload", err)
}
log.Infof("Sending file metadata to server: name: %v, size: %v", fileInfo.Filename, fileInfo.Size)
fileInfoDat, err := json.Marshal(&fileInfo)
if err != nil {
return fmt.Errorf("Could not marshal json: %v", err)
}

if !success {
log.Warn("ssh request for finish-file-upload failed", "reply", string(reply))
continue
}
success, reply, err := sshC.SendRequest("start-file-upload", true, fileInfoDat)
if err != nil {
return fmt.Errorf("Failed to send %s request: %v", "start-file-upload", err)
}

case "Add Directory to Sync":
dir, err := promptForDirectory()
if err != nil {
log.Error("Could not get directory for sync", "err", err)
continue
}
if !success {
return fmt.Errorf("Could not start file upload: reply: %v", string(reply))
}

if dir == "" {
dir = "./.data" // Default directory
}
log.Infof("Starting file upload to other clients: name: %v, size: %v", fileInfo.Filename, fileInfo.Size)
success, reply, err = sshC.SendRequest("create-backup", true, []byte("Get Worker IP"))

log.Infof("Updating Directory.json with directory: %s", dir)
if err != nil {
return fmt.Errorf("Failed to send %s request err: %v", "create-backup", err)
}

// Update the JSON file
configPath := "Directory.json"
err = updateDirectoryInConfig(configPath, dir)
if err != nil {
log.Error("Failed to update directory in config file", "err", err)
continue
}
if !success {
return fmt.Errorf("ssh request for create-backup failed: reply: %v", string(reply))
}

log.Infof("Directory.json updated successfully with: %s", dir)
case "Exit":
fmt.Println("Exiting client.")
_, _, err := sshC.SendRequest("close-connection", false, []byte(worker.Ip))
if err != nil {
log.Error("Error while closing the connection with server")
}
sshC.Close()
return
var otherWorkers []server.Worker
if err := json.Unmarshal(reply, &otherWorkers); err != nil {
return fmt.Errorf("failed to unmarshal response: %v", err)
}

// Worker node ip and port
for _, w := range otherWorkers {
if w.Ip == worker.Ip {
continue
}
wip := fmt.Sprintf("%s:%d", w.Ip, w.Port)
c := client.NewClient(w.SftpUser, w.SftpPass)
// Connect to sftp server i.e worker node
sftpClient, err := c.ConnectToServer(wip)
if err != nil {
log.Fatal("Could not connect to worker node", "err", err)
}
err = client.Upload(sftpClient, path)

if err != nil {
log.Fatalf("Cannot upload file to worker node %s at because %s", wip, err)
}
log.Info("Successfully Uploaded", "file", path)
sftpClient.Close()
}

success, reply, err = sshC.SendRequest("finish-file-upload", true, fileInfoDat)
if err != nil {
return fmt.Errorf("Failed to send %s request: err = %v", "finish-file-upload", err)
}

if !success {
return fmt.Errorf("ssh request for finish-file-upload failed: reply: %v", string(reply))
}
return nil
}

func CreateWorker() (server.Worker, error) {
Expand Down Expand Up @@ -209,25 +244,6 @@ func SendWorkerDetails(worker server.Worker, sshC *ssh.Client) error {
return nil
}

func promptForDirectory() (string, error) {
var directory string
directoryPrompt := huh.NewForm(
huh.NewGroup(
huh.NewInput().
Title("Enter Directory to watch").
Prompt("? ").
Placeholder(".data").
Suggestions([]string{"./.data"}).
Value(&directory),
),
)
err := directoryPrompt.Run()
if err != nil {
return "", err
}
return directory, nil
}

func promptForIP() (string, error) {
var ip string
ipPrompt := huh.NewForm(
Expand Down Expand Up @@ -256,8 +272,7 @@ func promptForAction() (string, error) {
Title("Choose an option:").
Options(
huh.NewOption("Upload File", "Upload File"),
huh.NewOption("List Directory", "List Directory"),
huh.NewOption("Add Directory to Sync", "Add Directory to Sync"),
huh.NewOption("Sync Directory", "Sync Directory"),
huh.NewOption("Exit", "Exit"),
).
Value(&selectedOption),
Expand Down Expand Up @@ -300,40 +315,6 @@ func validateFilePath(input string) error {
return nil
}

func updateDirectoryInConfig(path, newDir string) error {
config := server.Config{}

if _, err := os.Stat(path); os.IsNotExist(err) {
config.Directory = newDir
} else {
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open config file: %w", err)
}
defer file.Close()

decoder := json.NewDecoder(file)
if err := decoder.Decode(&config); err != nil {
return fmt.Errorf("failed to parse config file: %w", err)
}

config.Directory = newDir
}

file, err := os.Create(path)
if err != nil {
return fmt.Errorf("failed to create or truncate config file: %w", err)
}
defer file.Close()

encoder := json.NewEncoder(file)
if err := encoder.Encode(&config); err != nil {
return fmt.Errorf("failed to write updated config to file: %w", err)
}

return nil
}

func init() {
// Persistent flags for subcommands
rootCmd.AddCommand(clientCmd)
Expand Down
71 changes: 0 additions & 71 deletions cmd/reconstruct.go

This file was deleted.

Loading
Loading