Skip to content

Commit

Permalink
Merge pull request #515 from duckdb/f-514-cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
krlmlr authored Oct 21, 2024
2 parents f3e79c3 + 69aeedc commit 007c6e4
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 21 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: duckdb
Title: DBI Package for the DuckDB Database Management System
Version: 1.1.1.9000
Version: 1.1.1.9001
Authors@R: c(
person("Hannes", "Mühleisen", , "[email protected]", role = "aut",
comment = c(ORCID = "0000-0001-8552-0029")),
Expand Down
4 changes: 4 additions & 0 deletions inst/include/cpp11/R.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
#endif
#endif

#ifndef R_NO_REMAP
#define R_NO_REMAP
#endif
#ifndef STRICT_R_HEADERS
#define STRICT_R_HEADERS
#endif
#include "R_ext/Boolean.h"
#include "Rinternals.h"
#include "Rversion.h"
Expand Down
2 changes: 1 addition & 1 deletion src/Makevars
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ include Makevars.duckdb

CXX_STD = CXX17
PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -DBROTLI_ENCODER_CLEANUP_ON_OOM -Iduckdb/src/include -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/fast_float -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/httplib -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/third_party/libpg_query -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/miniz -Iduckdb/third_party/pcg -Iduckdb/third_party/re2 -Iduckdb/third_party/skiplist -Iduckdb/third_party/tdigest -Iduckdb/third_party/utf8proc -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/yyjson/include -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/snappy -Iduckdb/third_party/zstd/include -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
2 changes: 1 addition & 1 deletion src/Makevars.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ include Makevars.duckdb

CXX_STD = CXX17
PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -DBROTLI_ENCODER_CLEANUP_ON_OOM {{ INCLUDES }}
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
PKG_LIBS={{ LINK_FLAGS }}
2 changes: 1 addition & 1 deletion src/Makevars.win
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ include Makevars.duckdb

CXX_STD = CXX17
PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -DBROTLI_ENCODER_CLEANUP_ON_OOM -Iduckdb/src/include -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/fast_float -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/httplib -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/third_party/libpg_query -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/miniz -Iduckdb/third_party/pcg -Iduckdb/third_party/re2 -Iduckdb/third_party/skiplist -Iduckdb/third_party/tdigest -Iduckdb/third_party/utf8proc -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/yyjson/include -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/snappy -Iduckdb/third_party/zstd/include -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY -DDUCKDB_PLATFORM_RTOOLS=1
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
PKG_LIBS=-lws2_32 -L. -lrstrtmgr
41 changes: 41 additions & 0 deletions src/include/signal.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <csignal>

#include "duckdb/common/shared_ptr.hpp"
#include "duckdb/main/client_context.hpp"

// Toy repo: https://github.com/krlmlr/cancel.test

namespace duckdb {

// Scoped SIGINT handler for a running DuckDB query.
//
// While an instance constructed with a non-null context is alive (and not yet
// disabled), SIGINT is routed to signal_handler(), which records the interrupt
// and calls ClientContext::Interrupt() on the stored context. The previously
// installed handler is put back by Disable() or by the destructor.
//
// At most one instance may be active at a time; the constructor throws an
// InternalException otherwise.
class ScopedInterruptHandler {
private:
	// Context that is interrupted from the signal handler; cleared by Disable()
	shared_ptr<ClientContext> context;

	// Set from within the signal handler. A plain bool is not guaranteed to be
	// safely writable from a signal handler; volatile std::sig_atomic_t is the
	// type the standard designates for this purpose.
	volatile std::sig_atomic_t interrupted = 0;

	// oldhandler stores the old signal handler
	// so that it can be restored when the object is destroyed
	typedef void (*sig_t)(int);
	// Initialized so that the member never holds an indeterminate value when
	// no handler was installed (null context).
	sig_t oldhandler = SIG_DFL;

	// The currently active handler object, or nullptr if there is none
	static ScopedInterruptHandler *instance;

private:
	// Not default-constructible, not copyable, not movable: the object's
	// address is published through `instance`.
	ScopedInterruptHandler() = delete;
	ScopedInterruptHandler(const ScopedInterruptHandler &) = delete;
	ScopedInterruptHandler &operator=(const ScopedInterruptHandler &) = delete;
	ScopedInterruptHandler(ScopedInterruptHandler &&) = delete;

public:
	// Installs the SIGINT handler if context_ is non-null
	ScopedInterruptHandler(shared_ptr<ClientContext> context_);
	// Restores the previous handler (if still installed) and clears `instance`
	~ScopedInterruptHandler();

	// Returns false if no interrupt was recorded; otherwise signals the
	// interrupt to R and returns true
	bool HandleInterrupt() const;
	// Restores the previous SIGINT handler and drops the context reference
	void Disable();

private:
	// The function registered with std::signal for SIGINT
	static void signal_handler(int signum);
};

};
13 changes: 12 additions & 1 deletion src/relational.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "cpp11.hpp"
#include "duckdb.hpp"
#include "signal.hpp"
#include "typesr.hpp"
#include "rapi.hpp"

Expand Down Expand Up @@ -431,7 +432,17 @@ static SEXP result_to_df(duckdb::unique_ptr<QueryResult> res) {
}

// Executes the relation and converts the result to an R data frame.
//
// SIGINT is routed to the relation's client context for the duration of
// Execute(), so that a user interrupt cancels the running query. If an
// interrupt was recorded, HandleInterrupt() raises it on the R side; the
// R_NilValue return only covers the case where that call returns normally.
[[cpp11::register]] SEXP rapi_rel_to_df(duckdb::rel_extptr_t rel) {
	// The handler must be installed before execution starts; returning the
	// executed result directly here would leave everything below unreachable.
	ScopedInterruptHandler signal_handler(rel->rel->context.GetContext());

	auto res = rel->rel->Execute();

	if (signal_handler.HandleInterrupt()) {
		return R_NilValue;
	}

	// Put the previous SIGINT handler back before converting the result
	signal_handler.Disable();

	return result_to_df(std::move(res));
}

[[cpp11::register]] std::string rapi_rel_tostring(duckdb::rel_extptr_t rel) {
Expand Down
17 changes: 17 additions & 0 deletions src/reltoaltrep.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#define __STDC_FORMAT_MACROS

#include "httplib.hpp"
#include "rapi.hpp"
#include "typesr.hpp"
#include "reltoaltrep.hpp"
#include "signal.hpp"
#include "cpp11/declarations.hpp"

#include <cinttypes>
Expand Down Expand Up @@ -97,16 +99,31 @@ struct AltrepRelationWrapper {
Rprintf("materializing:\n%s\n", rel->ToString().c_str());
}

ScopedInterruptHandler signal_handler(rel->context.GetContext());

// We need to temporarily allow a deeper execution stack
// https://github.com/duckdb/duckdb-r/issues/101
auto old_depth = rel->context.GetContext()->config.max_expression_depth;
rel->context.GetContext()->config.max_expression_depth = old_depth * 2;
duckdb_httplib::detail::scope_exit reset_max_expression_depth([&]() {
rel->context.GetContext()->config.max_expression_depth = old_depth;
});

res = rel->Execute();

// FIXME: Use std::experimental::scope_exit
if (rel->context.GetContext()->config.max_expression_depth != old_depth * 2) {
Rprintf("Internal error: max_expression_depth was changed from %" PRIu64 " to %" PRIu64 "\n", old_depth * 2,
rel->context.GetContext()->config.max_expression_depth);
}
rel->context.GetContext()->config.max_expression_depth = old_depth;
reset_max_expression_depth.release();

if (signal_handler.HandleInterrupt()) {
cpp11::stop("Query execution was interrupted");
}

signal_handler.Disable();

if (res->HasError()) {
cpp11::stop("Error evaluating duckdb query: %s", res->GetError().c_str());
Expand Down
63 changes: 63 additions & 0 deletions src/signal.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include "signal.hpp"

#include "cpp11/R.hpp"
#include "cpp11/function.hpp"
#include "cpp11/protect.hpp" // for safe
#include "duckdb/common/exception.hpp"

#include <R_ext/GraphicsEngine.h>

// Toy repo: https://github.com/krlmlr/cancel.test

namespace duckdb {

// The handler object that signal_handler() forwards SIGINT to; nullptr while
// no ScopedInterruptHandler with a context is alive.
ScopedInterruptHandler *ScopedInterruptHandler::instance = nullptr;

// Registers this object as the active interrupt handler and installs the
// SIGINT handler, remembering the one that was installed before.
// Throws InternalException if another handler object is already active.
// With a null context nothing is installed and the object stays inert.
ScopedInterruptHandler::ScopedInterruptHandler(shared_ptr<ClientContext> context_) : context(context_) {
	if (instance != nullptr) {
		throw InternalException("ScopedInterruptHandler already active");
	}

	if (!context) {
		return;
	}

	// Publish the instance first so that the handler has a target as soon as
	// it is installed.
	instance = this;
	oldhandler = std::signal(SIGINT, ScopedInterruptHandler::signal_handler);
}

// Restores the previous SIGINT handler if it has not been restored yet,
// then clears the active-instance pointer.
ScopedInterruptHandler::~ScopedInterruptHandler() {
// Restore the old handler first, while `instance` still points to this object
Disable();
instance = nullptr;
}

// Checks whether SIGINT was recorded while this handler was installed.
// Returns false if not; otherwise forwards the interrupt to R and returns true.
bool ScopedInterruptHandler::HandleInterrupt() const {
	// The flag is only set through `instance`, which is only assigned when a
	// context is present, so a handler without context always stops here.
	if (!interrupted) {
		return false;
	}
	D_ASSERT(context);

	// Workaround for a peculiarity of the RStudio IDE on Windows:
	// unless a message is emitted first, the interrupt cannot be caught
	// as a condition.
	// https://github.com/krlmlr/cancel.test/issues/1
	cpp11::message("");

	// FIXME: Is this equivalent to cpp11::safe[Rf_onintrNoResume](), or worse?
	cpp11::safe[Rf_onintr]();
	return true;
}

// Reinstalls the SIGINT handler that was active before construction and
// releases the context. Calling it again afterwards is a no-op, because the
// cleared context doubles as the "already disabled" marker.
void ScopedInterruptHandler::Disable() {
	if (!context) {
		return;
	}

	std::signal(SIGINT, oldhandler);
	context.reset();
}

void ScopedInterruptHandler::signal_handler(int signum) {
if (instance) {
instance->interrupted = true;
instance->context->Interrupt();
}
}

}; // namespace duckdb
32 changes: 16 additions & 16 deletions src/statement.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
#include "rapi.hpp"
#include "typesr.hpp"

#include <R_ext/Utils.h>

#include "duckdb/common/arrow/arrow.hpp"
#include "duckdb/common/arrow/arrow_converter.hpp"
#include "duckdb/common/arrow/arrow_util.hpp"
#include "duckdb/common/types/timestamp.hpp"
#include "duckdb/common/arrow/arrow_wrapper.hpp"
#include "duckdb/common/arrow/result_arrow_wrapper.hpp"
#include "duckdb/common/types/timestamp.hpp"
#include "duckdb/main/chunk_scan_state/query_result.hpp"

#include "duckdb/parser/statement/relation_statement.hpp"
#include "rapi.hpp"
#include "signal.hpp"
#include "typesr.hpp"

#include <R_ext/Utils.h>

using namespace duckdb;
using namespace cpp11::literals;
Expand Down Expand Up @@ -338,16 +337,17 @@ bool FetchArrowChunk(ChunkScanState &scan_state, ClientProperties options, Appen
if (!stmt || !stmt.get() || !stmt->stmt) {
cpp11::stop("rapi_execute: Invalid statement");
}
auto pending_query = stmt->stmt->PendingQuery(stmt->parameters, arrow);
duckdb::PendingExecutionResult execution_result;
do {
execution_result = pending_query->ExecuteTask();
R_CheckUserInterrupt();
} while (!PendingQueryResult::IsResultReady(execution_result));
if (execution_result == PendingExecutionResult::EXECUTION_ERROR) {
cpp11::stop("rapi_execute: Failed to run query\nError: %s", pending_query->GetError().c_str());

ScopedInterruptHandler signal_handler(stmt->stmt->context);

auto generic_result = stmt->stmt->Execute(stmt->parameters, false);

if (signal_handler.HandleInterrupt()) {
return R_NilValue;
}
auto generic_result = pending_query->Execute();

signal_handler.Disable();

if (generic_result->HasError()) {
cpp11::stop("rapi_execute: Failed to run query\nError: %s", generic_result->GetError().c_str());
}
Expand Down
43 changes: 43 additions & 0 deletions tests/testthat/test-signal.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Verifies that a long-running query in a separate R session can be interrupted
# and that the interrupt surfaces as a catchable "interrupt" condition.
test_that("long-running queries can be cancelled", {
skip_if_not_installed("callr")
# Skip on Windows for R < 4.4, the signal doesn't seem to make it through
# (but works for the toy repository)
skip_if(getRversion() < "4.4.0" && .Platform$OS.type == "windows")

# The query runs in a child R session so that it can be interrupted from here
r_session <- callr::r_session$new()

# Set up a connection and a 100000-row table in the child session
r_session$run(function() {
.GlobalEnv$con <- DBI::dbConnect(duckdb::duckdb())
DBI::dbExecute(.GlobalEnv$con, "CREATE TABLE data AS SELECT unnest(generate_series(1, 100000)) AS a")
})

# Start an expensive self-join; the busy/timeout expectations below rely on
# this call returning while the query is still running in the child session.
# The flag records whether the interrupt condition reached the tryCatch handler.
r_session$call(function() {
.GlobalEnv$interrupted <- FALSE
tryCatch(
DBI::dbGetQuery(.GlobalEnv$con, "SELECT COUNT(*) FROM data JOIN data AS data2 ON data.a != data2.a"),
interrupt = function(e) {
.GlobalEnv$interrupted <- TRUE
}
)
})

start_time <- Sys.time()

# Give the query time to start, then confirm it is still running
Sys.sleep(0.2)
expect_equal(r_session$get_state(), "busy")
polled <- r_session$poll_process(200)
expect_equal(polled, "timeout")
# Interrupt the child session; it is expected to become ready within 200 ms
r_session$interrupt()
polled <- r_session$poll_process(200)
expect_equal(polled, "ready")
# NOTE(review): 200 is presumably callr's code for a completed call - confirm
expect_equal(r_session$read()$code, 200)
expect_equal(r_session$get_state(), "idle")

# The interrupt handler in the child session must have run
expect_true(r_session$run(function() .GlobalEnv$interrupted))

# The whole cancel round trip has to finish in under one second
end_time <- Sys.time()
expect_lt(end_time - start_time, 1)

r_session$run(function() DBI::dbDisconnect(.GlobalEnv$con))
r_session$close()
})

0 comments on commit 007c6e4

Please sign in to comment.