-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathdb.rs
4192 lines (3548 loc) · 146 KB
/
db.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//! This is where the actual code starts executing.
//!
//! The [`Database`] struct owns everything and delegates work to the other
//! modules.
use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
ffi::OsString,
fmt::Display,
fs::File,
io::{self, Read, Seek, Write},
path::{Path, PathBuf},
rc::Rc,
};
use crate::{
os::{FileSystemBlockSize, Open},
paging::{
io::FileOps,
pager::{PageNumber, Pager},
},
query,
sql::{
self,
analyzer::AnalyzerError,
parser::{Parser, ParserError},
statement::{Column, Constraint, Create, DataType, Statement, Value},
},
storage::{tuple, BTree, BTreeKeyComparator, FixedSizeMemCmp},
vm::{
self,
plan::{Plan, Tuple},
TypeError, VmError,
},
};
/// Database file default page size (4 KiB, matching the common OS page size).
pub(crate) const DEFAULT_PAGE_SIZE: usize = 4096;

/// Name of the meta-table used to keep track of other tables.
pub(crate) const MKDB_META: &str = "mkdb_meta";

/// Name of the column used to store [`RowId`] values.
pub(crate) const ROW_ID_COL: &str = "row_id";

/// Root page of the meta-table.
pub(crate) const MKDB_META_ROOT: PageNumber = 0;

/// Max size that can be collected in memory for [`QuerySet`] structures. 1 GiB.
///
/// Mostly relevant for network code, as the database can process rows one at
/// a time but we have no implementation of SQL cursors or anything like that.
pub(crate) const MAX_QUERY_SET_SIZE: usize = 1 << 30;

