diff --git a/process.py b/process.py index e8d3899..2a05157 100644 --- a/process.py +++ b/process.py @@ -1,4 +1,4 @@ -from dataflows import Flow, load, unpivot, find_replace, set_type, dump_to_path, update_package, update_resource, update_schema, join, join_with_self, add_computed_field, delete_fields, checkpoint, duplicate +from dataflows import Flow, load, unpivot, find_replace, set_type, dump_to_path, update_package, update_resource, update_schema, join, join_with_self, add_computed_field, delete_fields, checkpoint, duplicate, filter_rows BASE_URL = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/' CONFIRMED = 'time_series_19-covid-Confirmed.csv' @@ -19,6 +19,10 @@ unpivoting_fields = [ extra_keys = [{'name': 'Date', 'type': 'string'} ] extra_value = {'name': 'Case', 'type': 'number'} +def is_key_country(row): + key_countries = ['Chine', 'US', 'United Kingdom', 'Italy', 'France', 'Germany'] + return row['Country'] in key_countries + Flow( load(f'{BASE_URL}{CONFIRMED}'), load(f'{BASE_URL}{RECOVERED}'), @@ -115,7 +119,7 @@ Flow( duplicate( source='time-series-19-covid-combined', target_name='worldwide-aggregated', - target_path='worldwide-aggregated.csv' + target_path='data/worldwide-aggregated.csv' ), join_with_self( resource_name='worldwide-aggregated', @@ -166,6 +170,72 @@ Flow( "type": "integer" } ]), + checkpoint('processed_worldwide_data'), + # Create another resource with countries aggregated + duplicate( + source='time-series-19-covid-combined', + target_name='countries-aggregated', + target_path='data/countries-aggregated.csv' + ), + join_with_self( + resource_name='countries-aggregated', + join_key=['Date', 'Country/Region'], + fields=dict( + Date={ + 'name': 'Date' + }, + Country={ + 'name': 'Country/Region' + }, + Confirmed={ + 'name': 'Confirmed', + 'aggregate': 'sum' + }, + Recovered={ + 'name': 'Recovered', + 'aggregate': 'sum' + }, + Deaths={ + 'name': 'Deaths', + 'aggregate': 'sum' + } + ) + ), + update_schema('countries-aggregated', fields=[ + { + "format": "%Y-%m-%d", + "name": "Date", + "type": "date" + }, + { + "format": "default", + "name": "Country", + "type": "string" + }, + { + "format": "default", + "groupChar": "", + "name": "Confirmed", + "title": "Cumulative total confirmed cases to date", + "type": "integer" + }, + { + "format": "default", + "groupChar": "", + "name": "Recovered", + "title": "Cumulative total recovered cases to date", + "type": "integer" + }, + { + "format": "default", + "groupChar": "", + "name": "Deaths", + "title": "Cumulative total deaths to date", + "type": "integer" + } + ]), + checkpoint('processed_country_data'), + # Prepare data package (name, title) and add views update_package( name='covid-19', title='Novel Coronavirus 2019',