This Airflow Provider consists of an Airflow Hook named KoboHook and an Airflow Operator named KoboToGeoNodeOperator.
The KoboHook handles the connection to the KoboToolBox service and the basic operations against it. It currently supports:
- Getting the formid of a form from its id_string, and vice versa
- Getting the structure of a form
- Getting the metadata of a form
- Extracting form submissions
- More features are in development
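The formid/id_string lookup from the first item can be pictured with a small standalone sketch. It does not call KoboHook, whose method names are not shown in this README. It assumes the list of forms has already been fetched and that each record carries `formid` and `id_string` fields; the function names and the sample records are hypothetical.

```python
from typing import Dict, List, Optional

# Hypothetical sample records; real ones would come from the KoboToolBox API.
FORMS: List[Dict[str, object]] = [
    {"formid": 101, "id_string": "sample_form_a", "title": "Sample form A"},
    {"formid": 102, "id_string": "sample_form_b", "title": "Sample form B"},
]


def formid_from_id_string(forms: List[Dict[str, object]], id_string: str) -> Optional[int]:
    """Return the numeric formid of the form with the given id_string, or None."""
    for form in forms:
        if form["id_string"] == id_string:
            return int(form["formid"])
    return None


def id_string_from_formid(forms: List[Dict[str, object]], formid: int) -> Optional[str]:
    """Return the id_string of the form with the given formid, or None."""
    for form in forms:
        if form["formid"] == formid:
            return str(form["id_string"])
    return None
```

Returning None for an unknown form keeps the sketch simple; a real hook might prefer to raise an error instead.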
The KoboToGeoNodeOperator is aimed at forms that contain geospatial information that needs to be displayed in GeoNode. It captures each response that a user submits to a given form and adds it to an existing dataset. If the form is new, the Operator creates the dataset from the first response.
The Operator takes a set of parameters, defined when it is instantiated, that control the column mapping and supply other required and complementary information.
Installing airflow_providers_geonode is required to use the KoboToGeoNodeOperator.
To use the KoboToGeoNodeOperator you first need to configure a few things.
Three connections must be created in the Airflow UI:
To connect to the KoboToolBox service, a Connection ID is required along with the Kobo Host, the Kobo CAT Host (which has access to API v1) and an API token.
Note: The user who generates the token must have access to the specific form in question.
Airflow must be connected to the destination GeoNode database into which the form data will be migrated. Specify the following database parameters:
- Connection Id: Connection name in Airflow
- Host: Database host
- Database: Database name (typically geonode_data)
- Login: Database username
- Password: Database password
- Port: Database port
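Besides the UI, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` whose value is a connection URI. The sketch below builds that pair for the database parameters listed above, using only the standard library. The helper name, the host and the credentials are made up for illustration, and this README does not state whether the provider has been tested with connections defined this way.

```python
from typing import Tuple
from urllib.parse import quote


def postgres_conn_env(conn_id: str, host: str, database: str,
                      login: str, password: str, port: int = 5432) -> Tuple[str, str]:
    """Return the environment variable name and URI for a Postgres connection."""
    # Airflow looks up AIRFLOW_CONN_ followed by the upper-cased connection id.
    name = "AIRFLOW_CONN_" + conn_id.upper()
    # Percent-encode the parts that may contain reserved characters.
    uri = "postgres://{}:{}@{}:{}/{}".format(
        quote(login, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(database, safe=""),
    )
    return name, uri


# Illustrative values only.
ENV_NAME, ENV_URI = postgres_conn_env(
    "postgres_connection_id_airflow",
    host="db.example.org",
    database="geonode_data",
    login="geonode",
    password="p@ss/word",
)
```

Percent-encoding the login and password matters because characters such as `@` or `/` would otherwise break the URI.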
Airflow must also be connected to the destination GeoNode instance to which the form data will be uploaded. Specify the following parameters for the GeoNode instance:
- Connection Id: Connection name in Airflow.
- Base URL: URL/host of the GeoNode instance, with https or http as appropriate.
- GeoNode username: GeoNode app username (optional if a GeoNode API Token is provided and the GeoServer credentials are defined below).
- GeoNode password: GeoNode app password (optional if a GeoNode API Token is provided and the GeoServer credentials are defined below).
- GeoNode API Token: GeoNode also accepts authentication with an API Token (optional if the GeoNode username and password are provided).
- GeoServer username: GeoServer app username (optional; specify it only if the GeoServer credentials differ from the GeoNode credentials, otherwise the GeoNode credentials are enough).
- GeoServer password: GeoServer app password (optional; specify it only if the GeoServer credentials differ from the GeoNode credentials, otherwise the GeoNode credentials are enough).
- SSH Host: SSH host of the server where the GeoNode instance is hosted.
- SSH port: SSH port of the server where the GeoNode instance is hosted.
- SSH username: SSH username for the server where the GeoNode instance is hosted.
- SSH password: SSH password for the server where the GeoNode instance is hosted.
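The optional/required rules for the GeoNode and GeoServer credentials can be summarised in a short check. This is only a sketch of the rules as described above, not the provider's own validation; the function name and the dictionary keys are hypothetical.

```python
from typing import Dict, List, Optional


def check_geonode_connection(fields: Dict[str, Optional[str]]) -> List[str]:
    """Return a list of problems found in the GeoNode connection fields."""
    has_login = bool(fields.get("geonode_username")) and bool(fields.get("geonode_password"))
    has_token = bool(fields.get("geonode_api_token"))
    has_geoserver = bool(fields.get("geoserver_username")) and bool(fields.get("geoserver_password"))

    problems: List[str] = []
    # GeoNode accepts either username/password or an API token.
    if not (has_login or has_token):
        problems.append("GeoNode needs a username and password, or an API token")
    # GeoServer uses its own credentials, or falls back to the GeoNode login.
    if not (has_geoserver or has_login):
        problems.append("GeoServer needs its own credentials or a GeoNode username and password")
    return problems
```

For example, a connection with only an API token passes the GeoNode check but is reported as lacking GeoServer credentials.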
The KoboToGeoNodeOperator takes some required parameters and some optional ones.
The parameters are defined in the body of the DAG when instantiating the Operator, as usual:
process_form = KoboToGeoNodeOperator(
    task_id="process_form",
    kobo_conn_id="kobo_connection_id_airflow",
    formid=39,  # Define either this...
    form_id_string="y4nMrHxChj3XZEQLVDwQIN",  # ...or this
    postgres_conn_id="postgres_connection_id_airflow",
    geonode_conn_id="geonode_connection_id_airflow",
    dataset_name="dataset_name_in_geonode",  # Optional
    columns=[  # Optional
        "username",
        "column_1",
        "column_2",
        "column_3",
        "column_4",
        "column_5",
        "my_coords_column",
    ],
    mode="append",  # Optional: "append" (default) or "replace"
    dag=dag,
)
- kobo_conn_id: Required. The name of the Kobo connection previously created in the Airflow UI.
- formid: Required. ID of the Kobo form from which you want to extract the information. form_id_string can be defined instead of this parameter.
- form_id_string: Required. ID string of the Kobo form from which you want to extract the information. formid can be defined instead of this parameter.
- postgres_conn_id: Required. The name of the connection to the target database (GeoNode) previously created in the Airflow UI.
- geonode_conn_id: Required. The name of the GeoNode connection previously created in the Airflow UI.
- dataset_name: Optional. The name of the dataset (new or existing) in which the form information will be created or added. Make sure the name has no special characters; otherwise they will be converted to their closest ASCII equivalent. If it is not defined, the dataset takes the form name.
- columns: Optional. A list with the names of the columns to migrate. If this parameter is not defined, all columns of the form (including metadata) are migrated to GeoNode.
- mode: Optional. Specifies whether the data to be inserted is added to the existing data (append) or overwrites it (replace). If it is not defined, the data is appended to the existing data.
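The rules above (three required connection names, one of formid or form_id_string, and a mode of append or replace) can be checked before a DAG is deployed. The following is a minimal sketch under those rules, not the Operator's own validation; both helper names are hypothetical. `to_ascii` only approximates the "closest ASCII equivalent" conversion by stripping accents, and the Operator's actual conversion may differ.

```python
import unicodedata
from typing import Any, Dict, List

VALID_MODES = {"append", "replace"}


def to_ascii(name: str) -> str:
    """Approximate a dataset name in ASCII by dropping accents and non-ASCII characters."""
    decomposed = unicodedata.normalize("NFKD", name)
    return decomposed.encode("ascii", "ignore").decode("ascii")


def check_operator_kwargs(kwargs: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in the Operator arguments."""
    errors: List[str] = []
    for key in ("kobo_conn_id", "postgres_conn_id", "geonode_conn_id"):
        if not kwargs.get(key):
            errors.append(key + " is required")
    # The form can be identified by its numeric id or by its id string.
    if kwargs.get("formid") is None and not kwargs.get("form_id_string"):
        errors.append("define formid or form_id_string")
    # mode defaults to append when it is not given.
    if kwargs.get("mode", "append") not in VALID_MODES:
        errors.append("mode must be 'append' or 'replace'")
    return errors
```

A quick usage check: `check_operator_kwargs` returns an empty list for a complete set of arguments, and one message per rule that is violated.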
A DAG template showing how the KoboToGeoNodeOperator can be used is provided in the "example-dag" folder.
The DAG looks something like this:
from datetime import timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from kobo_provider.operators.kobotoolbox import KoboToGeoNodeOperator

default_args = {
    'owner': 'KAN',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retry_delay': timedelta(minutes=3),
    'email_on_failure': False,
    'email_on_retry': False,
}

# schedule_interval=None: the DAG only runs when triggered manually.
dag = DAG(
    "KoboToGeoNodeOperator-ExampleDAG",
    default_args=default_args,
    description='ETL Kobo-GeoNode',
    schedule_interval=None,
    tags=['KoboToolBox', 'GeoNode']
)

start = EmptyOperator(
    task_id='start',
    dag=dag
)

process_form = KoboToGeoNodeOperator(
    task_id="process_form",
    kobo_conn_id="kobo_connection_id_airflow",
    formid=39,  # Define either this...
    form_id_string="y4nMrHxChj3XZEQLVDwQIN",  # ...or this
    postgres_conn_id="postgres_connection_id_airflow",
    geonode_conn_id="geonode_connection_id_airflow",
    dataset_name="dataset_name_in_geonode",  # Optional
    columns=[  # Optional
        "username",
        "column_1",
        "column_2",
        "column_3",
        "column_4",
        "column_5",
        "my_coords_column"
    ],
    mode="replace",  # Optional
    dag=dag
)

end = EmptyOperator(
    task_id='end',
    dag=dag
)

start >> process_form >> end
You can also use the ready-made DAG from that folder.
© 2024 KAN Territory & IT. All rights reserved.