Source code for loci.io

import pandas as pd
from shapely.geometry import Point
import geopandas as gpd
import math
import osmnx
import requests
from io import BytesIO
from zipfile import ZipFile


def read_poi_csv(input_file, col_id='id', col_name='name', col_lon='lon', col_lat='lat', col_kwds='kwds',
                 col_sep=';', kwds_sep=',', source_crs='EPSG:4326', target_crs='EPSG:4326',
                 keep_other_cols=False):
    """Creates a POI GeoDataFrame from an input CSV file.

    Rows are skipped (and counted in a printed message) when their longitude or latitude is
    missing or not numeric, or when any retained column holds a null value.

    Args:
        input_file (string): Path to the input csv file.
        col_id (string): Name of the column containing the POI id (default: `id`).
        col_name (string): Name of the column containing the POI name (default: `name`).
        col_lon (string): Name of the column containing the POI longitude (default: `lon`).
        col_lat (string): Name of the column containing the POI latitude (default: `lat`).
        col_kwds (string): Name of the column containing the POI keywords (default: `kwds`).
        col_sep (string): Column delimiter (default: `;`).
        kwds_sep (string): Keywords delimiter (default: `,`).
        source_crs (string): Coordinate Reference System of input data (default: `EPSG:4326`).
        target_crs (string): Coordinate Reference System of the GeoDataFrame to be created
            (default: `EPSG:4326`).
        keep_other_cols (bool): Whether to keep the rest of the columns in the csv file
            (default: `False`).

    Returns:
        A POI GeoDataFrame holding the id column, the name and keywords columns when they exist in
        the file, and a `geometry` column that replaces the longitude/latitude columns. Columns are
        not renamed: they keep the names given by `col_id`, `col_name` and `col_kwds` (`id`, `name`
        and `kwds` with the default arguments). Keywords are returned as lists of strings.

    Raises:
        KeyError: If the longitude or latitude column is not present in the file.
    """

    def lon_lat_to_point(row, c_lon, c_lat):
        # Only conversion failures mark the row as invalid; anything else (e.g. a missing
        # column) is a caller error and is allowed to propagate.
        try:
            x_lon = float(row[c_lon])
            y_lat = float(row[c_lat])
        except (TypeError, ValueError):
            return float('NaN')
        if math.isnan(x_lon) or math.isnan(y_lat):
            return float('NaN')
        return Point(x_lon, y_lat)

    # `on_bad_lines` replaced `error_bad_lines` in pandas 1.3 and the old keyword was removed in
    # pandas 2.0. 'warn' matches the old behaviour (skip the malformed line and report it). An
    # unknown keyword raises TypeError before any data is read, so the fallback is safe.
    try:
        pois = pd.read_csv(input_file, delimiter=col_sep, on_bad_lines='warn')
    except TypeError:
        pois = pd.read_csv(input_file, delimiter=col_sep, error_bad_lines=False)

    init_poi_size = pois.index.size
    columns = list(pois)

    # Columns that are retained and therefore checked for N/A values.
    if keep_other_cols:
        subset_cols = list(columns)
    else:
        subset_cols = [col_id, col_lon, col_lat]
        if col_name in columns:
            subset_cols.append(col_name)
        if col_kwds in columns:
            subset_cols.append(col_kwds)

    # Geometry column (uncleaned): NaN marks rows with unusable coordinates.
    pois['geometry'] = pois.apply(lambda row: lon_lat_to_point(row, col_lon, col_lat), axis=1)
    subset_cols.append('geometry')

    # Drop the columns that are not retained.
    drop_columns = list(set(columns) - set(subset_cols))
    pois.drop(columns=drop_columns, inplace=True)

    # Drop every row that has a null in any remaining column (this includes invalid geometries).
    pois.dropna(inplace=True)
    skipped = init_poi_size - pois.index.size
    if skipped > 0:
        print("Skipped", skipped, "rows due to errors.")

    # Keywords are split only after the null rows are gone, so every value is a string here.
    if col_kwds in columns:
        pois[col_kwds] = pois[col_kwds].map(lambda s: s.split(kwds_sep))

    # CRS values are passed in the {'init': ...} dict form used throughout this module.
    source_crs_def = {'init': source_crs}
    target_crs_def = {'init': target_crs}

    pois = gpd.GeoDataFrame(pois, crs=source_crs_def, geometry=pois['geometry'])
    pois = pois.to_crs(target_crs_def).drop(columns=[col_lon, col_lat])

    print('Loaded ' + str(len(pois.index)) + ' POIs.')

    return pois
