Skip to content

Commit

Permalink
feat(asset): use tx for upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
haveiss committed Nov 16, 2023
1 parent a4ec45c commit 3163d11
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 23 deletions.
8 changes: 6 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
networks:
- storage
volumes:
- ./temp/esdata:/usr/share/elasticsearch/data
- esdata:/usr/share/elasticsearch/data

postgres:
ports:
Expand All @@ -21,7 +21,11 @@ services:
networks:
- storage
volumes:
- ./temp/pgdata:/var/lib/postgresql/data
- pgdata:/var/lib/postgresql/data

volumes:
pgdata:
esdata:

networks:
storage:
52 changes: 31 additions & 21 deletions internal/store/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,35 +279,45 @@ func (r *AssetRepository) getByVersion(
// It updates if asset does exist.
// Checking existence is done using "urn", "type", and "service" fields.
func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (string, error) {
fetchedAsset, err := r.GetByURN(ctx, ast.URN)
if errors.As(err, new(asset.NotFoundError)) {
err = nil
}
if err != nil {
return "", fmt.Errorf("error getting asset by URN: %w", err)
}
var id string
err := r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
fetchedAsset, err := r.GetByURN(ctx, ast.URN)
if errors.As(err, new(asset.NotFoundError)) {
err = nil
}
if err != nil {
return fmt.Errorf("error getting asset by URN: %w", err)
}

if fetchedAsset.ID == "" {
// insert flow
id, err = r.insert(ctx, ast)
if err != nil {
return fmt.Errorf("error inserting asset to DB: %w", err)
}
return nil
}

if fetchedAsset.ID == "" {
// insert flow
id, err := r.insert(ctx, ast)
// update flow
changelog, err := fetchedAsset.Diff(ast)
if err != nil {
return "", fmt.Errorf("error inserting asset to DB: %w", err)
return fmt.Errorf("error diffing two assets: %w", err)
}
return id, nil
}

// update flow
changelog, err := fetchedAsset.Diff(ast)
if err != nil {
return "", fmt.Errorf("error diffing two assets: %w", err)
}
err = r.update(ctx, fetchedAsset.ID, ast, &fetchedAsset, changelog)
if err != nil {
return fmt.Errorf("error updating asset to DB: %w", err)
}
id = fetchedAsset.ID

return nil
})

err = r.update(ctx, fetchedAsset.ID, ast, &fetchedAsset, changelog)
if err != nil {
return "", fmt.Errorf("error updating asset to DB: %w", err)
return "", err
}

return fetchedAsset.ID, nil
return id, nil
}

// DeleteByID removes asset using its ID
Expand Down

0 comments on commit 3163d11

Please sign in to comment.