Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add index and table metrics to vttablet #17570

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,30 @@ func TestEngineReload(t *testing.T) {
})
}

// TestUpdateTableIndexMetrics verifies that the per-table and per-index
// statistics gauges are exported via /debug/vars after a schema reload.
// It is skipped on MySQL versions that don't support the InnoDB table
// size queries these metrics depend on.
func TestUpdateTableIndexMetrics(t *testing.T) {
	ctx := context.Background()
	conn, err := mysql.Connect(ctx, &connParams)
	require.NoError(t, err)
	defer conn.Close()

	// An empty query string means this MySQL flavor/version has no InnoDB
	// table-sizes query, so the extra metrics are never populated.
	if conn.BaseShowInnodbTableSizes() == "" {
		t.Skip("additional table/index metrics not updated in this version of MySQL")
	}
	vars := framework.DebugVars()

	// Each expected gauge key must be present for both a plain table
	// (vitess_a) and a partitioned table (vitess_part).
	metricKeys := []string{
		"TableRows/vitess_a",
		"TableRows/vitess_part",
		"TableClusteredIndexSize/vitess_a",
		"TableClusteredIndexSize/vitess_part",
		"IndexCardinality/vitess_a.PRIMARY",
		"IndexCardinality/vitess_part.PRIMARY",
		"IndexBytes/vitess_a.PRIMARY",
		"IndexBytes/vitess_part.PRIMARY",
	}
	for _, key := range metricKeys {
		require.NotNil(t, framework.FetchVal(vars, key), "missing metric %s", key)
	}
}