def import_osmnx(bound, target_crs='EPSG:4326'):
    """Creates a POI GeoDataFrame from POIs retrieved by OSMNX (https://github.com/gboeing/osmnx).

    Only elements of type `node` that carry an `amenity` tag are kept. The `osmid` column is
    renamed to `id`, and `amenity` is renamed to `kwds` with each value wrapped in a
    single-element list.

    Args:
        bound (polygon): A polygon to be used as filter.
        target_crs (string): Coordinate Reference System of the GeoDataFrame to be created
            (default: `EPSG:4326`).

    Returns:
        A POI GeoDataFrame restricted to the columns `id`, `name`, `kwds` and `geometry`
        (whichever of them the retrieved data provides).
    """
    # Retrieve the POIs that fall inside the polygon.
    pois = osmnx.pois.pois_from_polygon(bound)

    if len(pois.index) > 0:
        # Keep amenity-tagged rows first, then only the node elements among them.
        with_amenity = pois[pois.amenity.notnull()]
        pois = with_amenity[with_amenity.element_type == 'node']

        # Restructure: discard every column outside the retained set.
        wanted = {'osmid', 'amenity', 'name', 'geometry'}
        surplus = [col for col in pois.columns if col not in wanted]
        pois = pois.drop(columns=surplus)

        pois = pois.reset_index(drop=True)
        pois = pois.rename(columns={'osmid': 'id', 'amenity': 'kwds'})
        pois['kwds'] = pois['kwds'].map(lambda s: [s])

    # Reproject only when a CRS other than the default one is requested.
    if target_crs != 'EPSG:4326':
        pois = pois.to_crs({'init': target_crs})

    print('Loaded ' + str(len(pois.index)) + ' POIs.')

    return pois
