Skip to content

Commit

Permalink
Replace LIMIT/OFFSET with Psycopg2 server-side cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonyfok committed Oct 22, 2021
1 parent b0dfbf6 commit d3f2c80
Showing 1 changed file with 35 additions and 31 deletions.
66 changes: 35 additions & 31 deletions python/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import psycopg2
import json
import decimal
import re

from elasticsearch import Elasticsearch
from elasticsearch import helpers
Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self, PostGISConnection, ESConnection, view, sqlquerystring):
self._pgConnection = PostGISConnection
self._esConnection = ESConnection
self._view = view
self._sqlquerystring = sqlquerystring
self._sqlquerystring = re.sub(r'\s\s+', ' ', sqlquerystring)
self._auth = get_config_params('config.ini')

def pgConnection(self):
Expand All @@ -79,12 +80,8 @@ def auth(self):
def sqlquerystring(self):
return self._sqlquerystring

def getGeoJson(self, sqlquerystring, pgConnection):
cur = pgConnection.pgConnection().cursor()
cur.execute(sqlquerystring)
rows = cur.fetchall()
def getGeoJson(self, rows, columns):
if rows:
columns = [name[0] for name in cur.description]
geomIndex = columns.index('st_asgeojson')
feature_collection = {'type': 'FeatureCollection',
'features': []}
Expand All @@ -101,6 +98,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
value = row[index]
feature['properties'][column] = value
feature_collection['features'].append(feature)

geojsonobject = json.dumps(feature_collection,
indent=2,
default=decimal_default)
Expand Down Expand Up @@ -142,33 +140,41 @@ def postgis2es(self):
sqlquerystring = self.sqlquerystring().format(
**{'limit': self.LIMIT,
'offset': self.OFFSET})
geojsonobject = self.getGeoJson(sqlquerystring, self.pgConnection())
while geojsonobject is not None:

print(sqlquerystring)
self.populateElasticSearchIndex(self.esConnection(),
geojsonobject,
self.auth(),
self.view())
self.OFFSET += self.LIMIT

sqlquerystring = self.sqlquerystring().format(
**{'limit': self.LIMIT,
'offset': self.OFFSET})
geojsonobject = self.getGeoJson(sqlquerystring,
self.pgConnection())

# Remove LIMIT and OFFSET until we decide to change all caller scripts
sqlquerystring = re.sub(r'\s+LIMIT.*', '', sqlquerystring)

print(sqlquerystring)

with self.pgConnection().pgConnection() as conn:
with conn.cursor(name='postgis2es_cursor') as cur:
cur.itersize = self.LIMIT
cur.execute(sqlquerystring)
rows = cur.fetchmany(self.LIMIT)
columns = [name[0] for name in cur.description]

count = 0
while rows:
count = count + 1
print("Rows %d-%d, %s = %s" %
(self.LIMIT * (count-1) + 1, self.LIMIT * count,
columns[0], rows[0][0]))

geojsonobject = self.getGeoJson(rows, columns)
# print("populateElasticsearchIndex()")
self.populateElasticSearchIndex(self.esConnection(),
geojsonobject,
self.auth(),
self.view())
rows = cur.fetchmany(self.LIMIT)

return


class PostGISPointDataset(PostGISdataset):

def getGeoJson(self, sqlquerystring, pgConnection):
cur = pgConnection.pgConnection().cursor()
cur.execute(sqlquerystring)
rows = cur.fetchall()
def getGeoJson(self, rows, columns):
if rows:
columns = [name[0] for name in cur.description]
geomIndex = columns.index('st_asgeojson')
feature_collection = {'type': 'FeatureCollection',
'features': []}
Expand All @@ -187,6 +193,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
value = row[index]
feature['properties'][column] = value
feature_collection['features'].append(feature)

geojsonobject = json.dumps(feature_collection,
indent=2,
default=decimal_default)
Expand All @@ -197,12 +204,8 @@ def getGeoJson(self, sqlquerystring, pgConnection):

class PostGISTable(PostGISdataset):

def getGeoJson(self, sqlquerystring, pgConnection):
cur = pgConnection.pgConnection().cursor()
cur.execute(sqlquerystring)
rows = cur.fetchall()
def getGeoJson(self, rows, columns):
if rows:
columns = [name[0] for name in cur.description]
# geomIndex = columns.index('st_asgeojson')
feature_collection = {'type': 'FeatureCollection',
'features': []}
Expand All @@ -221,6 +224,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
value = row[index]
feature['properties'][column] = value
feature_collection['features'].append(feature)

geojsonobject = json.dumps(feature_collection,
indent=2,
default=decimal_default)
Expand Down

0 comments on commit d3f2c80

Please sign in to comment.