// TestTuple tests that bind variables having tuple values work with vttablet.
func TestTuple(t *testing.T) {
client := framework.NewClient()
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ func TestReloadView(t *testing.T) {
db.AddQuery("SELECT TABLE_NAME, CREATE_TIME FROM _vt.`tables`", &sqltypes.Result{})
// adding query pattern for udfs
db.AddQueryPattern("SELECT name.*", &sqltypes.Result{})
db.AddQuery("select table_name, partition_name from information_schema.partitions where table_schema = database() and partition_name is not null", &sqltypes.Result{})
Copy link
Contributor Author

@rafer rafer Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love that these queries have leaked out of the schema package, but I can't think of a better way to make this test pass (unless we want to make the query constants public). Interested to hear if this causes anyone concern.

db.AddQuery("select table_name, n_rows, clustered_index_size * @@innodb_page_size from mysql.innodb_table_stats where database_name = database()", &sqltypes.Result{})
db.AddQuery("select table_name, index_name, stat_value * @@innodb_page_size from mysql.innodb_index_stats where database_name = database() and stat_name = 'size'", &sqltypes.Result{})
db.AddQuery("select table_name, index_name, max(cardinality) from information_schema.statistics s where table_schema = database() group by s.table_name, s.index_name", &sqltypes.Result{})

se.InitDBConfig(cfg.DB.DbaWithDB())
hs.Open()
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletserver/schema/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ SELECT f.name, i.UDF_RETURN_TYPE, f.type FROM mysql.func f left join performance
`
// fetchAggregateUdfs queries fetches all the aggregate user defined functions.
fetchAggregateUdfs = `select function_name, function_return_type, function_type from %s.udfs`

// fetch a list of all partitions
fetchPartitions = `select table_name, partition_name from information_schema.partitions where table_schema = database() and partition_name is not null`

// fetch the estimated number of rows and the clustered index byte size for all tables
fetchTableRowCountClusteredIndex = `select table_name, n_rows, clustered_index_size * @@innodb_page_size from mysql.innodb_table_stats where database_name = database()`

// fetch the byte size of all indexes
fetchIndexSizes = `select table_name, index_name, stat_value * @@innodb_page_size from mysql.innodb_index_stats where database_name = database() and stat_name = 'size'`

// fetch the cardinality of all indexes
fetchIndexCardinalities = `select table_name, index_name, max(cardinality) from information_schema.statistics s where table_schema = database() group by s.table_name, s.index_name`
)

// reloadTablesDataInDB reloads the tables information we have stored in the database we use for schema-tracking.
Expand Down
154 changes: 149 additions & 5 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ import (
)

const maxTableCount = 10000
const maxPartitionsPerTable = 8192
const maxIndexesPerTable = 64

type notifier func(full map[string]*Table, created, altered, dropped []*Table, udfsChanged bool)

Expand Down Expand Up @@ -95,10 +97,16 @@ type Engine struct {
// dbCreationFailed is for preventing log spam.
dbCreationFailed bool

tableFileSizeGauge *stats.GaugesWithSingleLabel
tableAllocatedSizeGauge *stats.GaugesWithSingleLabel
innoDbReadRowsCounter *stats.Counter
SchemaReloadTimings *servenv.TimingsWrapper
tableFileSizeGauge *stats.GaugesWithSingleLabel
tableAllocatedSizeGauge *stats.GaugesWithSingleLabel
tableRowsGauge *stats.GaugesWithSingleLabel
tableClusteredIndexSizeGauge *stats.GaugesWithSingleLabel

indexCardinalityGauge *stats.GaugesWithMultiLabels
indexBytesGauge *stats.GaugesWithMultiLabels

innoDbReadRowsCounter *stats.Counter
SchemaReloadTimings *servenv.TimingsWrapper
}

// NewEngine creates a new Engine.
Expand All @@ -119,6 +127,10 @@ func NewEngine(env tabletenv.Env) *Engine {
_ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval)
se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table")
se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table")
se.tableRowsGauge = env.Exporter().NewGaugesWithSingleLabel("TableRows", "the estimated number of rows in the table", "Table")
se.tableClusteredIndexSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableClusteredIndexSize", "the byte size of the clustered index (i.e. row data)", "Table")
se.indexCardinalityGauge = env.Exporter().NewGaugesWithMultiLabels("IndexCardinality", "estimated number of unique values in the index", []string{"Table", "Index"})
se.indexBytesGauge = env.Exporter().NewGaugesWithMultiLabels("IndexBytes", "byte size of the the index", []string{"Table", "Index"})
se.innoDbReadRowsCounter = env.Exporter().NewCounter("InnodbRowsRead", "number of rows read by mysql")
se.SchemaReloadTimings = env.Exporter().NewTimings("SchemaReload", "time taken to reload the schema", "type")
se.reloadTimeout = env.Config().SchemaChangeReloadTimeout
Expand Down Expand Up @@ -432,7 +444,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {
// We therefore don't want to query for table sizes in getTableData()
includeStats = false

innodbResults, err := conn.Conn.Exec(ctx, innodbTableSizesQuery, maxTableCount, false)
innodbResults, err := conn.Conn.Exec(ctx, innodbTableSizesQuery, maxTableCount*maxPartitionsPerTable, false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increased this because even with our limit of 10k tables, it's possible that there are more partitions than that (and this query returns one row per partition).

if err != nil {
return vterrors.Wrapf(err, "in Engine.reload(), reading innodb tables")
}
Expand Down Expand Up @@ -466,8 +478,12 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {
}
}
}
if err := se.updateTableIndexMetrics(ctx, conn.Conn); err != nil {
log.Errorf("Updating index/table statistics failed, error: %v", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just logs instead of failing the method. I figure if updateTableIndexMetrics fails it's better to continue on since the only impact is missing metrics.

}
// See testing in TestEngineReload
}

}
tableData, err := getTableData(ctx, conn.Conn, includeStats)
if err != nil {
Expand Down Expand Up @@ -689,6 +705,134 @@ func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.Conn)
return nil
}

// updateTableIndexMetrics refreshes the per-table and per-index statistics
// gauges (TableRows, TableClusteredIndexSize, IndexBytes, IndexCardinality)
// from the MySQL statistics tables. Rows reported per physical partition are
// aggregated up to their logical base table before being exported. All gauges
// are reset before repopulation so metrics for dropped tables/indexes don't
// linger with stale values.
func (se *Engine) updateTableIndexMetrics(ctx context.Context, conn *connpool.Conn) error {
	// Load all partitions so that we can extract the base table name from
	// tables given as "TABLE#p#PARTITION".
	type partition struct {
		table     string
		partition string
	}

	partitionsResults, err := conn.Exec(ctx, fetchPartitions, maxTableCount*maxPartitionsPerTable, false)
	if err != nil {
		return err
	}
	partitions := make(map[string]partition)
	for _, row := range partitionsResults.Rows {
		p := partition{
			table:     row[0].ToString(),
			partition: row[1].ToString(),
		}
		key := p.table + "#p#" + p.partition
		partitions[key] = p
	}

	// baseTableName resolves a physical "TABLE#p#PARTITION" name back to its
	// logical table name. Names that merely contain "#p#" but are not known
	// partitions (and names of non-partitioned tables) pass through unchanged.
	baseTableName := func(name string) string {
		if strings.Contains(name, "#p#") {
			if p, ok := partitions[name]; ok {
				return p.table
			}
		}
		return name
	}

	// Load table row counts and clustered index sizes. Results contain one
	// row for every partition, so sum the values per logical table.
	type table struct {
		table    string
		rows     int64
		rowBytes int64
	}
	tables := make(map[string]table)
	tableStatsResults, err := conn.Exec(ctx, fetchTableRowCountClusteredIndex, maxTableCount*maxPartitionsPerTable, false)
	if err != nil {
		return err
	}
	for _, row := range tableStatsResults.Rows {
		tableName := baseTableName(row[0].ToString())
		rowCount, _ := row[1].ToInt64()
		rowBytes, _ := row[2].ToInt64()

		t, ok := tables[tableName]
		if !ok {
			t = table{table: tableName}
		}
		t.rows += rowCount
		t.rowBytes += rowBytes
		tables[tableName] = t
	}

	type index struct {
		table       string
		index       string
		bytes       int64
		cardinality int64
	}
	indexes := make(map[[2]string]index)

	// Load the byte sizes of all indexes. Results contain one row for every
	// index/partition combination, so sum the values per logical index.
	// NOTE(review): since this returns one row per index/partition, the limit
	// below could in principle be exceeded for heavily partitioned schemas —
	// consider multiplying by maxPartitionsPerTable as done for tables above.
	bytesResults, err := conn.Exec(ctx, fetchIndexSizes, maxTableCount*maxIndexesPerTable, false)
	if err != nil {
		return err
	}
	for _, row := range bytesResults.Rows {
		tableName := baseTableName(row[0].ToString())
		indexName := row[1].ToString()
		indexBytes, _ := row[2].ToInt64()

		key := [2]string{tableName, indexName}
		idx, ok := indexes[key]
		if !ok {
			idx = index{
				table: tableName,
				index: indexName,
			}
		}
		idx.bytes += indexBytes
		indexes[key] = idx
	}

	// Load index cardinalities. Results contain one row for every index
	// (pre-aggregated across partitions), so no summing is needed.
	cardinalityResults, err := conn.Exec(ctx, fetchIndexCardinalities, maxTableCount*maxPartitionsPerTable, false)
	if err != nil {
		return err
	}
	for _, row := range cardinalityResults.Rows {
		tableName := row[0].ToString()
		indexName := row[1].ToString()
		cardinality, _ := row[2].ToInt64()

		key := [2]string{tableName, indexName}
		idx, ok := indexes[key]
		if !ok {
			idx = index{
				table: tableName,
				index: indexName,
			}
		}
		idx.cardinality = cardinality
		indexes[key] = idx
	}

	// Reset all gauges before repopulating so that entries for tables or
	// indexes that no longer exist are dropped rather than left stale.
	se.indexBytesGauge.ResetAll()
	se.indexCardinalityGauge.ResetAll()
	for _, idx := range indexes {
		key := []string{idx.table, idx.index}
		se.indexBytesGauge.Set(key, idx.bytes)
		se.indexCardinalityGauge.Set(key, idx.cardinality)
	}

	se.tableRowsGauge.ResetAll()
	se.tableClusteredIndexSizeGauge.ResetAll()
	for _, tbl := range tables {
		se.tableRowsGauge.Set(tbl.table, tbl.rows)
		se.tableClusteredIndexSizeGauge.Set(tbl.table, tbl.rowBytes)
	}

	return nil
}

func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.Conn) (int64, error) {
// Keep `SELECT UNIX_TIMESTAMP` is in uppercase because binlog server queries are case sensitive and expect it to be so.
tm, err := conn.Exec(ctx, "SELECT UNIX_TIMESTAMP()", 1, false)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,6 +1900,10 @@ func TestGetTableForPos(t *testing.T) {
se.historian.enabled = false

addExpectedReloadQueries := func(db *fakesqldb.DB) {
db.AddQuery(fetchPartitions, &sqltypes.Result{})
db.AddQuery(fetchTableRowCountClusteredIndex, &sqltypes.Result{})
db.AddQuery(fetchIndexSizes, &sqltypes.Result{})
db.AddQuery(fetchIndexCardinalities, &sqltypes.Result{})
db.AddQuery("SELECT UNIX_TIMESTAMP()", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"UNIX_TIMESTAMP()",
"int64"),
Expand Down
Loading