/// Rows are uniquely identified by an 8 byte key stored in big endian at the
/// beginning of each tuple.
///
/// The big endian format allows for fast memcmp() comparisons where we don't
/// need to parse or cast the integer to compare it to another one. This idea
/// was of course stolen from SQLite.
pub(crate) type RowId = u64;
/// State of the current transaction. There can only be one at a time.
#[derive(Debug, PartialEq)]
enum TransactionState {
    /// Transaction is in progress, no errors so far.
    InProgress,
    /// Transaction was aborted due to an error. Cleared when the transaction
    /// is rolled back or committed (see [`Database::rollback`]).
    Aborted,
    /// There is no active transaction.
    None,
}
/// Main entry point to everything.
///
/// Provides the high level [`Database::exec`] API that receives SQL text and
/// runs it.
pub(crate) struct Database<F> {
    /// The database owns the pager.
    ///
    /// TODO: [`Rc<Refcell>`] is a temporary solution until we make the pager
    /// multithreaded. The pager should be able to allow multiple readers and
    /// one writer.
    pub pager: Rc<RefCell<Pager<F>>>,
    /// Database context. See [`DatabaseContext`].
    pub context: Context,
    /// Working directory (the directory of the file).
    pub work_dir: PathBuf,
    /// Current transaction state. Despite the old comment, this is not a
    /// boolean — see [`TransactionState`] for the possible states.
    pub transaction_state: TransactionState,
}
/// Not really "Send" because of the [`Rc<RefCell>`], but we put the entire
/// database behind a mutex when working with it in the "server.rs" file and we
/// take care of not unlocking the database until `transaction_started` is
/// false. We could probably build a specific struct that wraps the Database
/// and does all this, but what we really should do instead is make the program
/// actually multithreaded. We can support multiple readers while only allowing
/// one writer. Of course, easier said than done, that's why we're using a
/// Mutex :)
// SAFETY: relies on the external Mutex described above so the `Rc`/`RefCell`
// internals are never touched by two threads at once.
// NOTE(review): this invariant is enforced by convention, not by the type
// system — confirm every call site in "server.rs" upholds it.
unsafe impl Send for Database<File> {}
impl Database<File> {
    /// Initializes a [`Database`] instance from the given file.
    ///
    /// Opens (or creates) the database file, derives the journal file path by
    /// appending ".journal" to the file's extension, and performs an initial
    /// rollback in case a previous process crashed mid-transaction and left a
    /// journal behind.
    pub fn init(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let file = crate::os::Fs::options()
            .create(true)
            .truncate(false)
            .read(true)
            .write(true)
            .bypass_cache(true)
            .sync_on_write(false)
            .lock(true)
            .open(&path)?;

        let metadata = file.metadata()?;

        if !metadata.is_file() {
            return Err(io::Error::new(io::ErrorKind::Unsupported, "not a file").into());
        }

        let block_size = crate::os::Fs::block_size(&path)?;

        let full_db_file_path = path.as_ref().canonicalize()?;
        let work_dir = full_db_file_path.parent().unwrap().to_path_buf();

        // Build "<ext>.journal". The previous `unwrap_or(&OsString::new())`
        // allocated an empty OsString on every call even when an extension
        // exists; `map` + `unwrap_or_default` only allocates when needed.
        let mut extension = full_db_file_path
            .extension()
            .map(|ext| ext.to_os_string())
            .unwrap_or_default();
        extension.push(".journal");

        let journal_file_path = full_db_file_path.with_extension(extension);

        let mut pager = Pager::<File>::builder()
            .page_size(DEFAULT_PAGE_SIZE)
            .block_size(block_size)
            .journal_file_path(journal_file_path)
            .wrap(file);

        pager.init()?;

        // Initial rollback on startup if the journal file exists.
        pager.rollback()?;

        Ok(Database::new(Rc::new(RefCell::new(pager)), work_dir))
    }
}
/// Errors somehow related to SQL.
///
/// Converted into [`DbError::Sql`] through the blanket `From` impl on
/// [`DbError`], which is what lets `?` bubble these up.
#[derive(Debug, PartialEq)]
pub(crate) enum SqlError {
    /// Database table not found or otherwise not usable.
    InvalidTable(String),
    /// Table column not found or not usable in the context of the error.
    InvalidColumn(String),
    /// Duplicated UNIQUE columns, duplicated PRIMARY KEY columns, etc.
    DuplicatedKey(Value),
    /// Errors caught by the [`sql::analyzer`].
    AnalyzerError(AnalyzerError),
    /// Data type errors. Trying to add numbers to strings, etc.
    TypeError(TypeError),
    /// Errors thrown by the [`vm`] when executing expressions.
    VmError(VmError),
    /// Uncategorized error with custom message.
    Other(String),
}
impl From<TypeError> for SqlError {
fn from(type_error: TypeError) -> Self {
SqlError::TypeError(type_error)
}
}
impl From<AnalyzerError> for SqlError {
fn from(analyzer_error: AnalyzerError) -> Self {
SqlError::AnalyzerError(analyzer_error)
}
}
impl From<VmError> for SqlError {
fn from(vm_error: VmError) -> Self {
Self::VmError(vm_error)
}
}
impl Display for SqlError {
    /// Renders a human readable, single-line description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Other(message) => f.write_str(message),
            Self::InvalidTable(name) => write!(f, "invalid table '{name}'"),
            Self::InvalidColumn(name) => write!(f, "invalid column '{name}'"),
            Self::DuplicatedKey(key) => write!(f, "duplicated key {key}"),
            // Wrapped errors delegate straight to their own Display impls.
            Self::AnalyzerError(inner) => Display::fmt(inner, f),
            Self::TypeError(inner) => Display::fmt(inner, f),
            Self::VmError(inner) => Display::fmt(inner, f),
        }
    }
}
/// Generic top level error.
///
/// This is the error type returned by the public [`Database`] APIs; every
/// lower level error is eventually wrapped in one of these variants.
#[derive(Debug)]
pub enum DbError {
    /// Files, sockets, etc.
    Io(io::Error),
    /// [`sql::parser`] error.
    Parser(ParserError),
    /// Other SQL error not related to syntax.
    Sql(SqlError),
    /// Something in the database file or journal file is corrupted/unexpected.
    Corrupted(String),
    /// Query too large or out of memory.
    NoMem,
    /// Uncategorized custom error.
    Other(String),
}
impl Display for DbError {
    /// Renders a human readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Io(e) => write!(f, "{e}"),
            Self::Parser(e) => write!(f, "{e}"),
            Self::Sql(e) => write!(f, "{e}"),
            Self::Corrupted(message) => f.write_str(message),
            // Fixed user-facing typo: was "our of memory".
            Self::NoMem => f.write_str("out of memory"),
            Self::Other(message) => f.write_str(message),
        }
    }
}
/// Blanket conversion: anything convertible into a [`SqlError`] converts
/// directly into [`DbError::Sql`], so `?` can bubble SQL errors up in one hop.
impl<E: Into<SqlError>> From<E> for DbError {
    fn from(err: E) -> Self {
        // Consistency fix: use `Self::` like the other `From` impls below.
        Self::Sql(err.into())
    }
}

impl From<io::Error> for DbError {
    fn from(e: io::Error) -> Self {
        Self::Io(e)
    }
}

