Skip to content

Commit

Permalink
pipeline: refactor such that user can return the DF for debugging
Browse files Browse the repository at this point in the history
- refactor all read_* create_* into one create_tbl function that can
  optionally return the table as a dataframe for easier debugging
- use function dispatch to simplify implementation (thanks @clizbe!)
- custom exceptions for cleaner user feedback

Fixes: #14, #18
  • Loading branch information
suvayu committed Mar 15, 2024
1 parent 8c8a22f commit 5fe7799
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 99 deletions.
1 change: 1 addition & 0 deletions src/TulipaIO.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module TulipaIO

include("exceptions.jl")
include("parsers.jl")
include("pipeline.jl")

Expand Down
45 changes: 45 additions & 0 deletions src/exceptions.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import DuckDB: DB

struct FileNotFoundError <: Exception
file::String
msg::String
function FileNotFoundError(file)
if ispath(file)
new(file, "$(file): exists, but not a regular file")
else
new(file, "$(file): file not found")
end
end
end

struct DirectoryNotFoundError <: Exception
dir::String
msg::String
function DirectoryNotFoundError(dir)
if ispath(dir)
new(dir, "$(dir): exists, but not a directory")
else
new(dir, "$(dir): directory not found")
end
end
end

struct TableNotFoundError <: Exception
con::DB
tbl::String
msg::String
TableNotFoundError(con, tbl) = new(con, tbl, "$(tbl): table not found in $(con)")
end

struct NeitherTableNorFileError <: Exception
con::DB
src::String
msg::String
NeitherTableNorFileError(con, src, msg) =
new(con, src, "$(src): neither table ($con) nor file found")
end

Base.showerror(io::IO, exc::FileNotFoundError) = print(io, exc.msg)
Base.showerror(io::IO, exc::DirectoryNotFoundError) = print(io, exc.msg)
Base.showerror(io::IO, exc::TableNotFoundError) = print(io, exc.msg)
Base.showerror(io::IO, exc::NeitherTableNorFileError) = print(io, exc.msg)
62 changes: 62 additions & 0 deletions src/fmtsql.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
module FmtSQL

function sprintf(fmt::String, args...)
format(Format(fmt), args...)
end

function fmt_opts(source::String; opts...)
_src = '?' in source ? "$source" : "'$(source)'"
join(["$(_src)"; [join(p, "=") for p in opts]], ", ")
end

function reader(source::String)
_, ext = splitext(source)
if ext in (".csv", ".parquet", ".json")
return "read_$(ext[2:end])_auto"
elseif '?' in source
# FIXME: how to support other file formats?
return "read_csv_auto"
else
error("$(ext[2:end]): unsupported input file '$(source)'")
end
end

function fmt_read(source::String, opts...)
sprintf("%s(%s)", reader(source), fmt_opts(source; opts...))
end

function fmt_select(source::String; opts...)
sprintf("SELECT * FROM %s", fmt_read(source; opts...))
end

function fmt_join(
from_subquery::String,
join_subquery::String;
on::Vector{String},
cols::Vector{String},
fill::Union{Bool,Vector::Any},
)
exclude = join(cols, ", ")
if fill # back fill
# e.g.: IFNULL(t2.investable, t1.investable) AS investable
include = join(map(c -> "IFNULL(t2.$c, t1.$c) AS $c", cols), ", ")
elseif !fill # explicit missing
include = join(map(c -> "t2.$c", cols), ", ")
else # fill with default
if length(fill) != length(cols)
msg = "number of default values does not match columns\n"
msg = msg * "columns: $cols\n"
msg = msg * "defaults: $fill"
error(msg)
end
include = join(map((c, f) -> "IFNULL(t2.$c, $f) AS $c", zip(cols, fill)), ", ")
end
select_ = "SELECT t1.* EXCLUDE ($exclude), $include"

join_on = join(map(c -> "t1.$c = t2.$c", on), " AND ")
from_ = "FROM $from_subquery t1 LEFT JOIN $join_subquery t2 ON ($join_on)"

"$(select_)\n$(from_)"
end

end # module FmtSQL
145 changes: 46 additions & 99 deletions src/pipeline.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,66 +2,29 @@ using DataFrames: DataFrames as DF
using DuckDB: DB, DBInterface, Stmt
using Printf: format, Format

function sprintf(fmt::String, args...)
format(Format(fmt), args...)
end
using .FmtSQL: fmt_join, fmt_read, fmt_select

function fmt_opts(source::String; opts...)
_src = '?' in source ? "$source" : "'$(source)'"
join(["$(_src)"; [join(p, "=") for p in opts]], ", ")
end
# default options (for now)
_read_opts = pairs((header = true, skip = 1))

function reader(source::String)
_, ext = splitext(source)
if ext in (".csv", ".parquet", ".json")
return "read_$(ext[2:end])_auto"
elseif '?' in source
# FIXME: how to support other file formats?
return "read_csv_auto"
else
error("$(ext[2:end]): unsupported input file '$(source)'")
end
function check_file(source::String)
# FIXME: handle globs
isfile(source)
end

function fmt_select(source::String; opts...)
sprintf("SELECT * FROM %s(%s)", reader(source), fmt_opts(source; opts...))
function check_tbl(con::DB, source::String)
tbls = DBInterface.execute(con, "SHOW TABLES").tbl[:name]
source in tbls
end

