
Commit

Prepare for release, revamp and improved a bit `generate-csv/inject-csv` flow
maoueh committed Sep 1, 2023
1 parent fee53f9 commit 82bb02d
Showing 13 changed files with 306 additions and 247 deletions.
37 changes: 27 additions & 10 deletions CHANGELOG.md
@@ -5,14 +5,23 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v2.4.1
## v2.5.0

### Highlights

This release brings improvements to the progress messages reported while your Substreams executes, which should greatly enhance progression tracking.

> [!NOTE]
> Stay tuned, we are planning even more useful progression tracking now that we've updated progression data sent back to the client!

This release also introduces a new mode to dump data into the database at high speed, useful for inserting large amounts of data.

### Substreams Progress Messages

> [!IMPORTANT]
> This client only supports progress messages sent from a server with substreams version >=v1.1.12

Bumped [substreams-sink](https://github.com/streamingfast/substreams-sink) [v0.3.1](https://github.com/streamingfast/substreams-sink/releases/tag/v0.3.1) and [substreams](https://github.com/streamingfast/substreams) to [v1.1.12](https://github.com/streamingfast/substreams/releases/tag/v1.1.12) to support the new progress message format. Progression now relates to **stages** instead of modules. You can get stage information using the `substreams info` command starting from version `v1.1.12`.

* Bumped substreams-sink to `v0.3.1` and substreams to `v1.1.12` to support the new progress message format. Progression now relates to **stages** instead of modules. You can get stage information using the `substreams info` command starting at version `v1.1.12`.
> [!IMPORTANT]
> This client only supports progress messages sent from a server using `substreams` version `>=v1.1.12`

#### Changed Prometheus Metrics

@@ -21,17 +30,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

#### Added Prometheus Metrics

* added `substreams_sink_progress_message_last_contiguous_block` (per stage)
* added `substreams_sink_progress_message_running_jobs` (per stage)
* Added `substreams_sink_progress_message_last_contiguous_block` (per stage)
* Added `substreams_sink_progress_message_running_jobs` (per stage)

### New injection method

A new injection method has been added in this `substreams-sink-postgres` release. It's a two-step method that leverages `COPY FROM` SQL operations to inject a large quantity of data at high speed.

> [!NOTE]
> This method is useful if you insert a lot of data into the database. If the standard ingestion speed satisfies your needs, continue to use it; the new feature targets an advanced use case.

See the [High Throughput Injection section](https://github.com/streamingfast/substreams-sink-postgres/blob/develop/README.md#high-throughput-injection) of the `README.md` file to learn how to use it.

### Added

* Added newer method of populating the database via CSV
* Added a new method of populating the database via CSV (thanks [@gusinacio](https://github.com/gusinacio)!).

New commands:
- `generate_csv`: Generates CSVs for each table
- `insert_csv`: Injects generated CSV rows for <table>
- `inject_cursor`: Injects the cursor from a file into database
- `generate-csv`: Generates CSVs for each table
- `inject-csv`: Injects generated CSV rows for `<table>`

## v2.4.0

61 changes: 61 additions & 0 deletions README.md
@@ -68,3 +68,64 @@ psql://<user>:<password>@<host>/<dbname>[?<options>]
Where `<options>` is a list of URL query parameters in `<key>=<value>` format; multiple options are separated by `&` signs. Supported options can be seen in [the libpq official documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS). The options `<user>`, `<password>`, `<host>` and `<dbname>` should **not** be passed in `<options>` as they are automatically extracted from the DSN URL.

Moreover, the `schema` option key can be used to select a particular schema within the `<dbname>` database.
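
As an example, a DSN that disables TLS and writes into a dedicated schema could look like the following sketch (host, credentials, database and schema names are placeholders to replace with your own setup):

```bash
substreams-sink-postgres run \
  "psql://user:secret@localhost:5432/mydb?sslmode=disable&schema=eth_mainnet" \
  mainnet.eth.streamingfast.io:443 <spkg> db_out
```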

### Advanced Topics

#### High Throughput Injection

> [!IMPORTANT]
> This method is useful if you insert a lot of data into the database. If the standard ingestion speed satisfies your needs, continue to use it; the steps below describe an advanced use case.

The `substreams-sink-postgres` binary contains a fast injection mechanism for cases where a large amount of data needs to be dumped into the database. In those cases, it may be preferable to dump everything to CSV files first and then use `COPY FROM` to transfer the data to Postgres at high speed.

The idea is to first dump the Substreams data to `CSV` files using the `substreams-sink-postgres generate-csv` command:

```bash
substreams-sink-postgres generate-csv "psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" mainnet.eth.streamingfast.io:443 <spkg> db_out ./data/tables :14490000
```

> [!NOTE]
> We are using 14490000 as our stop block; pick a stop block close to the chain's HEAD, or a smaller one like ours to run an experiment, and adjust to your needs.

This will generate block-segmented CSV files for each table in your schema inside the folder `./data/tables`. The next step is to inject those CSV files into your database. You can use `psql` and inject directly with it, or use the `substreams-sink-postgres inject-csv` command that we offer as a convenience. It's a per-table invocation, but feel free to run multiple tables concurrently; you are bound by your database at this point, so it's up to you to decide how much concurrency you want to use. Here is a small Bash command that loops through all tables and injects them one by one (a parallel variant follows):
```bash
for i in `ls ./data/tables | grep -v state.yaml`; do \
  substreams-sink-postgres inject-csv "psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" ./data/tables "$i" :14490000; \
  if [[ $? != 0 ]]; then break; fi; \
done
```
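
If you want to inject several tables concurrently, a variation along these lines can be used (the degree of parallelism is an arbitrary example; your database is the limiting factor, so tune `-P` accordingly):

```bash
# Run up to 4 inject-csv invocations in parallel, one per table (example value, adjust to your database).
ls ./data/tables | grep -v state.yaml | xargs -P 4 -I {} \
  substreams-sink-postgres inject-csv "psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" ./data/tables {} :14490000
```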
Those files are then inserted into the database efficiently by doing a `COPY FROM`, reading the data directly from a network pipe.
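
If you prefer to drive the load yourself with `psql`, the operation performed per file is conceptually equivalent to a `COPY FROM`; a rough sketch is shown below (the table name, file name and presence of a header row are assumptions, check the actual layout under `./data/tables` first). Note that `psql` expects a `postgresql://` style connection string rather than the `psql://` DSN used by the sink:

```bash
# Illustration only: bulk load one generated CSV segment into its table via COPY FROM (psql's \copy).
psql "postgresql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" \
  -c "\copy my_table FROM './data/tables/my_table/0000000000-0014490000.csv' WITH (FORMAT csv, HEADER true)"
```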
The `inject-csv` loop above will also pick up the `cursors` table, as it's a standard table to write. This table is a bit special: it contains a single file holding the `cursor` that hands off between CSV injection and going back to "live" blocks. It's extremely important that you validate that this table has been properly populated. You can do this simply by running:
```bash
substreams-sink-postgres tools --dsn="psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" cursor read
Module eaf2fc2ea827d6aca3d5fee4ec9af202f3d1b725: Block #14490000 (61bd396f3776f26efc3f73c44e2b8be3b90cc5171facb1f9bdeef9cb5c4fd42a) [cqR8Jx...hxNg==]
```
This should emit a single line; the `Module <hash>` should match the hash of your `db_out` module (check `substreams info <spkg>` to see your module's hashes) and the block number should match the last block you have written.
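
You can also look at the table directly; assuming the default setup where the sink stores its cursor in a table named `cursors` (as mentioned above), it should contain exactly one row:

```bash
# The cursors table should contain exactly one row after a successful injection.
psql "postgresql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" \
  -c "SELECT * FROM cursors;"
```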

> [!WARNING]
> Failure to properly populate the `cursors` table will make the injection start from scratch when you later run `substreams-sink-postgres run` to bridge with "live" blocks, as no cursor will exist.

Once the data has been injected and you have validated the `cursors` table, you can simply start streaming normally using:

```bash
substreams-sink-postgres run "psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" mainnet.eth.streamingfast.io:443 <spkg> db_out
```

This will resume at the latest block written and hand off streaming to "live" blocks.

##### Performance Knobs

When generating the CSV files, choosing the `--buffer-max-size` value well can drastically increase your write throughput locally, and even more so if your target store is an Amazon S3, Google Cloud Storage or Azure bucket. The flag controls how many bytes of each file are held in memory. With a bigger buffer, data is transferred to the storage layer in larger chunks, which improves performance. In many cases, the full file can be held in memory, leading to a single "upload" call and even better performance.

When choosing this value you should consider two things:

- One buffer exists per table in your schema, so if there are 12 tables and you use a 128 MiB buffer, you could have up to 1.5 GiB (`128 MiB * 12`) of RAM allocated to those buffers.
- The total amount of RAM you want to allocate.

Let's take a container that has 8 GiB of RAM. We suggest leaving 512 MiB for the other parts of the `generate-csv` task, which means we can dedicate 7.5 GiB to buffering. If your schema has 10 tables, you should use `--buffer-max-size=805306368` (`7.5 GiB / 10 = 768 MiB = 805306368 bytes`).
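
As a quick way to compute the value, a small shell calculation like the one below can be used (the container size, reserved amount and table count are example assumptions to replace with your own):

```bash
# Derive --buffer-max-size from available RAM and table count (all numbers are examples).
RAM_MIB=$(( 8 * 1024 ))   # 8 GiB container
RESERVED_MIB=512          # kept free for the rest of the generate-csv process
TABLES=10                 # number of tables in your schema
echo $(( (RAM_MIB - RESERVED_MIB) * 1024 * 1024 / TABLES ))   # 805306368 bytes, i.e. 768 MiB per table
```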
48 changes: 24 additions & 24 deletions bundler/bundler.go
@@ -7,25 +7,21 @@ import (
"path"
"time"

"github.com/streamingfast/bstream"
"github.com/streamingfast/dhammer"
"github.com/streamingfast/dstore"
"github.com/streamingfast/shutter"

"github.com/streamingfast/substreams-sink-postgres/bundler/writer"

"github.com/streamingfast/bstream"
"go.uber.org/zap"
)

type Bundler struct {
*shutter.Shutter

blockCount uint64
// encoder Encoder
blockCount uint64
stats *boundaryStats
boundaryWriter writer.Writer
outputStore dstore.Store
fileType writer.FileType
Header []byte
HeaderWritten bool

Expand Down Expand Up @@ -92,7 +88,7 @@ func (b *Bundler) Launch(ctx context.Context) {
}
}()

b.uploadQueue.OnTerminating(func(err error) {
b.uploadQueue.OnTerminating(func(_ error) {
b.Shutdown(fmt.Errorf("upload queue failed: %w", b.uploadQueue.Err()))
})
}
@@ -105,9 +101,9 @@ func (b *Bundler) Close() {
b.zlogger.Debug("boundary upload completed")
}

func (b *Bundler) Roll(ctx context.Context, blockNum uint64) error {
func (b *Bundler) Roll(ctx context.Context, blockNum uint64) (rolled bool, err error) {
if b.activeBoundary.Contains(blockNum) {
return nil
return false, nil
}

boundaries := boundariesToSkip(b.activeBoundary, blockNum, b.blockCount)
@@ -119,26 +115,28 @@ func (b *Bundler) Roll(ctx context.Context, blockNum uint64) error {
)

if err := b.stop(ctx); err != nil {
return fmt.Errorf("stop active boundary: %w", err)
}

if blockNum >= b.stopBlock {
return ErrStopBlockReached
return false, fmt.Errorf("stop active boundary: %w", err)
}

// Empty boundaries are before `blockNum`, we must flush them also before checking if we should quit
for _, boundary := range boundaries {
if err := b.Start(boundary.StartBlock()); err != nil {
return fmt.Errorf("start skipping boundary: %w", err)
return false, fmt.Errorf("start skipping boundary: %w", err)
}
if err := b.stop(ctx); err != nil {
return fmt.Errorf("stop skipping boundary: %w", err)
return false, fmt.Errorf("stop skipping boundary: %w", err)
}
}

if blockNum >= b.stopBlock {
return false, ErrStopBlockReached
}

if err := b.Start(blockNum); err != nil {
return fmt.Errorf("start active boundary: %w", err)
return false, fmt.Errorf("start active boundary: %w", err)
}
return nil

return true, nil
}

func (b *Bundler) TrackBlockProcessDuration(elapsed time.Duration) {
@@ -171,20 +169,22 @@ func (b *Bundler) stop(ctx context.Context) error {
return fmt.Errorf("closing file: %w", err)
}

b.zlogger.Debug("queuing boundary upload",
zap.Stringer("boundary", b.activeBoundary),
)
if b.boundaryWriter.IsWritten(){
if b.boundaryWriter.IsWritten() {
b.zlogger.Debug("queuing boundary upload", zap.Stringer("boundary", b.activeBoundary))

b.uploadQueue.In <- &boundaryFile{
name: b.activeBoundary.String(),
file: file,
}
} else {
b.zlogger.Debug("boundary not written, skipping upload of files", zap.Stringer("boundary", b.activeBoundary))
}
b.HeaderWritten = false

// Reset state
b.HeaderWritten = false
b.activeBoundary = nil

b.stats.endBoundary()

b.zlogger.Info("bundler stats", b.stats.Log()...)
return nil
}
9 changes: 9 additions & 0 deletions cmd/substreams-sink-postgres/common_flags.go
@@ -8,12 +8,14 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/logging"
"github.com/streamingfast/shutter"
sink "github.com/streamingfast/substreams-sink"
"github.com/streamingfast/substreams-sink-postgres/db"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"go.uber.org/zap"
)

@@ -100,6 +102,13 @@ func AddCommonSinkerFlags(flags *pflag.FlagSet) {
`))
}

func readBlockRangeArgument(in string) (blockRange *bstream.Range, err error) {
return sink.ReadBlockRange(&pbsubstreams.Module{
Name: "dummy",
InitialBlock: 0,
}, in)
}

type cliApplication struct {
appCtx context.Context
shutter *shutter.Shutter
8 changes: 5 additions & 3 deletions cmd/substreams-sink-postgres/generate_csv.go
@@ -12,6 +12,9 @@ import (
"github.com/streamingfast/substreams-sink-postgres/sinker"
)

// lastCursorFilename is the name of the file where the last cursor is stored, no extension as it's added by the store
const lastCursorFilename = "last_cursor"

var generateCsvCmd = Command(generateCsvE,
"generate-csv <psql_dsn> <endpoint> <manifest> <module> <dest-folder> [start]:<stop>",
"Generates CSVs for each table so it can be bulk inserted with `inject-csv`",
@@ -25,8 +28,7 @@
The process is as follows:
- Generate CSVs for each table with this command
- Inject the CSVs into the database with the 'inject-csv' command
- Run the 'inject-cursor' command to update the cursor in the database
- Inject the CSVs into the database with the 'inject-csv' command (this includes the 'cursors' table, double check you injected it correctly!)
- Start streaming with the 'run' command
`),
ExactArgs(6),
@@ -80,7 +82,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("instantiate db loader and sink: %w", err)
}

generateCSVSinker, err := sinker.NewGenerateCSVSinker(sink, destFolder, workingDir, bundleSize, bufferMaxSize, dbLoader, zlog, tracer)
generateCSVSinker, err := sinker.NewGenerateCSVSinker(sink, destFolder, workingDir, bundleSize, bufferMaxSize, dbLoader, lastCursorFilename, zlog, tracer)
if err != nil {
return fmt.Errorf("unable to setup generate csv sinker: %w", err)
}