impl From<ParserError> for DbError {
    fn from(e: ParserError) -> Self {
        Self::Parser(e)
    }
}
/// In-memory representation of a table schema.
#[derive(Debug, PartialEq, Clone)]
pub struct Schema {
    /// Column definitions.
    pub columns: Vec<Column>,
    /// Quick index to find column defs based on their name. Must be kept in
    /// sync with [`Self::columns`]; see [`Schema::push`] and
    /// [`Schema::prepend_row_id`].
    pub index: HashMap<String, usize>,
}
impl Schema {
    /// Create a new schema with the given column definitions.
    ///
    /// Builds the name -> position lookup table as a side product.
    pub fn new(columns: Vec<Column>) -> Self {
        let index = columns
            .iter()
            .enumerate()
            .map(|(i, col)| (col.name.clone(), i))
            .collect();

        Self { columns, index }
    }

    /// Creates an empty schema with no columns.
    pub fn empty() -> Self {
        Self::new(vec![])
    }

    /// Returns the index in [`Self::columns`] of `col`.
    pub fn index_of(&self, col: &str) -> Option<usize> {
        self.index.get(col).copied()
    }

    /// Number of columns in this schema.
    pub fn len(&self) -> usize {
        self.columns.len()
    }

    /// `true` if the schema has no columns.
    ///
    /// Added to pair with [`Self::len`] (clippy `len_without_is_empty`).
    pub fn is_empty(&self) -> bool {
        self.columns.is_empty()
    }

    /// Appends a new column to the end of the schema.
    pub fn push(&mut self, column: Column) {
        // The new column's position is the current length (index before push).
        self.index.insert(column.name.to_owned(), self.len());
        self.columns.push(column);
    }

    /// Prepends the special "row_id" column at the beginning of the schema.
    pub fn prepend_row_id(&mut self) {
        debug_assert!(
            self.columns[0].name != ROW_ID_COL,
            "schema already has {ROW_ID_COL}: {self:?}"
        );

        let col = Column::new(ROW_ID_COL, DataType::UnsignedBigInt);

        self.columns.insert(0, col);
        // Every existing column shifted one position to the right.
        self.index.values_mut().for_each(|idx| *idx += 1);
        self.index.insert(String::from(ROW_ID_COL), 0);
    }

    /// See [`has_btree_key`].
    pub fn has_btree_key(&self) -> bool {
        has_btree_key(&self.columns)
    }

    /// Returns a list of owned [`Column::name`] strings.
    pub fn column_identifiers(&self) -> Vec<String> {
        self.columns.iter().map(|col| col.name.to_owned()).collect()
    }
}
impl<'c, C: IntoIterator<Item = &'c Column>> From<C> for Schema {
    /// Builds a [`Schema`] by cloning borrowed column definitions.
    fn from(columns: C) -> Self {
        let owned: Vec<Column> = columns.into_iter().cloned().collect();
        Self::new(owned)
    }
}
/// Returns `true` if the primary key of the table can also be used as the BTree
/// key.
///
/// If the table doesn't need a Row ID we won't create an index for the primary
/// key because the table BTree itself will already be an index for the primary
/// key since we're using index organized storage.
///
/// On the other hand if we need a Row ID then the primary key is not usable as
/// a direct table index, so we'll create a separate BTree index instead.
pub fn has_btree_key(columns: &[Column]) -> bool {
    let first = &columns[0];

    // The first column must be the primary key and its type must support a
    // fixed-size memcmp comparison (so no Varchar and no Bool).
    let is_primary_key = first.constraints.contains(&Constraint::PrimaryKey);
    let comparable_type = !matches!(first.data_type, DataType::Varchar(_) | DataType::Bool);

    is_primary_key && comparable_type
}
/// This only exists because in earlier development stages the iterator model
/// was not yet implemented.
///
/// Without the iterator model we couldn't process tuples one at a time, we had
/// to collect them all at once and process them after that. So all tests in
/// this module (quite a few at this point) were written using this struct for
/// `assert_eq` comparisons.
///
/// Right now we're only using this struct to collect the results of a query
/// in memory, which we mostly need for tests and also the client package
/// collects all the results to print an ASCII table like the MySQL client.
#[derive(Debug, PartialEq)]
pub struct QuerySet {
    /// Schema of the results.
    pub schema: Schema,
    /// Rows. Each inner vector holds one value per column in [`Self::schema`].
    pub tuples: Vec<Vec<Value>>,
}
impl QuerySet {
    /// Creates a new [`QuerySet`].
    pub fn new(schema: Schema, tuples: Vec<Vec<Value>>) -> Self {
        Self { schema, tuples }
    }

