Skip to content

Commit

Permalink
core: Add ability to stop upload from post-receive hook (tus#279)
Browse files Browse the repository at this point in the history
* First implementation of stopping an upload from the server

* Remove unnecessary json tag

* Use golang.org/x/net/context for support in Go < 1.7
  • Loading branch information
Acconut authored May 26, 2019
1 parent 14faaaf commit d23be46
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 38 deletions.
2 changes: 1 addition & 1 deletion cmd/tusd/cli/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (

"github.com/tus/tusd"
"github.com/tus/tusd/filestore"
"github.com/tus/tusd/gcsstore"
"github.com/tus/tusd/limitedstore"
"github.com/tus/tusd/memorylocker"
"github.com/tus/tusd/s3store"
"github.com/tus/tusd/gcsstore"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down
42 changes: 22 additions & 20 deletions cmd/tusd/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,27 @@ import (
)

var Flags struct {
HttpHost string
HttpPort string
HttpSock string
MaxSize int64
UploadDir string
StoreSize int64
Basepath string
Timeout int64
S3Bucket string
S3ObjectPrefix string
S3Endpoint string
GCSBucket string
FileHooksDir string
HttpHooksEndpoint string
HttpHooksRetry int
HttpHooksBackoff int
ShowVersion bool
ExposeMetrics bool
MetricsPath string
BehindProxy bool
HttpHost string
HttpPort string
HttpSock string
MaxSize int64
UploadDir string
StoreSize int64
Basepath string
Timeout int64
S3Bucket string
S3ObjectPrefix string
S3Endpoint string
GCSBucket string
FileHooksDir string
HttpHooksEndpoint string
HttpHooksRetry int
HttpHooksBackoff int
HooksStopUploadCode int
ShowVersion bool
ExposeMetrics bool
MetricsPath string
BehindProxy bool

FileHooksInstalled bool
HttpHooksInstalled bool
Expand All @@ -48,6 +49,7 @@ func ParseFlags() {
flag.StringVar(&Flags.HttpHooksEndpoint, "hooks-http", "", "An HTTP endpoint to which hook events will be sent to")
flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout")
flag.IntVar(&Flags.HttpHooksBackoff, "hooks-http-backoff", 1, "Number of seconds to wait before retrying each retry")
flag.IntVar(&Flags.HooksStopUploadCode, "hooks-stop-code", 0, "Return code from post-receive hook which causes tusd to stop and delete the current upload. A zero value means that no uploads will be stopped")
flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information")
flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage")
flag.StringVar(&Flags.MetricsPath, "metrics-path", "/metrics", "Path under which the metrics endpoint will be accessible")
Expand Down
35 changes: 22 additions & 13 deletions cmd/tusd/cli/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt

output := []byte{}
err := error(nil)
returnCode := 0

if Flags.FileHooksInstalled {
output, err = invokeFileHook(name, typ, info, captureOutput)
output, returnCode, err = invokeFileHook(name, typ, info, captureOutput)
}

if Flags.HttpHooksInstalled {
output, err = invokeHttpHook(name, typ, info, captureOutput)
output, returnCode, err = invokeHttpHook(name, typ, info, captureOutput)
}

if err != nil {
Expand All @@ -125,18 +126,24 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt
logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", info.ID)
}

if typ == HookPostReceive && Flags.HooksStopUploadCode != 0 && Flags.HooksStopUploadCode == returnCode {
logEv(stdout, "HookStopUpload", "id", info.ID)

info.StopUpload()
}

return output, err
}

func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) {
func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
jsonInfo, err := json.Marshal(info)
if err != nil {
return nil, err
return nil, 0, err
}

req, err := http.NewRequest("POST", Flags.HttpHooksEndpoint, bytes.NewBuffer(jsonInfo))
if err != nil {
return nil, err
return nil, 0, err
}

req.Header.Set("Hook-Name", name)
Expand All @@ -152,27 +159,27 @@ func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput

resp, err := client.Do(req)
if err != nil {
return nil, err
return nil, 0, err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
return nil, 0, err
}

if resp.StatusCode >= http.StatusBadRequest {
return body, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body}
return body, resp.StatusCode, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body}
}

if captureOutput {
return body, err
return body, resp.StatusCode, err
}

return nil, err
return nil, resp.StatusCode, err
}

func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) {
func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
hookPath := Flags.FileHooksDir + string(os.PathSeparator) + name
cmd := exec.Command(hookPath)
env := os.Environ()
Expand All @@ -182,7 +189,7 @@ func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput

jsonInfo, err := json.Marshal(info)
if err != nil {
return nil, err
return nil, 0, err
}

reader := bytes.NewReader(jsonInfo)
Expand All @@ -208,5 +215,7 @@ func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput
err = nil
}

return output, err
returnCode := cmd.ProcessState.ExitCode()

return output, returnCode, err
}
17 changes: 17 additions & 0 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package tusd

import (
"io"

"golang.org/x/net/context"
)

type MetaData map[string]string
Expand All @@ -25,6 +27,21 @@ type FileInfo struct {
// ordered slice containing the ids of the uploads of which the final upload
// will consist after concatenation.
PartialUploads []string

// stopUpload is the cancel function for the upload's context.Context. When
// invoked it will interrupt the writes to DataStore#WriteChunk.
stopUpload context.CancelFunc
}

// StopUpload interrupts an running upload from the server-side. This means that
// the current request body is closed, so that the data store does not get any
// more data. Furthermore, a response is sent to notify the client of the
// interrupting and the upload is terminated (if supported by the data store),
// so the upload cannot be resumed anymore.
func (f FileInfo) StopUpload() {
if f.stopUpload != nil {
f.stopUpload()
}
}