def import_osmwrangle(osmwrangle_file, target_crs='EPSG:4326', bound=None):
    """Creates a POI GeoDataFrame from a file produced by OSMWrangle (https://github.com/SLIPO-EU/OSMWrangle).

    The input is read as a `|`-delimited CSV with the columns `ID`, `NAME`, `LON`, `LAT`,
    `CATEGORY` and `SUBCATEGORY`. Rows are skipped (and counted in a printed message) when their
    coordinates are missing or not numeric, or when any retained value, including the category
    or subcategory, is null.

    Args:
        osmwrangle_file (string): Path or URL to the input csv file. A value that starts with
            `http` and ends with `.zip` is downloaded and the first archive member is read.
        target_crs (string): Coordinate Reference System of the GeoDataFrame to be created
            (default: `EPSG:4326`).
        bound (polygon): A polygon to be used as filter.

    Returns:
        A POI GeoDataFrame with columns `id`, `name`, `kwds` and `geometry`, where `kwds` is the
        list `[category, subcategory]` split on commas.

    Raises:
        requests.HTTPError: If downloading the zip archive returns an error status.
    """

    def lon_lat_to_point(row, c_lon, c_lat):
        # A value that cannot be converted marks the row as invalid (NaN) so that it is dropped
        # later instead of aborting the whole import.
        try:
            x_lon = float(row[c_lon])
            y_lat = float(row[c_lat])
        except (TypeError, ValueError):
            return float('NaN')
        if math.isnan(x_lon) or math.isnan(y_lat):
            return float('NaN')
        return Point(x_lon, y_lat)

    def load_csv(source):
        # `on_bad_lines` replaced `error_bad_lines` in pandas 1.3 and the old keyword was removed
        # in pandas 2.0. 'warn' matches the old behaviour (skip the malformed line and report
        # it). An unknown keyword raises TypeError before any data is read from `source`.
        try:
            return pd.read_csv(source, delimiter=col_sep, on_bad_lines='warn')
        except TypeError:
            return pd.read_csv(source, delimiter=col_sep, error_bad_lines=False)

    col_sep = '|'
    col_id = 'ID'
    col_lon = 'LON'
    col_lat = 'LAT'
    col_name = 'NAME'
    col_cat = 'CATEGORY'
    col_subcat = 'SUBCATEGORY'
    source_crs = {'init': 'EPSG:4326'}

    # Load the file.
    if osmwrangle_file.startswith('http') and osmwrangle_file.endswith('.zip'):
        response = requests.get(osmwrangle_file)
        # Fail on an HTTP error instead of trying to unzip an error page.
        response.raise_for_status()
        with ZipFile(BytesIO(response.content)) as zip_file:
            with zip_file.open(zip_file.namelist()[0]) as csvfile:
                pois = load_csv(csvfile)
    else:
        pois = load_csv(osmwrangle_file)

    init_poi_size = pois.index.size
    columns = list(pois)
    subset_cols = [col_id, col_name, 'kwds', col_lon, col_lat, 'geometry']

    # Geometry column (uncleaned): NaN marks rows with unusable coordinates.
    pois['geometry'] = pois.apply(lambda row: lon_lat_to_point(row, col_lon, col_lat), axis=1)

    # Combined keyword string; it is NaN when the category or the subcategory is missing.
    pois['kwds'] = pois[col_cat] + ',' + pois[col_subcat]

    # Drop the input columns that are not retained.
    drop_columns = list(set(columns) - set(subset_cols))
    pois.drop(columns=drop_columns, inplace=True)

    # Drop every row that has a null in any remaining column.
    pois.dropna(inplace=True)
    skipped = init_poi_size - pois.index.size
    if skipped > 0:
        print("Skipped", skipped, "rows due to errors.")

    # Split only after the null rows are gone: splitting earlier would call `.split` on a float
    # NaN for any row lacking a category or subcategory.
    pois['kwds'] = pois['kwds'].map(lambda s: s.split(','))

    pois = pois.rename(columns={col_id: 'id', col_name: 'name'})
    pois = gpd.GeoDataFrame(pois, crs=source_crs, geometry=pois['geometry']).drop(columns=[col_lon, col_lat])

    # Check whether location filter should be applied.
    if bound is not None:
        spatial_filter = pois.geometry.intersects(bound)
        pois = pois[spatial_filter]

    # Reproject only when a CRS other than the source one is requested.
    if target_crs != 'EPSG:4326':
        pois = pois.to_crs({'init': target_crs})

    print('Loaded ' + str(len(pois.index)) + ' POIs.')

    return pois
def retrieve_osm_loc(name, buffer_dist=0):
    """Retrieves a polygon from an OSM location.

    Args:
        name (string): Name of the location to be resolved.
        buffer_dist (numeric): Buffer distance in meters.

    Returns:
        The geometry of the first match returned by OSMNX, or `None` when there is no match.
    """
    matches = osmnx.core.gdf_from_place(name, buffer_dist=buffer_dist)

    # Nothing was resolved for this name.
    if len(matches.index) == 0:
        return None

    return matches.iloc[0].geometry
def to_geojson(gdf, output_file):
    """Exports a GeoDataFrame to a GeoJSON file.

    Args:
        gdf (GeoDataFrame): The GeoDataFrame object to be exported.
        output_file (string): Path to the output file.
    """
    # Delegate the writing to the GeoDataFrame itself, selecting the GeoJSON driver.
    driver_name = 'GeoJSON'
    gdf.to_file(output_file, driver=driver_name)