    /// Creates a set with no schema and no results.
    pub fn empty() -> Self {
        Self::new(Schema::empty(), Vec::new())
    }

    /// Returns a concrete value given its column name and row number.
    ///
    /// Returns [`None`] if either the row or the column doesn't exist.
    pub fn get(&self, row: usize, column: &str) -> Option<&Value> {
        let col_position = self.schema.index_of(column)?;
        self.tuples.get(row)?.get(col_position)
    }

    /// `true` if there are no results.
    pub fn is_empty(&self) -> bool {
        self.tuples.is_empty()
    }
}
/// Schema of the table used to keep track of the database information.
pub(crate) fn mkdb_meta_schema() -> Schema {
    let columns = vec![
        // Either "index" or "table"
        Column::new("type", DataType::Varchar(255)),
        // Index or table name
        Column::new("name", DataType::Varchar(255)),
        // Root page
        Column::new("root", DataType::UnsignedInt),
        // Table name
        Column::new("table_name", DataType::Varchar(255)),
        // SQL used to create the index or table.
        // TODO: Implement and use some TEXT data type with higher length limits.
        Column::new("sql", DataType::Varchar(65535)),
    ];

    Schema::new(columns)
}
/// Data that we need to know about an index at runtime.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct IndexMetadata {
    /// Root page of the index.
    pub root: PageNumber,
    /// Index name.
    pub name: String,
    /// Column on which the index was created.
    pub column: Column,
    /// Schema of the index. Always key -> primary key.
    pub schema: Schema,
    /// Always `true` because non-unique indexes are not implemented.
    pub unique: bool,
}
/// Data that we need to know about tables at runtime.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct TableMetadata {
    /// Root page of the table.
    pub root: PageNumber,
    /// Table name.
    pub name: String,
    /// Schema of the table as defined by the `CREATE TABLE` statement.
    pub schema: Schema,
    /// All the indexes associated to this table.
    pub indexes: Vec<IndexMetadata>,
    /// Next [`RowId`] for this table. Kept private: values are handed out
    /// (and incremented) only through [`TableMetadata::next_row_id`].
    row_id: RowId,
}
/// Dynamic dispatch for relation types.
///
/// Some [`Plan`] types need to work with both table BTrees and Index BTrees
/// but we will only know exactly which at runtime. The relations share some
/// properties like the root page but differ in others, for example tables
/// have indexes associated to them but indexes don't.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Relation {
    /// A secondary index BTree.
    Index(IndexMetadata),
    /// A table BTree (index organized storage).
    Table(TableMetadata),
}
impl Relation {
    /// Root page of the underlying BTree.
    pub fn root(&self) -> PageNumber {
        match self {
            Self::Table(table) => table.root,
            Self::Index(index) => index.root,
        }
    }

    /// Dynamically dispatched key comparator for the BTree.
    pub fn comparator(&self) -> BTreeKeyComparator {
        // Pick the data type of whichever column acts as the BTree key.
        let key_type = match self {
            Self::Table(table) => &table.schema.columns[0].data_type,
            Self::Index(index) => &index.column.data_type,
        };

        BTreeKeyComparator::from(key_type)
    }

    /// Schema of the relation.
    ///
    /// Indexes will always have 2 columns, the index key and the table key.
    pub fn schema(&self) -> &Schema {
        match self {
            Self::Table(table) => &table.schema,
            Self::Index(index) => &index.schema,
        }
    }

    /// Type of relation as a string ("index" or "table").
    pub fn kind(&self) -> &str {
        match self {
            Self::Table(_) => "table",
            Self::Index(_) => "index",
        }
    }

    /// Name of the relation (the index name or the table name).
    pub fn name(&self) -> &str {
        match self {
            Self::Table(table) => &table.name,
            Self::Index(index) => &index.name,
        }
    }

    /// Returns the array index where the table key is located in the schema.
    ///
    /// Tables store their key at position 0; indexes store the table key at
    /// position 1, right after the index key.
    pub fn index_of_table_key(&self) -> usize {
        match self {
            Self::Table(_) => 0,
            Self::Index(_) => 1,
        }
    }
}
impl TableMetadata {
    /// Returns the next [`RowId`] that should be used for rows in this table.
    ///
    /// Each call hands out the current value and bumps the internal counter.
    pub fn next_row_id(&mut self) -> RowId {
        let current = self.row_id;
        self.row_id = current + 1;

        current
    }