type DataStore interface {
Expand Down
60 changes: 60 additions & 0 deletions patch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,4 +485,64 @@ func TestPatch(t *testing.T) {
_, more := <-c
a.False(more)
})

SubTest(t, "StopUpload", func(t *testing.T, store *MockFullDataStore) {
gomock.InOrder(
store.EXPECT().GetInfo("yes").Return(FileInfo{
ID: "yes",
Offset: 0,
Size: 100,
}, nil),
store.EXPECT().WriteChunk("yes", int64(0), NewReaderMatcher("first ")).Return(int64(6), http.ErrBodyReadAfterClose),
store.EXPECT().Terminate("yes").Return(nil),
)

handler, _ := NewHandler(Config{
DataStore: store,
NotifyUploadProgress: true,
})

c := make(chan FileInfo)
handler.UploadProgress = c

reader, writer := io.Pipe()
a := assert.New(t)

go func() {
writer.Write([]byte("first "))

info := <-c
info.StopUpload()

// Wait a short time to ensure that the goroutine in the PATCH
// handler has received and processed the stop event.
<-time.After(10 * time.Millisecond)

// Assert that the "request body" has been closed.
_, err := writer.Write([]byte("second "))
a.Equal(err, io.ErrClosedPipe)

// Close the upload progress handler so that the main goroutine
// can exit properly after waiting for this goroutine to finish.
close(handler.UploadProgress)
}()

(&httpTest{
Method: "PATCH",
URL: "yes",
ReqHeader: map[string]string{
"Tus-Resumable": "1.0.0",
"Content-Type": "application/offset+octet-stream",
"Upload-Offset": "0",
},
ReqBody: reader,
Code: http.StatusBadRequest,
ResHeader: map[string]string{
"Upload-Offset": "",
},
}).Run(handler, t)

_, more := <-c
a.False(more)
})
}
57 changes: 53 additions & 4 deletions unrouted_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"strings"
"sync/atomic"
"time"

"golang.org/x/net/context"
)

const UploadLengthDeferred = "1"
Expand Down Expand Up @@ -70,6 +72,7 @@ var (
ErrModifyFinal = NewHTTPError(errors.New("modifying a final upload is not allowed"), http.StatusForbidden)
ErrUploadLengthAndUploadDeferLength = NewHTTPError(errors.New("provided both Upload-Length and Upload-Defer-Length"), http.StatusBadRequest)
ErrInvalidUploadDeferLength = NewHTTPError(errors.New("invalid Upload-Defer-Length header"), http.StatusBadRequest)
ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest)
)

// UnroutedHandler exposes methods to handle requests as part of the tus protocol,
Expand Down Expand Up @@ -535,14 +538,46 @@ func (handler *UnroutedHandler) writeChunk(id string, info FileInfo, w http.Resp
// Limit the data read from the request's body to the allowed maximum
reader := io.LimitReader(r.Body, maxSize)

// We use a context object to allow the hook system to cancel an upload
uploadCtx, stopUpload := context.WithCancel(context.Background())
info.stopUpload = stopUpload
// terminateUpload specifies whether the upload should be deleted after
// the write has finished
terminateUpload := false
// Cancel the context when the function exits to ensure that the goroutine
// is properly cleaned up
defer stopUpload()

go func() {
// Interrupt the Read() call from the request body
<-uploadCtx.Done()
terminateUpload = true
r.Body.Close()
}()

if handler.config.NotifyUploadProgress {
var stop chan<- struct{}
reader, stop = handler.sendProgressMessages(info, reader)
defer close(stop)
var stopProgressEvents chan<- struct{}
reader, stopProgressEvents = handler.sendProgressMessages(info, reader)
defer close(stopProgressEvents)
}

var err error
bytesWritten, err = handler.composer.Core.WriteChunk(id, offset, reader)
if terminateUpload && handler.composer.UsesTerminater {
if terminateErr := handler.terminateUpload(id, info); terminateErr != nil {
// We only log this error and not show it to the user since this
// termination error is not relevant to the uploading client
handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error())
}
}

// The error "http: invalid Read on closed Body" is returned if we stop the upload
// while the data store is still reading. Since this is an implementation detail,
// we replace this error with a message saying that the upload has been stopped.
if err == http.ErrBodyReadAfterClose {
err = ErrUploadStoppedByServer
}

if err != nil {
return err
}
Expand Down Expand Up @@ -735,19 +770,33 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
}
}

err = handler.composer.Terminater.Terminate(id)
err = handler.terminateUpload(id, info)
if err != nil {
handler.sendError(w, r, err)
return
}

handler.sendResp(w, r, http.StatusNoContent)
}

// terminateUpload passes a given upload to the DataStore's Terminater,
// send the corresponding upload info on the TerminatedUploads channnel
// and updates the statistics.
// Note the the info argument is only needed if the terminated uploads
// notifications are enabled.
func (handler *UnroutedHandler) terminateUpload(id string, info FileInfo) error {
err := handler.composer.Terminater.Terminate(id)
if err != nil {
return err
}

if handler.config.NotifyTerminatedUploads {
handler.TerminatedUploads <- info
}

handler.Metrics.incUploadsTerminated()

return nil
}

// Send the error in the response body. The status code will be looked up in
Expand Down
5 changes: 5 additions & 0 deletions utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (m readerMatcher) Matches(x interface{}) bool {
}

bytes, err := ioutil.ReadAll(input)
// Handle closed pipes similar to how EOF are handled by ioutil.ReadAll,
// we handle this error as if the stream ended normally.
if err == io.ErrClosedPipe {
err = nil
}
if err != nil {
panic(err)
}
Expand Down

0 comments on commit d23be46

Please sign in to comment.