"""
This module contains a connector for UK postcode lookup.
Run as module with --import <csv file> to import a postcode database CSV.
"""
import csv
import sys
import typing
from django.http import JsonResponse
from decouple import config
import sqlalchemy
from sqlalchemy.exc import NoSuchTableError
import sqlalchemy.orm
from .base import DataSetConnector
[docs]class OnsPostcodeDirectoryConnector(DataSetConnector):
"""
Connector for UK postcode lookup, backed by an SQL table.
"""
_table_name = 'connector_postcode'
def __init__(self, location: str,
api_key: typing.Optional[str] = None,
auth: typing.Optional[typing.Callable] = None):
super().__init__(location, api_key=api_key, auth=auth)
self._engine = sqlalchemy.create_engine(config('DATABASE_URL'))
self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)
try:
self._table_meta = sqlalchemy.MetaData(self._engine)
self._table = sqlalchemy.Table(self._table_name, self._table_meta, autoload=True)
except NoSuchTableError as exc:
raise FileNotFoundError('Postcode table is not present') from exc
[docs] def get_response(self,
params: typing.Optional[typing.Mapping[str, str]] = None):
if params is None or 'postcode' not in params:
return JsonResponse({
'status': 'fail',
'data': {
'postcode': 'Field \'postcode\' is a required field',
},
}, status=400)
query = sqlalchemy.select(
[self._table]
).where(self._table.c.postcode == params['postcode'].replace(' ', '').upper())
result = self._session_maker().execute(query).fetchone()
try:
return JsonResponse(dict(result), json_dumps_params={'default': str})
except TypeError:
# Did not return a valid result
return JsonResponse({
'status': 'fail',
'data': {
'postcode': 'No record matching postcode \'{0}\' found'.format(params['postcode']),
},
}, status=404)
[docs] @classmethod
def setup(cls, filename):
engine = sqlalchemy.create_engine(config('DATABASE_URL'))
metadata = sqlalchemy.MetaData(engine)
postcodes = sqlalchemy.Table(
cls._table_name, metadata,
sqlalchemy.Column('postcode', sqlalchemy.String(length=10), index=True, nullable=False, primary_key=True),
sqlalchemy.Column('lat', sqlalchemy.Float, nullable=False),
sqlalchemy.Column('long', sqlalchemy.Float, nullable=False)
)
try:
postcodes.create()
except sqlalchemy.exc.OperationalError:
pass
conn = engine.connect()
with open(filename, 'r') as csvfile:
reader = csv.DictReader(csvfile)
# TODO this fails if any row already exists - but checking each row in turn is slow - find solution
conn.execute(postcodes.insert(), [
{'postcode': row['pcds'].replace(' ', '').upper(),
'lat': row['lat'],
'long': row['long']} for row in reader
])
if __name__ == '__main__':
if len(sys.argv) == 3 and sys.argv[1] == '--import':
OnsPostcodeDirectoryConnector.setup(sys.argv[2])
else:
print(__doc__)