    /// As of right now all tables use integers as real primary keys.
    ///
    /// Varchar primary keys are not used, we use the special "row_id" column
    /// in that case.
    pub(crate) fn comparator(&self) -> Result<FixedSizeMemCmp, DbError> {
        let key_type = &self.schema.columns[0].data_type;

        FixedSizeMemCmp::try_from(key_type).map_err(|_| {
            DbError::Corrupted(format!(
                "table {} is using a non-integer BTree key of type {}",
                self.name, key_type
            ))
        })
    }

    /// Generates a new schema that contains only the key of this table.
    ///
    /// This is useful for some query plans that only need to work with keys.
    pub fn key_only_schema(&self) -> Schema {
        let key_column = self.schema.columns[0].clone();
        Schema::new(vec![key_column])
    }
}
/// API to obtain data about the database itself.
pub(crate) trait DatabaseContext {
    /// Returns a [`TableMetadata`] object describing `table`.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::Sql`] wrapping [`SqlError::InvalidTable`] when the
    /// table does not exist.
    fn table_metadata(&mut self, table: &str) -> Result<&mut TableMetadata, DbError>;
}
/// Default value for [`Context::max_size`].
const DEFAULT_RELATION_CACHE_SIZE: usize = 512;

/// Dead simple cache made for storing [`TableMetadata`] instances.
///
/// Unlike [`crate::paging::cache`], this one doesn't need to be complicated
/// since [`TableMetadata`] structs are just a handful of bytes depending on
/// the schema and databases usually don't have thousands of tables like they
/// do pages. So the eviction policy is pretty much random, we evict whatever
/// the underlying [`HashMap`] decides that is the first element.
///
/// This struct is also used as some sort of mock for tests in [`crate::sql`].
/// Some components like the analyzer need access to the database context but
/// we don't want to create an entire [`Database`] struct just for that. The
/// [`Database`] struct also needs to parse SQL to obtain metadata about
/// tables so it would call the [`crate::sql`] module again, and we don't want
/// so much mutual recursion because it makes test debugging hard.
pub(crate) struct Context {
    /// Maps the table name to its metadata.
    tables: HashMap<String, TableMetadata>,
    /// Maximum size of the cache. `None` means unbounded.
    max_size: Option<usize>,
}
impl Context {
    /// New default [`Context`].
    ///
    /// [`Context::max_size`] is unlimited.
    pub fn new() -> Self {
        Self {
            tables: HashMap::new(),
            max_size: None,
        }
    }

    /// New [`Context`] with fixed max size.
    pub fn with_max_size(max_size: usize) -> Self {
        Self {
            tables: HashMap::with_capacity(max_size),
            max_size: Some(max_size),
        }
    }

    /// `true` if `table` exists in memory.
    pub fn contains(&self, table: &str) -> bool {
        self.tables.contains_key(table)
    }

    /// Adds the given table and metadata to this context.
    pub fn insert(&mut self, metadata: TableMetadata) {
        // When the cache is at capacity, evict an arbitrary entry (whatever
        // the HashMap iteration yields first).
        let at_capacity = self.max_size.is_some_and(|size| self.tables.len() >= size);

        if at_capacity {
            let victim = self.tables.keys().next().unwrap().clone();
            self.tables.remove(&victim);
        }

        self.tables.insert(metadata.name.clone(), metadata);
    }

    /// Removes the `table` from cache. Next it will be loaded from disk.
    pub fn invalidate(&mut self, table: &str) {
        self.tables.remove(table);
    }
}
// Mainly used for mocks in tests to avoid passing the entire database around
// and debugging mutual recursive calls.
#[cfg(test)]
impl TryFrom<&[&str]> for Context {
    type Error = DbError;

