diff --git a/src/TulipaIO.jl b/src/TulipaIO.jl index c26badc..24d9659 100644 --- a/src/TulipaIO.jl +++ b/src/TulipaIO.jl @@ -1,5 +1,6 @@ module TulipaIO +include("exceptions.jl") include("parsers.jl") include("pipeline.jl") diff --git a/src/exceptions.jl b/src/exceptions.jl new file mode 100644 index 0000000..2145baa --- /dev/null +++ b/src/exceptions.jl @@ -0,0 +1,45 @@ +import DuckDB: DB + +struct FileNotFoundError <: Exception + file::String + msg::String + function FileNotFoundError(file) + if ispath(file) + new(file, "$(file): exists, but not a regular file") + else + new(file, "$(file): file not found") + end + end +end + +struct DirectoryNotFoundError <: Exception + dir::String + msg::String + function DirectoryNotFoundError(dir) + if ispath(dir) + new(dir, "$(dir): exists, but not a directory") + else + new(dir, "$(dir): directory not found") + end + end +end + +struct TableNotFoundError <: Exception + con::DB + tbl::String + msg::String + TableNotFoundError(con, tbl) = new(con, tbl, "$(tbl): table not found in $(con)") +end + +struct NeitherTableNorFileError <: Exception + con::DB + src::String + msg::String + NeitherTableNorFileError(con, src, msg) = + new(con, src, "$(src): neither table ($con) nor file found") +end + +Base.showerror(io::IO, exc::FileNotFoundError) = print(io, exc.msg) +Base.showerror(io::IO, exc::DirectoryNotFoundError) = print(io, exc.msg) +Base.showerror(io::IO, exc::TableNotFoundError) = print(io, exc.msg) +Base.showerror(io::IO, exc::NeitherTableNorFileError) = print(io, exc.msg) diff --git a/src/fmtsql.jl b/src/fmtsql.jl new file mode 100644 index 0000000..5d3999a --- /dev/null +++ b/src/fmtsql.jl @@ -0,0 +1,62 @@ +module FmtSQL + +function sprintf(fmt::String, args...) + format(Format(fmt), args...) +end + +function fmt_opts(source::String; opts...) + _src = '?' in source ? "$source" : "'$(source)'" + join(["$(_src)"; [join(p, "=") for p in opts]], ", ") +end + +function reader(source::String) + _, ext = splitext(source) + if ext in (".csv", ".parquet", ".json") + return "read_$(ext[2:end])_auto" + elseif '?' in source + # FIXME: how to support other file formats? + return "read_csv_auto" + else + error("$(ext[2:end]): unsupported input file '$(source)'") + end +end + +function fmt_read(source::String, opts...) + sprintf("%s(%s)", reader(source), fmt_opts(source; opts...)) +end + +function fmt_select(source::String; opts...) + sprintf("SELECT * FROM %s", fmt_read(source; opts...)) +end + +function fmt_join( + from_subquery::String, + join_subquery::String; + on::Vector{String}, + cols::Vector{String}, + fill::Union{Bool,Vector::Any}, +) + exclude = join(cols, ", ") + if fill # back fill + # e.g.: IFNULL(t2.investable, t1.investable) AS investable + include = join(map(c -> "IFNULL(t2.$c, t1.$c) AS $c", cols), ", ") + elseif !fill # explicit missing + include = join(map(c -> "t2.$c", cols), ", ") + else # fill with default + if length(fill) != length(cols) + msg = "number of default values does not match columns\n" + msg = msg * "columns: $cols\n" + msg = msg * "defaults: $fill" + error(msg) + end + include = join(map((c, f) -> "IFNULL(t2.$c, $f) AS $c", zip(cols, fill)), ", ") + end + select_ = "SELECT t1.* EXCLUDE ($exclude), $include" + + join_on = join(map(c -> "t1.$c = t2.$c", on), " AND ") + from_ = "FROM $from_subquery t1 LEFT JOIN $join_subquery t2 ON ($join_on)" + + "$(select_)\n$(from_)" +end + +end # module FmtSQL diff --git a/src/pipeline.jl b/src/pipeline.jl index b29162f..f9cd17b 100644 --- a/src/pipeline.jl +++ b/src/pipeline.jl @@ -2,66 +2,29 @@ using DataFrames: DataFrames as DF using DuckDB: DB, DBInterface, Stmt using Printf: format, Format -function sprintf(fmt::String, args...) - format(Format(fmt), args...) -end +using .FmtSQL: fmt_join, fmt_read, fmt_select -function fmt_opts(source::String; opts...) - _src = '?' in source ? "$source" : "'$(source)'" - join(["$(_src)"; [join(p, "=") for p in opts]], ", ") -end +# default options (for now) +_read_opts = pairs((header = true, skip = 1)) -function reader(source::String) - _, ext = splitext(source) - if ext in (".csv", ".parquet", ".json") - return "read_$(ext[2:end])_auto" - elseif '?' in source - # FIXME: how to support other file formats? - return "read_csv_auto" - else - error("$(ext[2:end]): unsupported input file '$(source)'") - end +function check_file(source::String) + # FIXME: handle globs + isfile(source) end -function fmt_select(source::String; opts...) - sprintf("SELECT * FROM %s(%s)", reader(source), fmt_opts(source; opts...)) +function check_tbl(con::DB, source::String) + tbls = DBInterface.execute(con, "SHOW TABLES").tbl[:name] + source in tbls end -function fmt_join( - from_subquery::String, - join_subquery::String; - on::Vector{String}, - cols::Vector{String}, - fill::Union{Bool,Vector::Any}, -) - exclude = join(cols, ", ") - if fill # back fill - # e.g.: IFNULL(t2.investable, t1.investable) AS investable - include = join(map(c -> "IFNULL(t2.$c, t1.$c) AS $c", cols), ", ") - else if !fill # explicit missing - include = join(map(c -> "t2.$c", cols), ", ") - else # fill with default - if length(fill) != length(cols) - msg = "number of default values does not match columns\n" - msg = msg * "columns: $cols\n" - msg = msg * "defaults: $fill" - error(msg) - end - include = join(map((c, f) -> "IFNULL(t2.$c, $f) AS $c", zip(cols, fill)), ", ") +function fmt_source(con::DB, source::String) + if check_tbl(source) + return base_source + elseif check_file(base_source) + return fmt_read(base_source, _read_opts...) + else + throw(NeitherTableNorFileError(con, base_source)) end - select_ = "SELECT t1.* EXCLUDE ($exclude), $include" - - join_on = join(map(c -> "t1.$c = t2.$c", on), " AND ") - from_ = "FROM $from_subquery t1 LEFT JOIN $join_subquery t2 ON ($join_on)" - - "$(select_)\n$(from_)" -end - -# TODO: check_file -> check_source: file, file glob, table name -function check_file(source::String) - # FIXME: handle globs - isfile(source) || throw(ArgumentError("$(source): is not a regular file")) - source end ## User facing functions below @@ -82,49 +45,42 @@ end Store() = Store(":memory:") DEFAULT = Store() -function read_file(con::DB, source::String) - check_file(source) - query = fmt_select(source; header = true, skip = 1) # FIXME: don't hardcode options - res = DBInterface.execute(con, query) - return DF.DataFrame(res) -end - -function read_file_replace_cols( - con::DB, - source1::String, - source2::String; - on::Vector{String}, - cols::Vector{String}, - fill::Union{Bool,Vector::Any} = true, -) - check_file(source1) - check_file(source2) - sources = [ # FIXME: don't hardcode options - sprintf("%s(%s)", reader(src), fmt_opts(src; header = true, skip = 1)) for - src in (source1, source2) - ] - query = fmt_join(sources...; on = on, cols = cols, fill = fill) - res = DBInterface.execute(con, query) - return DF.DataFrame(res) -end - function tmp_tbl_name(source::String) name, _ = replace(splitext(basename(source)), r"[ ()\[\]{}\\+,.]+" => "_") "t_$(name)" end -function create_tbl(con::DB, source::String; name::String = "", tmp::Bool = false) - check_file(source) - query = fmt_select(source; header = true, skip = 1) # FIXME: don't hardcode options - if length(name) == 0 +# TODO: support "CREATE OR REPLACE" & "IF NOT EXISTS" for all create_* functions + +function _create_tbl_impl(con::DB, query::String; name::String, tmp::Bool, show::Bool) + if (length(name) == 0) && !show tmp = true name = tmp_tbl_name(source) end - DBInterface.execute(con, "CREATE $(tmp ? "TEMP" : "") TABLE $name AS $query") - return name + + if length(name) > 0 + DBInterface.execute(con, "CREATE $(tmp ? "TEMP" : "") TABLE $name AS $query") + return show ? DF.DataFrame(DBInterface.execute(con, "SELECT * FROM $name")) : name + else # only show + res = DBInterface.execute(con, query) + return DF.DataFrame(res) + end end -function create_tbl_variant( +function create_tbl( + con::DB, + source::String; + name::String = "", + tmp::Bool = false, + show::Bool = false, +) + check_file(source) ? true : throw(FileNotFoundError(source)) + query = fmt_select(source; _read_opts...) + + return _create_tbl_impl(con, query; name = name, tmp = tmp, show = show) +end + +function create_tbl( con::DB, base_source::String, alt_source::String; @@ -133,21 +89,12 @@ function create_tbl_variant( cols::Vector{String}, fill::Union{Bool,Vector::Any} = true, tmp::Bool = false, + show::Bool = false, ) - check_file(alt_source) - if length(variant) == 0 - tmp = true - variant = tmp_tbl_name(alt_source) - end - # TODO: support "CREATE OR REPLACE" & "IF NOT EXISTS" - create_ = "CREATE $(tmp ? "TEMP" : "") TABLE $variant AS" - - # FIXME: don't hardcode options - subquery = sprintf("%s(%s)", reader(alt_source), fmt_opts(alt_source; header = true, skip = 1)) - query = fmt_join(base_source, subquery; on = on, cols = cols, fill = fill) + sources = [fmt_source(con, src) for src in (base_source, alt_source)] + query = fmt_join(sources...; on = on, cols = cols, fill = fill) - DBInterface.execute(con, "$(create_)\n$(query)") - variant + return _create_tbl_impl(con, query; name = variant, tmp = tmp, show = show) end end