Skip to content
This repository has been archived by the owner on Nov 14, 2023. It is now read-only.

Commit

Permalink
Check if dataset already is registered
Browse files Browse the repository at this point in the history
  • Loading branch information
jbygdell committed Nov 30, 2022
1 parent 8af2ece commit 2f3b157
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 6 deletions.
14 changes: 13 additions & 1 deletion cmd/syncapi/syncapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,11 @@ func dataset(w http.ResponseWriter, r *http.Request) {
}

if err := parseDatasetMessage(b); err != nil {
w.WriteHeader(http.StatusInternalServerError)
if err.Error() == "Dataset exists" {
w.WriteHeader(http.StatusAlreadyReported)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
}

w.WriteHeader(http.StatusOK)
Expand All @@ -240,6 +244,14 @@ func parseDatasetMessage(msg []byte) error {
blob := syncDataset{}
_ = json.Unmarshal(msg, &blob)

ds, err := Conf.API.DB.CheckIfDatasetExists(blob.DatasetID)
if err != nil {
return fmt.Errorf("Failed to check dataset existance: Reason %v", err)
}
if ds {
return fmt.Errorf("Dataset exists")
}

var accessionIDs []string
for _, files := range blob.DatasetFiles {
ingest := common.Ingest{
Expand Down
16 changes: 13 additions & 3 deletions cmd/syncapi/syncapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,25 @@ func TestDatasetRoute(t *testing.T) {
if err != nil {
t.Skip("skip TestShutdown since broker not present")
}
assert.NoError(t, err)
Conf.Database = database.DBConf{
Host: "localhost",
Port: 5432,
User: "postgres",
Password: "postgres",
Database: "lega",
SslMode: "disable",
}
Conf.API.DB, err = database.NewDB(Conf.Database)
if err != nil {
t.Skip("skip TestShutdown since broker not present")
}

r := mux.NewRouter()
r.HandleFunc("/dataset", dataset)
ts := httptest.NewServer(r)
defer ts.Close()

goodJSON := []byte(`{"user":"[email protected]", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "dataset_files": [{"filepath": "inbox/user/file1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`)
goodJSON := []byte(`{"user":"[email protected]", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6", "dataset_files": [{"filepath": "inbox/user/file1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`)
good, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(goodJSON))
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, good.StatusCode)
Expand Down Expand Up @@ -262,7 +273,6 @@ func TestBuildJSON(t *testing.T) {
if err != nil {
t.Skip("skip TestShutdown since broker not present")
}
assert.NoError(t, err)

db := Conf.API.DB.DB

Expand Down
36 changes: 35 additions & 1 deletion internal/database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"hash"
"math"
"path/filepath"
"strings"
"time"
Expand All @@ -25,6 +26,7 @@ type Database interface {
MarkReady(accessionID, user, filepath, checksum string) error
GetArchived(user, filepath, checksum string) (string, int, error)
GetSyncData(accessionID string) (SyncData, error)
CheckIfDatasetExists(datasetID string) (bool, error)
Close()
}

Expand Down Expand Up @@ -491,7 +493,7 @@ func (dbs *SQLdb) getSyncData(accessionID string) (SyncData, error) {
dbs.checkAndReconnectIfNeeded()

db := dbs.DB
const query = "SELECT elixir_id, inbox_path, decrypted_file_checksum from local_ega.files WHERE stable_id = $1 AND status = 'READY'"
const query = "SELECT elixir_id, inbox_path, decrypted_file_checksum from local_ega.files WHERE stable_id = $1 AND status = 'READY';"

var data SyncData
if err := db.QueryRow(query, accessionID).Scan(&data.User, &data.FilePath, &data.Checksum); err != nil {
Expand All @@ -500,3 +502,35 @@ func (dbs *SQLdb) getSyncData(accessionID string) (SyncData, error) {

return data, nil
}

// CheckIfDatasetExists reports whether the dataset identified by datasetID
// is already registered.
//
// The lookup is attempted up to dbRetryTimes times. Between failed attempts
// the call sleeps 3^attempt seconds. The result and error of the last
// attempt are returned.
func (dbs *SQLdb) CheckIfDatasetExists(datasetID string) (bool, error) {
	var (
		ds  bool
		err error
	)

	for count := 1; count <= dbRetryTimes; count++ {
		ds, err = dbs.checkIfDatasetExists(datasetID)
		if err == nil {
			break
		}
		// Back off only when another attempt follows; sleeping after the
		// final failure would merely delay reporting the error.
		if count < dbRetryTimes {
			time.Sleep(time.Duration(math.Pow(3, float64(count))) * time.Second)
		}
	}

	return ds, err
}

// checkIfDatasetExists is the actual function performing work for
// CheckIfDatasetExists. It runs a single EXISTS query against
// local_ega_ebi.filedataset, matching dataset_stable_id against datasetID,
// and returns the scanned boolean. If the query or scan fails it returns
// false together with the error.
func (dbs *SQLdb) checkIfDatasetExists(datasetID string) (bool, error) {
	dbs.checkAndReconnectIfNeeded()

	const query = "SELECT EXISTS(SELECT id from local_ega_ebi.filedataset WHERE dataset_stable_id = $1);"

	var exists bool
	if err := dbs.DB.QueryRow(query, datasetID).Scan(&exists); err != nil {
		// The result is meaningless on error, so report an explicit false.
		return false, err
	}

	return exists, nil
}
16 changes: 15 additions & 1 deletion internal/database/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func TestClose(t *testing.T) {

func TestGetSyncData(t *testing.T) {
r := sqlTesterHelper(t, func(mock sqlmock.Sqlmock, testDb *SQLdb) error {
mock.ExpectQuery("SELECT elixir_id, inbox_path, decrypted_file_checksum from local_ega.files WHERE stable_id = \\$1 AND status = 'READY'").
mock.ExpectQuery("SELECT elixir_id, inbox_path, decrypted_file_checksum from local_ega.files WHERE stable_id = \\$1 AND status = 'READY';").
WithArgs("accessionId").WillReturnRows(sqlmock.NewRows([]string{"elixir_id", "inbox_path", "decrypted_file_checksum"}).AddRow("dummy", "/file/paht", "abc123"))

s, err := testDb.GetSyncData("accessionId")
Expand All @@ -547,3 +547,17 @@ func TestGetSyncData(t *testing.T) {

assert.Nil(t, r, "GetSyncData failed unexpectedly")
}

// TestCheckIfDatasetExists verifies that checkIfDatasetExists issues the
// expected EXISTS query with the dataset ID as its argument and returns true
// when the mocked result row holds a true value.
func TestCheckIfDatasetExists(t *testing.T) {
	r := sqlTesterHelper(t, func(mock sqlmock.Sqlmock, testDb *SQLdb) error {
		mock.ExpectQuery("SELECT EXISTS\\(SELECT id from local_ega_ebi.filedataset WHERE dataset_stable_id = \\$1\\);").
			WithArgs("datasetID").WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow("true"))

		exists, err := testDb.checkIfDatasetExists("datasetID")
		assert.True(t, exists)

		return err
	})

	// Name the function under test so a failure points at the right place.
	assert.Nil(t, r, "CheckIfDatasetExists failed unexpectedly")
}

0 comments on commit 2f3b157

Please sign in to comment.