    /// Creates a context from raw SQL statements.
    ///
    /// Used for tests. See [`crate::sql::analyzer`] or [`crate::sql::prepare`].
    /// Root pages are assigned sequentially starting at 1, in the order the
    /// statements (and their implicit constraint indexes) appear.
    fn try_from(statements: &[&str]) -> Result<Self, Self::Error> {
        let mut context = Self::new();
        // Fake root page counter; bumped for every table and index created.
        let mut root = 1;

        for sql in statements {
            let statement = Parser::new(sql).parse_statement()?;

            match statement {
                Statement::Create(Create::Table { name, columns }) => {
                    let mut schema = Schema::from(&columns);
                    schema.prepend_row_id();

                    let mut metadata = TableMetadata {
                        root,
                        name: name.clone(),
                        row_id: 1,
                        schema,
                        indexes: vec![],
                    };
                    root += 1;

                    // Every PRIMARY KEY / UNIQUE constraint gets its own
                    // implicit index, mirroring what the real database does.
                    for column in &columns {
                        for constraint in &column.constraints {
                            let index_name = match constraint {
                                Constraint::PrimaryKey => format!("{name}_pk_index"),
                                Constraint::Unique => format!("{name}_{}_uq_index", column.name),
                            };

                            // NOTE(review): the index schema pairs the indexed
                            // column with `columns[0]` from the *original*
                            // definition (before prepend_row_id), whereas
                            // `load_table_metadata` uses the final schema's
                            // first column. Presumably fine for the mocks —
                            // confirm against the tests that rely on this.
                            metadata.indexes.push(IndexMetadata {
                                column: column.clone(),
                                schema: Schema::new(vec![column.clone(), columns[0].clone()]),
                                name: index_name,
                                root,
                                unique: true,
                            });
                            root += 1;
                        }
                    }

                    context.insert(metadata);
                }

                Statement::Create(Create::Index {
                    name,
                    column,
                    unique,
                    ..
                }) if unique => {
                    // NOTE(review): this looks up the table using the *index*
                    // name (the `table` field, if any, is discarded by `..`).
                    // That only works if the mock statements name indexes
                    // after their tables — verify with the callers.
                    let table = context.table_metadata(&name)?;

                    let index_col = table.schema.columns[table.schema.index_of(&column).unwrap()].clone();

                    table.indexes.push(IndexMetadata {
                        column: index_col.clone(),
                        schema: Schema::new(vec![index_col, table.schema.columns[0].clone()]),
                        name,
                        root,
                        unique,
                    });
                    root += 1;
                }

                other => return Err(DbError::Sql(SqlError::Other(format!(
                    "only CREATE TABLE and CREATE UNIQUE INDEX should be used to create a mock context but received {other}"
                )))),
            }
        }

        Ok(context)
    }
}
impl DatabaseContext for Context {
    /// Looks the table up in the in-memory map; never touches disk.
    fn table_metadata(&mut self, table: &str) -> Result<&mut TableMetadata, DbError> {
        match self.tables.get_mut(table) {
            Some(metadata) => Ok(metadata),
            None => Err(DbError::Sql(SqlError::InvalidTable(table.into()))),
        }
    }
}
impl<F> Database<F> {
    /// Creates a new database.
    pub fn new(pager: Rc<RefCell<Pager<F>>>, work_dir: PathBuf) -> Self {
        Self {
            pager,
            work_dir,
            context: Context::with_max_size(DEFAULT_RELATION_CACHE_SIZE),
            transaction_state: TransactionState::None,
        }
    }

    /// Returns `true` if there's an active transaction at the moment.
    pub fn active_transaction(&self) -> bool {
        // Both in-progress and aborted transactions count as "active": the
        // aborted one still needs an explicit rollback.
        match self.transaction_state {
            TransactionState::InProgress | TransactionState::Aborted => true,
            TransactionState::None => false,
        }
    }

    /// Returns `true` if the current transaction was aborted by an error.
    pub fn transaction_aborted(&self) -> bool {
        matches!(self.transaction_state, TransactionState::Aborted)
    }

    /// Starts a new transaction.
    ///
    /// Transactions can only be terminated by calling [`Database::rollback`]
    /// or [`Database::commit`]. Otherwise the equivalent SQL statements can
    /// be used to terminate transactions.
    pub fn start_transaction(&mut self) {
        self.transaction_state = TransactionState::InProgress;
    }
}
impl<F: Seek + Read + Write + FileOps> DatabaseContext for Database<F> {
    /// Returns the cached metadata for `table`, loading it from disk first
    /// on a cache miss.
    fn table_metadata(&mut self, table: &str) -> Result<&mut TableMetadata, DbError> {
        if !self.context.contains(table) {
            let metadata = self.load_table_metadata(table)?;
            self.context.insert(metadata);
        }

        // Now guaranteed to be in the cache (unless loading failed above).
        self.context.table_metadata(table)
    }
}
impl<F: Seek + Read + Write + FileOps> Database<F> {
    /// Loads the next row ID that should be used for the table rooted at
    /// `root`.
    ///
    /// This is not so expensive since it traverses the BTree from the root
    /// straight to a leaf node to find the max row ID, but it should be cached
    /// to avoid IO next time.
    fn load_next_row_id(&mut self, root: PageNumber) -> Result<RowId, DbError> {
        let mut pager = self.pager.borrow_mut();
        let mut btree = BTree::new(&mut pager, root, FixedSizeMemCmp::for_type::<RowId>());

        // Empty table -> start at 1, otherwise max existing row_id + 1.
        let row_id = if let Some(max) = btree.max()? {
            tuple::deserialize_row_id(max.as_ref()) + 1
        } else {
            1
        };

        Ok(row_id)
    }