function fmt_join(
from_subquery::String,
join_subquery::String;
on::Vector{String},
cols::Vector{String},
fill::Union{Bool,Vector::Any},
)
exclude = join(cols, ", ")
if fill # back fill
# e.g.: IFNULL(t2.investable, t1.investable) AS investable
include = join(map(c -> "IFNULL(t2.$c, t1.$c) AS $c", cols), ", ")
else if !fill # explicit missing
include = join(map(c -> "t2.$c", cols), ", ")
else # fill with default
if length(fill) != length(cols)
msg = "number of default values does not match columns\n"
msg = msg * "columns: $cols\n"
msg = msg * "defaults: $fill"
error(msg)
end
include = join(map((c, f) -> "IFNULL(t2.$c, $f) AS $c", zip(cols, fill)), ", ")
function fmt_source(con::DB, source::String)
if check_tbl(source)
return base_source
elseif check_file(base_source)
return fmt_read(base_source, _read_opts...)
else
throw(NeitherTableNorFileError(con, base_source))
end
select_ = "SELECT t1.* EXCLUDE ($exclude), $include"

join_on = join(map(c -> "t1.$c = t2.$c", on), " AND ")
from_ = "FROM $from_subquery t1 LEFT JOIN $join_subquery t2 ON ($join_on)"

"$(select_)\n$(from_)"
end

# TODO: check_file -> check_source: file, file glob, table name
function check_file(source::String)
# FIXME: handle globs
isfile(source) || throw(ArgumentError("$(source): is not a regular file"))
source
end

## User facing functions below
Expand All @@ -82,49 +45,42 @@ end
Store() = Store(":memory:")
DEFAULT = Store()

function read_file(con::DB, source::String)
check_file(source)
query = fmt_select(source; header = true, skip = 1) # FIXME: don't hardcode options
res = DBInterface.execute(con, query)
return DF.DataFrame(res)
end

function read_file_replace_cols(
con::DB,
source1::String,
source2::String;
on::Vector{String},
cols::Vector{String},
fill::Union{Bool,Vector::Any} = true,
)
check_file(source1)
check_file(source2)
sources = [ # FIXME: don't hardcode options
sprintf("%s(%s)", reader(src), fmt_opts(src; header = true, skip = 1)) for
src in (source1, source2)
]
query = fmt_join(sources...; on = on, cols = cols, fill = fill)
res = DBInterface.execute(con, query)
return DF.DataFrame(res)
end

function tmp_tbl_name(source::String)
name, _ = replace(splitext(basename(source)), r"[ ()\[\]{}\\+,.]+" => "_")
"t_$(name)"
end

function create_tbl(con::DB, source::String; name::String = "", tmp::Bool = false)
check_file(source)
query = fmt_select(source; header = true, skip = 1) # FIXME: don't hardcode options
if length(name) == 0
# TODO: support "CREATE OR REPLACE" & "IF NOT EXISTS" for all create_* functions

function _create_tbl_impl(con::DB, query::String; name::String, tmp::Bool, show::Bool)
if (length(name) == 0) && !show
tmp = true
name = tmp_tbl_name(source)
end
DBInterface.execute(con, "CREATE $(tmp ? "TEMP" : "") TABLE $name AS $query")
return name

if length(name) > 0
DBInterface.execute(con, "CREATE $(tmp ? "TEMP" : "") TABLE $name AS $query")
return show ? DF.DataFrame(DBInterface.execute(con, "SELECT * FROM $name")) : name
else # only show
res = DBInterface.execute(con, query)
return DF.DataFrame(res)
end
end

function create_tbl_variant(
function create_tbl(
con::DB,
source::String;
name::String = "",
tmp::Bool = false,
show::Bool = false,
)
check_file(source) ? true : throw(FileNotFoundError(source))
query = fmt_select(source; _read_opts...)

return _create_tbl_impl(con, query; name = name, tmp = tmp, show = show)
end

function create_tbl(
con::DB,
base_source::String,
alt_source::String;
Expand All @@ -133,21 +89,12 @@ function create_tbl_variant(
cols::Vector{String},
fill::Union{Bool,Vector::Any} = true,
tmp::Bool = false,
show::Bool = false,
)
check_file(alt_source)
if length(variant) == 0
tmp = true
variant = tmp_tbl_name(alt_source)
end
# TODO: support "CREATE OR REPLACE" & "IF NOT EXISTS"
create_ = "CREATE $(tmp ? "TEMP" : "") TABLE $variant AS"

# FIXME: don't hardcode options
subquery = sprintf("%s(%s)", reader(alt_source), fmt_opts(alt_source; header = true, skip = 1))
query = fmt_join(base_source, subquery; on = on, cols = cols, fill = fill)
sources = [fmt_source(con, src) for src in (base_source, alt_source)]
query = fmt_join(sources...; on = on, cols = cols, fill = fill)

DBInterface.execute(con, "$(create_)\n$(query)")
variant
return _create_tbl_impl(con, query; name = variant, tmp = tmp, show = show)
end

end
Expand Down

0 comments on commit 5fe7799

Please sign in to comment.