    /// Loads all the metadata that we store about `table`.
    ///
    /// Right now the [`MKDB_META`] table doesn't use any indexes, so this
    /// is basically a sequential scan. The metadata table shouldn't get too
    /// big in most scenarios though.
    fn load_table_metadata(&mut self, table: &str) -> Result<TableMetadata, DbError> {
        // The meta-table itself is special cased: its schema is hardcoded.
        if table == MKDB_META {
            let mut schema = mkdb_meta_schema();
            schema.prepend_row_id();

            return Ok(TableMetadata {
                root: MKDB_META_ROOT,
                name: String::from(table),
                row_id: self.load_next_row_id(MKDB_META_ROOT)?,
                schema,
                indexes: vec![],
            });
        }

        let mut metadata = TableMetadata {
            root: 1,
            name: String::from(table),
            row_id: 1,
            schema: Schema::empty(),
            indexes: Vec::new(),
        };

        let mut found_table_definition = false;

        let (schema, mut results) = self.prepare(&format!(
            "SELECT root, sql FROM {MKDB_META} where table_name = '{table}';"
        ))?;

        // Lazily constructed error. Passed to `ok_or_else` below so the
        // format! allocation only happens when corruption is actually found
        // (the previous `ok_or(corrupted_error())` built it on every row).
        let corrupted_error = || {
            DbError::Corrupted(format!(
                "{MKDB_META} table is corrupted or contains wrong/unexpected data"
            ))
        };

        while let Some(tuple) = results.try_next()? {
            let Value::Number(root) =
                &tuple[schema.index_of("root").ok_or_else(corrupted_error)?]
            else {
                return Err(corrupted_error());
            };

            match &tuple[schema.index_of("sql").ok_or_else(corrupted_error)?] {
                Value::String(sql) => match Parser::new(sql).parse_statement()? {
                    Statement::Create(Create::Table { columns, .. }) => {
                        assert!(
                            !found_table_definition,
                            "multiple definitions of table '{table}'"
                        );

                        metadata.root = *root as PageNumber;
                        metadata.schema = Schema::new(columns);

                        // Tables that don't have an integer primary key as the
                        // first field will use a hidden primary key that we
                        // generate ourselves.
                        if !metadata.schema.has_btree_key() {
                            metadata.schema.prepend_row_id();
                        }

                        found_table_definition = true;
                    }

                    Statement::Create(Create::Index {
                        column,
                        name,
                        unique,
                        ..
                    }) => {
                        // The table schema should be loaded by this time
                        // because it's impossible to define an index unless the
                        // table exists and the results are returned sorted by
                        // row_id.
                        let col_idx = metadata.schema.index_of(&column).ok_or_else(|| {
                            SqlError::Other(format!(
                                "could not find index column {column} in the definition of table {table}"
                            ))
                        })?;

                        let index_col = metadata.schema.columns[col_idx].clone();

                        metadata.indexes.push(IndexMetadata {
                            column: index_col.clone(),
                            schema: Schema::new(vec![
                                index_col,
                                metadata.schema.columns[0].clone(),
                            ]),
                            name,
                            root: *root as PageNumber,
                            unique,
                        });
                    }

                    _ => return Err(corrupted_error()),
                },

                _ => return Err(corrupted_error()),
            };
        }

        if !found_table_definition {
            return Err(DbError::Sql(SqlError::InvalidTable(table.into())));
        }

        // Tables using the hidden row_id need to know the next value upfront.
        if metadata.schema.columns[0].name == ROW_ID_COL {
            metadata.row_id = self.load_next_row_id(metadata.root)?;
        }

        Ok(metadata)
    }

    /// Returns the root page of `index` if it exists.
    fn index_metadata(&mut self, index_name: &str) -> Result<IndexMetadata, DbError> {
        let query = self.exec(&format!(
            "SELECT table_name FROM {MKDB_META} where name = '{index_name}' AND type = 'index';"
        ))?;

        if query.is_empty() {
            return Err(DbError::Sql(SqlError::Other(format!(
                "index {index_name} does not exist"
            ))));
        }

        let table_name = match query.get(0, "table_name") {
            Some(Value::String(name)) => name,
            _ => unreachable!(),
        };

        let table_metadata = self.table_metadata(table_name)?;

        // TODO: Inefficient linear scan over the table's indexes.
        Ok(table_metadata
            .indexes
            .iter()
            .find(|index| index.name == index_name)
            .unwrap()
            .clone())
    }

    /// Highest level API in the entire system.
    ///
    /// Receives a SQL string and executes it, collecting the results in memory.
    /// Of course this is not ideal for internal work because the results might
    /// not fit in memory, but it's nice for tests and network code that returns
    /// one complete packet.
    ///
    /// Internal APIs must use [`Database::prepare`] instead to obtain an
    /// iterator over the rows produced by the query. That will limit the memory
    /// usage to the size of internal buffers used by the [`Plan`] execution
    /// engine at [`vm::plan`].
    pub fn exec(&mut self, input: &str) -> Result<QuerySet, DbError> {
        // Renamed from the misspelled "preapred_staement".
        let (schema, mut prepared_statement) = self.prepare(input)?;
        let mut query_set = QuerySet::new(schema, vec![]);

        let mut total_size = 0;

        while let Some(tuple) = prepared_statement.try_next()? {
            total_size += tuple::size_of(&tuple, &query_set.schema);

            // Refuse to buffer more than MAX_QUERY_SET_SIZE; abort the
            // transaction instead of exhausting memory.
            if total_size > MAX_QUERY_SET_SIZE {
                self.rollback()?;
                return Err(DbError::NoMem);
            }

            query_set.tuples.push(tuple);
        }

        Ok(query_set)
    }

    /// Parses the given `sql` and generates an execution plan for it.
    ///
    /// The execution plan is returned and can be iterated tuple by tuple
    /// with fixed memory usage (except for the size of the tuple itself). This
    /// is the API that should be used to process queries as it will not make
    /// use of all the system's RAM.
    pub fn prepare(&mut self, sql: &str) -> Result<(Schema, PreparedStatement<'_, F>), DbError> {
        let statement = sql::pipeline(sql, self)?;

        let mut schema = Schema::empty();

        let exec = match statement {
            // DDL and transaction control don't need a plan tree.
            Statement::Create(_)
            | Statement::Drop(_)
            | Statement::StartTransaction
            | Statement::Commit
            | Statement::Rollback => Exec::Statement(statement),

            Statement::Explain(inner) => match &*inner {
                Statement::Select { .. }
                | Statement::Insert { .. }
                | Statement::Update { .. }
                | Statement::Delete { .. } => {
                    schema = Schema::new(vec![Column::new("Query Plan", DataType::Varchar(255))]);
                    let plan = query::planner::generate_plan(*inner, self)?;
                    Exec::Explain(format!("{plan}").lines().map(String::from).collect())
                }
                _ => {
                    return Err(DbError::Other(String::from(
                        "EXPLAIN only works with SELECT, UPDATE, DELETE and INSERT statements",
                    )));
                }
            },

            // Everything else goes through the planner.
            _ => {
                let plan = query::planner::generate_plan(statement, self)?;
                if let Some(plan_schema) = plan.schema() {
                    schema = plan_schema;
                }
                Exec::Plan(plan)
            }
        };

        let prepared_statement = PreparedStatement {
            db: self,
            auto_commit: false,
            exec: Some(exec),
        };

        Ok((schema, prepared_statement))
    }

    /// Manually rolls back the database and stops the current transaction.
    pub fn rollback(&mut self) -> Result<usize, DbError> {
        self.transaction_state = TransactionState::None;
        self.pager.borrow_mut().rollback()
    }

    /// Manually commits the changes and stops the current transaction.
    pub fn commit(&mut self) -> io::Result<()> {
        self.transaction_state = TransactionState::None;
        self.pager.borrow_mut().commit()
    }
}
/// Not all statements need [`Plan`] trees for execution.
///
/// See [`vm::statement`].
enum Exec<F> {
    /// Statements that don't need any plans executed by [`vm::statement`].
    Statement(Statement),
    /// Complex statements that require [`Plan`] trees executed by [`vm::plan`].
    Plan(Plan<F>),
    /// Lines of text (one per queue entry) describing the generated plan,
    /// produced by `EXPLAIN` in [`Database::prepare`].
    Explain(VecDeque<String>),
}