API Reference

create_dashboard_main_page(inference_predictions)

Creates the dashboard page for the Bavarian Forest National Park visitor information. This includes the visitor count, parking, weather, recreation, and other information.

Parameters:

Name Type Description Default
inference_predictions DataFrame

The inference predictions for region-wise visitor counts.

required
Source code in Dashboard.py
def create_dashboard_main_page(inference_predictions):

    """
    Creates the dashboard page for the Bavarian Forest National Park visitor information. This includes the visitor count, parking, weather, recreation, and other information.

    Args:
        inference_predictions (pd.DataFrame): The inference predictions for region-wise visitor counts.
    """

    with col1:

        # Display the logo and title of the column
        logo = Image.open("src/streamlit_app/assets/logo-bavarian-forest-national-park.png")
        st.image(logo, width=300)
        st.title(TRANSLATIONS[st.session_state.selected_language]['title'])

        # Get the visitor count section
        visitor_count.get_visitor_counts_section(inference_predictions)

        # get the parking section
        parking.get_parking_section()


    with col2:
        # get the language selection menu
        lang_sel_menu.get_language_selection_menu()

        # get the weather section
        weather.get_weather_section()


        # create recreational section
        recreation.get_recreation_section()

        # Get the other information section
        other_info.get_other_information()
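
For orientation, here is a minimal sketch of how Dashboard.py presumably drives this function. The two-column split and the placeholder predictions frame are illustrative assumptions, not taken from the source.

import pandas as pd
import streamlit as st

# Assumed module-level layout: create_dashboard_main_page references col1 and col2
# directly, so they must exist in the module namespace before the call.
col1, col2 = st.columns((3, 1))

# Placeholder frame; in Dashboard.py this comes from the trained model's inference pipeline.
inference_predictions = pd.DataFrame()

create_dashboard_main_page(inference_predictions)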

run_training()

Runs the training pipeline. This includes sourcing and preprocessing the data, training the model, and saving the model.

Source code in Dashboard.py
def run_training():

    """
    Runs the training pipeline. This includes sourcing and preprocessing the data, training the model, and saving the model.
    """

    # source and preprocess the historic visitor count data
    sourced_visitor_count_df = source_historic_visitor_count()
    processed_visitor_count_df = preprocess_visitor_count_data(sourced_visitor_count_df)

    # source and preprocess the visitor center data
    sourced_vc_data_df = source_visitor_center_data()
    processed_vc_df_hourly,_ = process_visitor_center_data(sourced_vc_data_df)

    # get the weather data for training and inference
    # training data
    train_start_date = datetime(2023, 1, 1)
    train_end_date = datetime(2024, 7, 21)
    weather_data = source_weather_data(start_time=train_start_date, end_time=train_end_date)
    processed_weather_df = process_weather_data(weather_data)

    # join the dataframes
    joined_df = get_joined_dataframe(processed_weather_df, processed_visitor_count_df, processed_vc_df_hourly)

    # Feature engineering: add features such as zscore weather features and nearest holidays
    weather_columns_for_zscores = [ 'Temperature (°C)','Relative Humidity (%)','Wind Speed (km/h)']
    with_zscores_and_nearest_holidays_df = get_zscores_and_nearest_holidays(joined_df, weather_columns_for_zscores)

    # get the features for training
    feature_df = get_features(with_zscores_and_nearest_holidays_df,train_start_date, train_end_date)

    # train the model
    train_regressor(feature_df)

get_latest_parking_data_and_visualize_it()

Display the parking section of the dashboard with a map showing the real-time parking occupancy and interactive metrics with actual occupancy numbers. It updates every 15 minutes.

Source code in pages/Admin_🔓.py
@st.fragment(run_every="15min")
def get_latest_parking_data_and_visualize_it():

    """
    Display the parking section of the dashboard with a map showing the real-time parking occupancy
    and interactive metrics with actual occupancy numbers. It updates every 15 minutes.
    """
    print("Rendering parking section for the visitor dashboard...")

    def get_current_15min_interval():
        """
        Get the current 15-minute interval in the format "HH:MM:00".

        Returns:
            str: The current 15-minute interval in the format "HH:MM:00".
        """
        current_time = datetime.now(pytz.timezone('Europe/Berlin'))
        minutes = (current_time.minute // 15) * 15

        # Replace the minute value with the truncated value and set seconds and microseconds to 0
        timestamp_latest_parking_data_fetch = current_time.replace(minute=minutes, second=0, microsecond=0)

        # If you want to format it as a string in the "%Y-%m-%d %H:%M:%S" format
        timestamp_latest_parking_data_fetch_str = timestamp_latest_parking_data_fetch.strftime("%Y-%m-%d %H:%M:%S")

        return timestamp_latest_parking_data_fetch_str

    timestamp_latest_parking_data_fetch = get_current_15min_interval()

    # Source and preprocess the parking data
    processed_parking_data = source_and_preprocess_realtime_parking_data(timestamp_latest_parking_data_fetch)

    get_parking_section(processed_parking_data)
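
The truncated timestamp doubles as the argument to the cached sourcing function (st.cache_data with max_entries=1), so the parking data is re-fetched at most once per quarter hour. A standalone sketch of the truncation logic:

from datetime import datetime
import pytz

now = datetime.now(pytz.timezone('Europe/Berlin'))        # e.g. 14:37:42
truncated = now.replace(minute=(now.minute // 15) * 15,   # -> 14:30:00
                        second=0, microsecond=0)
cache_key = truncated.strftime("%Y-%m-%d %H:%M:%S")
# cache_key is identical for every call within the same 15-minute interval,
# so the cached result is reused until the interval rolls over.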

get_visitor_predictions_section()

Build the visitor predictions section by running/loading the inference pipeline and displaying the predictions as actual visitor numbers.

Source code in pages/Admin_🔓.py
def get_visitor_predictions_section():
    """
    Build the visitor predictions section by running/loading the inference pipeline and displaying the predictions as actual visitor numbers.
    """

    preprocessed_hourly_visitor_center_data = source_preprocessed_hourly_visitor_center_data()

    inference_predictions = run_inference(preprocessed_hourly_visitor_center_data)

    visitor_prediction_graph(inference_predictions)

add_spatial_info_to_parking_sensors(parking_data_df)

Add spatial information to the parking dataframe.

Parameters:

Name Type Description Default
parking_data_df DataFrame

DataFrame containing parking sensor data (occupancy, capacity, occupancy rate).

required

Returns:

Name Type Description
parking_data_df DataFrame

DataFrame containing parking sensor data with spatial information.

Source code in src/streamlit_app/source_data.py
def add_spatial_info_to_parking_sensors(parking_data_df):

    """
    Add spatial information to the parking dataframe.

    Args:
        parking_data_df (pd.DataFrame): DataFrame containing parking sensor data (occupancy, capacity, occupancy rate).

    Returns:
        parking_data_df (pd.DataFrame): DataFrame containing parking sensor data with spatial information.
    """

    for location_slug in parking_sensors.keys():
        if location_slug in parking_data_df['location'].values:
            parking_data_df['latitude'] = parking_sensors[location_slug][1][0]
            parking_data_df['longitude'] = parking_sensors[location_slug][1][1]

            return parking_data_df
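
The indexing parking_sensors[location_slug][1] implies that each configuration entry stores a (latitude, longitude) pair in its second slot. A hypothetical entry illustrating the assumed shape (slug, ID, and coordinates are placeholders, not taken from the project's configuration):

# Assumed structure of the parking_sensors config, for illustration only.
parking_sensors = {
    "parkplatz-example-1": ("bayern-cloud-location-id", (48.94, 13.40)),
}

slug = "parkplatz-example-1"
latitude, longitude = parking_sensors[slug][1]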

merge_all_df_from_list(df_list)

Merge all the dataframes in the list into a single dataframe.

Parameters:

Name Type Description Default
df_list list

A list of pandas DataFrames to merge.

required

Returns:

Name Type Description
merged_dataframe DataFrame

The merged DataFrame.

Source code in src/streamlit_app/source_data.py
def merge_all_df_from_list(df_list):
    """
    Merge all the dataframes in the list into a single dataframe.

    Args:
        df_list (list): A list of pandas DataFrames to merge.

    Returns:
        merged_dataframe (pd.DataFrame): The merged DataFrame.
    """
    # Concatenate all the dataframes in the list into a single dataframe
    merged_dataframe = pd.concat(df_list, axis=0, ignore_index=True)
    return merged_dataframe

source_and_preprocess_forecasted_weather_data(timestamp_latest_weather_data_fetch)

Source and preprocess the forecasted weather data for the Bavarian Forest National Park.

Parameters:

Name Type Description Default
timestamp_latest_weather_data_fetch datetime

The timestamp of the latest weather data fetch.

required

Returns:

Name Type Description
sourced_and_preprocessed_weather_data DataFrame

Processed forecasted weather dataframe

Source code in src/streamlit_app/source_data.py
@st.cache_data(max_entries=1)
def source_and_preprocess_forecasted_weather_data(timestamp_latest_weather_data_fetch: datetime):

    """
    Source and preprocess the forecasted weather data for the Bavarian Forest National Park.

    Args:
        timestamp_latest_weather_data_fetch (datetime): The timestamp of the latest weather data fetch.

    Returns:
        sourced_and_preprocessed_weather_data (pd.DataFrame): Processed forecasted weather dataframe
    """

    print(f"Sourcing and preprocessing weather data from Meteostat API at {timestamp_latest_weather_data_fetch}...")

    # Source the weather data
    weather_data_df = source_weather_data(timestamp_latest_weather_data_fetch)

    # Preprocess the weather data
    sourced_and_preprocessed_weather_data = prfwd.process_weather_data(weather_data_df)

    return sourced_and_preprocessed_weather_data

source_and_preprocess_realtime_parking_data(current_timestamp)

Source and preprocess the real-time parking data for the given fetch timestamp.

Parameters:

Name Type Description Default
current_timestamp datetime

The timestamp of when the function was run.

required

Returns:

Name Type Description
processed_parking_data DataFrame

Preprocessed real-time parking data.

Source code in src/streamlit_app/source_data.py
@st.cache_data(max_entries=1)
def source_and_preprocess_realtime_parking_data(current_timestamp):

    """
    Source and preprocess the real-time parking data for the given fetch timestamp.

    Args:
        current_timestamp (datetime): The timestamp of when the function was run.

    Returns:
        processed_parking_data (pd.DataFrame): Preprocessed real-time parking data.
    """
    print(f"Fetching and saving real-time parking occupancy data at '{current_timestamp}'...")

    # Source the parking data from bayern cloud
    all_parking_dataframes = []
    for location_slug in parking_sensors.keys():
        print(f"Fetching and saving real-time occupancy data for location '{location_slug}'...")
        parking_df = source_parking_data_from_cloud(location_slug)
        all_parking_dataframes.append(parking_df)

    all_parking_data = merge_all_df_from_list(all_parking_dataframes)

    print("Parking data sourced successfully!")

    # Preprocess the parking data
    processed_parking_data = prtpd.process_real_time_parking_data(all_parking_data)

    print("Parking data processed and cleaned!")

    # Report when the parking data was last fetched (timestamp is in Europe/Berlin time)

    print(f"Parking data processed and cleaned at {current_timestamp}, Europe/Berlin time.")

    st.write(f"{TRANSLATIONS[st.session_state.selected_language]['parking_data_last_updated']} {current_timestamp}")

    return processed_parking_data

source_parking_data_from_cloud(location_slug)

Sources the current occupancy data from the Bayern Cloud API.

Parameters:

Name Type Description Default
location_slug str

The location slug of the parking sensor.

required

Returns:

Name Type Description
parking_df_with_spatial_info DataFrame

A DataFrame containing the current occupancy data, occupancy rate, capacity and spatial coordinates.

Source code in src/streamlit_app/source_data.py
def source_parking_data_from_cloud(location_slug: str) -> pd.DataFrame:
    """Sources the current occupancy data from the Bayern Cloud API.

    Args:
        location_slug (str): The location slug of the parking sensor.

    Returns:
        parking_df_with_spatial_info (pd.DataFrame): A DataFrame containing the current occupancy data, occupancy rate, capacity and spatial coordinates.
    """

    API_endpoint = f'https://data.bayerncloud.digital/api/v4/endpoints/list_occupancy/{location_slug}'

    request_params = {
        'token': BAYERN_CLOUD_API_KEY
    }


    response = requests.get(API_endpoint, params=request_params)
    response_json = response.json()

    # Access the first item in the @graph list
    graph_item = response_json["@graph"][0]

    # Extract the current occupancy and capacity
    current_occupancy = graph_item.get("dcls:currentOccupancy", None)
    current_capacity = graph_item.get("dcls:currentCapacity", None)
    current_occupancy_rate = graph_item.get("dcls:currentOccupancyRate", None)

    # Make a dataframe with the three values and the current time stamp in the datetime format
    parking_data = pd.DataFrame({
        "timestamp": datetime.now(), 
        "location" : [location_slug],
        "current_occupancy": [current_occupancy],
        "current_capacity": [current_capacity],
        "current_occupancy_rate": [current_occupancy_rate],
    })

    parking_data.reset_index(drop=True, inplace=True)

    # adding spatial information to the dataframe
    parking_df_with_spatial_info = add_spatial_info_to_parking_sensors(parking_data)

    return parking_df_with_spatial_info
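
The field extraction above assumes a JSON-LD response whose "@graph" list carries dcls:-prefixed occupancy attributes. A hedged sketch of the expected payload shape (values are placeholders):

# Illustrative response shape only; real payloads contain additional fields.
response_json = {
    "@graph": [
        {
            "dcls:currentOccupancy": 52,
            "dcls:currentCapacity": 80,
            "dcls:currentOccupancyRate": 0.65,
        }
    ]
}

graph_item = response_json["@graph"][0]
current_occupancy = graph_item.get("dcls:currentOccupancy", None)  # 52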

source_weather_data(start_time)

Source forecasted weather data from the Meteostat API for the Bavarian Forest National Park for the next 7 days at hourly intervals.

Parameters:

Name Type Description Default
start_time datetime

The start time of the weather data.

required

Returns:

Name Type Description
weather_hourly DataFrame

Hourly weather data for the Bavarian Forest National Park for the next 7 days

Source code in src/streamlit_app/source_data.py
def source_weather_data(start_time: datetime):
    """
    Source forecasted weather data from the Meteostat API for the Bavarian Forest National Park for the next 7 days at hourly intervals.

    Args:
        start_time (datetime): The start time of the weather data.

    Returns:
        weather_hourly (pd.DataFrame): Hourly weather data for the Bavarian Forest National Park for the next 7 days
    """

    # Create a Point object for the Bavarian Forest National Park entry
    bavarian_forest = Point(lat=LATITUDE, lon=LONGITUDE)

    # Convert start_time to datetime format in utc
    start_time = start_time.astimezone(pytz.UTC).replace(tzinfo=None)

    # Add 7 days to start_time
    end_time = start_time + timedelta(days=7)

    # Fetch hourly data for the location
    weather_hourly = get_hourly_data(bavarian_forest, start_time, end_time)

    # Drop unnecessary columns
    weather_hourly = weather_hourly.drop(columns=['dwpt', 'snow', 'wdir', 'wpgt', 'pres', 'coco','prcp', 'tsun'])

    # Convert the 'Time' column to datetime format again in Europe/Berlin time
    weather_hourly['time'] = pd.to_datetime(weather_hourly['time'], utc=True).dt.tz_convert('Europe/Berlin')
    return weather_hourly
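
get_hourly_data is presumably a thin wrapper around the Meteostat client; a sketch of what it is assumed to do with the meteostat package's Point and Hourly API (coordinates are placeholders for the project's LATITUDE/LONGITUDE constants):

from datetime import datetime, timedelta
from meteostat import Hourly, Point

bavarian_forest = Point(lat=48.95, lon=13.40)  # placeholder coordinates

start = datetime(2024, 7, 1)
end = start + timedelta(days=7)

# Hourly(...).fetch() returns a DataFrame indexed by time with columns such as
# temp, rhum, wspd, prcp, ...; reset_index() exposes the index as a 'time' column.
weather_hourly = Hourly(bavarian_forest, start, end).fetch().reset_index()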

fill_missing_values(data, parameters)

Fill missing values in the weather data using linear interpolation or zero values.

Parameters:

Name Type Description Default
data DataFrame

Processed hourly weather data.

required
parameters list

List of column names to process.

required

Returns:

Name Type Description
data DataFrame

DataFrame with missing values filled.

Source code in src/streamlit_app/pre_processing/process_forecast_weather_data.py
def fill_missing_values(data, parameters):
    """
    Fill missing values in the weather data using linear interpolation or zero values.

    Args:
        data (pd.DataFrame): Processed hourly weather data.
        parameters (list): List of column names to process.

    Returns:
        data (pd.DataFrame): DataFrame with missing values filled.
    """
    total_rows = data.shape[0]

    for parameter in parameters:
        # Calculate missing values and their percentage
        missing_values = data[parameter].isnull().sum()
        missing_percentage = (missing_values / total_rows) * 100

        # Calculate zero values and their percentage
        zero_values = data[parameter].eq(0).sum()
        zero_percentage = (zero_values / total_rows) * 100

        # Check for missing values in the 'Time' column
        if parameter == 'Time' and missing_values > 0:
            print(f'Missing values in Time column: {missing_percentage:.2f}%')
            print('Please check the missing values in the Time column')
            exit()

        if missing_values == 0:
            print(f'No missing values in {parameter} column')
        else:
            print(f'Missing values in {parameter} column: {missing_percentage:.2f}%')

            if zero_percentage > 60:
                # Fill missing values with 0.0 if zero values are significant
                print(f'Zero values in {parameter} column: {zero_percentage:.2f}%')
                data[parameter].fillna(0.0, inplace=True)
                print(f'Missing values in {parameter} column filled with 0.0')
            else:
                # Use linear interpolation to fill missing values
                data[parameter].interpolate(method='linear', inplace=True)
                # Round the interpolated values to 2 decimal places
                data[parameter] = data[parameter].round(2)
                print(f'Missing values in {parameter} column filled using linear interpolation')

    return data
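
A small, self-contained example of the fill strategy: a column that is mostly zeros is zero-filled, any other column is linearly interpolated. Column names are illustrative, and the import path is assumed from the file location shown above.

import numpy as np
import pandas as pd

from src.streamlit_app.pre_processing.process_forecast_weather_data import fill_missing_values

data = pd.DataFrame({
    'Temperature (°C)': [10.0, np.nan, 14.0],    # 0% zeros -> missing value interpolated to 12.0
    'Precipitation (mm)': [0.0, np.nan, 0.0],    # >60% zeros -> missing value filled with 0.0
})

filled = fill_missing_values(data, list(data.columns))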

process_weather_data(weather_data_df)

Process the hourly weather data by filling missing values.

Parameters:

Name Type Description Default
weather_data_df DataFrame

Hourly weather data.

required

Returns:

Name Type Description
imputed_data DataFrame

Processed weather data with missing values filled.

Source code in src/streamlit_app/pre_processing/process_forecast_weather_data.py
def process_weather_data(weather_data_df):
    """
    Process the hourly weather data by filling missing values.

    Args:
        weather_data_df (pandas.DataFrame): Hourly weather data.

    Returns:
        imputed_data (pandas.DataFrame): Processed weather data with missing values filled.
    """


    # Get the list of columns to process
    parameters = weather_data_df.columns.to_list()

    print(f'Processing weather data with the following columns: {parameters}')

    # Fill missing values in the weather data
    imputed_data = fill_missing_values(weather_data_df, parameters)

    return imputed_data

impute_missing_data(all_parking_data)

Impute missing values in the parking data.

Parameters:

Name Type Description Default
all_parking_data DataFrame

Raw parking data.

required

Returns:

Name Type Description
all_parking_data DataFrame

Processed parking data.

Source code in src/streamlit_app/pre_processing/process_real_time_parking_data.py
def impute_missing_data(all_parking_data):
    """
    Impute missing values in the parking data.

    Args:
        all_parking_data (pandas.DataFrame): Raw parking data.

    Returns:
        all_parking_data (pandas.DataFrame): Processed parking data.
    """

    # Fill missing values in the parking data

    # if there are null values in the capacity column, fill them with 40 (the lowest capacity value among all the sensors)
    all_parking_data['current_capacity'].fillna(40, inplace=True)

    # if there are null values in the occupancy column, fill them with the corresponding capacity value
    all_parking_data['current_occupancy'].fillna(all_parking_data['current_capacity'], inplace=True)

    # if there are null values in the occupancy rate column, fill them with the occupancy divided by the capacity   
    all_parking_data['current_occupancy_rate'].fillna(all_parking_data['current_occupancy']/all_parking_data['current_capacity'], inplace=True)


    # Convert to data type int

    all_parking_data['current_capacity'] = all_parking_data['current_capacity'].astype(int)
    all_parking_data['current_occupancy'] = all_parking_data['current_occupancy'].astype(int)

    # add a column called 'current availability' that is the difference between the capacity and the occupancy

    all_parking_data['current_availability'] = all_parking_data['current_capacity'] - all_parking_data['current_occupancy']

    return all_parking_data
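
A worked example of the fill cascade on a fully missing row (import path assumed from the file location shown above): capacity falls back to 40, occupancy is copied from capacity, the rate becomes occupancy divided by capacity, and availability ends up at 0.

import numpy as np
import pandas as pd

from src.streamlit_app.pre_processing.process_real_time_parking_data import impute_missing_data

raw = pd.DataFrame({
    'location': ['sensor-a'],                 # placeholder slug
    'current_occupancy': [np.nan],
    'current_capacity': [np.nan],
    'current_occupancy_rate': [np.nan],
})

clean = impute_missing_data(raw)
# capacity -> 40, occupancy -> 40, occupancy_rate -> 1.0, current_availability -> 0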

process_real_time_parking_data(parking_data_df)

Process the real-time parking data by imputing missing values.

Parameters:

Name Type Description Default
parking_data_df DataFrame

Raw real-time parking data.

required

Returns:

Name Type Description
clean_parking_data DataFrame

Processed real-time parking data.

Source code in src/streamlit_app/pre_processing/process_real_time_parking_data.py
def process_real_time_parking_data(parking_data_df):
    """
    Process the real-time parking data by imputing missing values.

    Args:
        parking_data_df (pandas.DataFrame): Raw real-time parking data.

    Returns:
        clean_parking_data (pandas.DataFrame): Processed real-time parking data.
    """


    clean_parking_data = impute_missing_data(parking_data_df)

    print("Parking data processed and cleaned!")

    return clean_parking_data

convert_sensor_dictionary_to_excel_file(sensor_dict, output_file_path)

Convert sensor dictionary to a Pandas Dataframe and save it as an Excel file.

Info: This function is not used as of now, but might be useful in the future for handling changes to the sensor configuration.

Parameters:

Name Type Description Default
sensor_dict dict

A dictionary containing sensor data.

required
output_file_path str

The path to the output Excel file.

required

Returns:

Type Description
None

None

Source code in src/streamlit_app/pre_processing/data_quality_check.py
def convert_sensor_dictionary_to_excel_file(
        sensor_dict: dict,
        output_file_path: str) -> None:
    """
    Convert sensor dictionary to a Pandas Dataframe and save it as an Excel file.

    Info: This function is not used as of now, but might be useful in the future for handling changes to the sensor configuration.

    Args:
        sensor_dict (dict): A dictionary containing sensor data.
        output_file_path (str): The path to the output Excel file.

    Returns:
        None
    """
    regions = []
    sensor_names = []
    sensor_directions = []
    possible_sensor_ids = []

    for region, sensors in sensor_dict.items():
        for sensor_name, directions in sensors.items():
            for direction, sensor_ids in directions.items():
                regions.append(region)
                sensor_names.append(sensor_name)
                sensor_directions.append(direction)
                # Join the list into a comma-separated string
                possible_sensor_ids.append(",".join(sensor_ids))

    df = pd.DataFrame({
        "region": regions,
        "sensor_name": sensor_names,
        "sensor_direction": sensor_directions,
        "possible_sensor_ids": possible_sensor_ids
    })

    df.to_excel(output_file_path, index=False)
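
The nesting the converter expects is region -> sensor name -> direction -> list of possible counter IDs. A hypothetical dictionary illustrating that shape (names and IDs are placeholders; the import path is assumed from the file location shown above):

from src.streamlit_app.pre_processing.data_quality_check import convert_sensor_dictionary_to_excel_file

sensor_dict = {
    "ExampleRegion": {
        "ExampleSensor": {
            "IN":  ["sensor-id-1", "sensor-id-2"],
            "OUT": ["sensor-id-3"],
        },
    },
}

convert_sensor_dictionary_to_excel_file(sensor_dict, "sensor_overview.xlsx")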

convert_sensor_excel_file_to_dictionary(sensor_file_path)

Convert Excel file containing sensor configuration data to a dictionary.

Info: This function is not used as of now, but might be useful in the future for handling changes to the sensor configuration.

Parameters:

Name Type Description Default
sensor_file_path str

The path to the Excel file.

required

Returns:

Name Type Description
dict dict

A dictionary containing sensor configuration.

Source code in src/streamlit_app/pre_processing/data_quality_check.py
def convert_sensor_excel_file_to_dictionary(
        sensor_file_path: str) -> dict:
    """
    Convert Excel file containing sensor configuration data to a dictionary.

    Info: This function is not used as of now, but might be useful in the future for handling changes to the sensor configuration.

    Args:
        sensor_file_path (str): The path to the Excel file.

    Returns:
        dict: A dictionary containing sensor configuration.
    """
    df = pd.read_excel(sensor_file_path)

    sensor_dict = {}

    for index, row in df.iterrows():
        region = row["region"]
        sensor_name = row["sensor_name"]
        sensor_direction = row["sensor_direction"]
        possible_sensor_ids = row["possible_sensor_ids"].split(",")

        if region not in sensor_dict:
            sensor_dict[region] = {}

        if sensor_name not in sensor_dict[region]:
            sensor_dict[region][sensor_name] = {}

        if sensor_direction not in sensor_dict[region][sensor_name]:
            sensor_dict[region][sensor_name][sensor_direction] = []

        sensor_dict[region][sensor_name][sensor_direction] = possible_sensor_ids

    return sensor_dict

int_for_all_counts(df)

Convert all numeric columns in the DataFrame to integer type. Round float values that are not integers, and replace NaN values with 0 to allow conversion to integers.

Source code in src/streamlit_app/pre_processing/data_quality_check.py
def int_for_all_counts(df):
    """
    Convert all numeric columns in the DataFrame to integer type. Round float values that are not integers,
    and replace NaN values with 0 to allow conversion to integers.
    """
    # Loop through each column in the dataframe
    for column in df.columns:
        # Check if the column is of float type
        if df[column].dtype == "float64":
            # Apply the transformation only to non-NaN values
            df[column] = df[column].apply(lambda x: int(x) if pd.notna(x) and x.is_integer() else (round(x) if pd.notna(x) else np.nan))
            # Replace NaN values with 0 or another placeholder, then convert to integer
            df[column] = df[column].fillna(0).astype('int64')
        elif df[column].dtype == "object":
            pass

    return df
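
A quick illustration of the conversion rules (import path assumed from the file location shown above): whole floats are cast directly, other floats are rounded, and NaN becomes 0.

import numpy as np
import pandas as pd

from src.streamlit_app.pre_processing.data_quality_check import int_for_all_counts

df = pd.DataFrame({"count": [3.0, 4.6, np.nan]})
df = int_for_all_counts(df)
# count column becomes 3, 5, 0 with dtype int64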

parse_german_dates(df, date_column_name)

Parses German dates in the specified date column of the DataFrame using regex, including hours and minutes if available.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame containing the date column.

required
date_column_name str

The name of the date column.

required

Returns:

Type Description
DataFrame

pd.DataFrame: The DataFrame with parsed German dates.

Source code in src/streamlit_app/pre_processing/data_quality_check.py
def parse_german_dates(
    df: pd.DataFrame,
    date_column_name: str
) -> pd.DataFrame:
    """
    Parses German dates in the specified date column of the DataFrame using regex,
    including hours and minutes if available.

    Args:
        df (pd.DataFrame): The DataFrame containing the date column.
        date_column_name (str): The name of the date column.

    Returns:
        pd.DataFrame: The DataFrame with parsed German dates.
    """

    # Define a mapping of German month names to their numeric values
    month_map = {
        "Jan.": "01",
        "Feb.": "02",
        "März": "03",
        "Apr.": "04",
        "Mai": "05",
        "Juni": "06",
        "Juli": "07",
        "Aug.": "08",
        "Sep.": "09",
        "Okt.": "10",
        "Nov.": "11",
        "Dez.": "12"
    }

    # Create a regex pattern for replacing months and capturing time
    pattern = re.compile(r'(\d{1,2})\.\s*(' + '|'.join(month_map.keys()) + r')\s*(\d{4})\s*(\d{2}):(\d{2})')

    # Function to replace the month in the matched string and keep the time part
    def replace_month(match):
        day = match.group(1)
        month = month_map[match.group(2)]
        year = match.group(3)
        hour = match.group(4)
        minute = match.group(5)
        return f"{year}-{month}-{day} {hour}:{minute}:00"

    # Apply regex replacement and convert to datetime, only for string values
    df[date_column_name] = df[date_column_name].apply(
        lambda x: replace_month(pattern.search(str(x))) if isinstance(x, str) and pattern.search(str(x)) else x
    )
    df[date_column_name] = pd.to_datetime(df[date_column_name], errors='coerce')

    return df
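
An example of the parsing on German-formatted timestamps (the column name and import path are assumptions for illustration):

import pandas as pd

from src.streamlit_app.pre_processing.data_quality_check import parse_german_dates

df = pd.DataFrame({"Datum": ["7. Juli 2024 14:30", "1. Jan. 2024 08:15"]})
df = parse_german_dates(df, "Datum")
# "7. Juli 2024 14:30" -> Timestamp('2024-07-07 14:30:00')
# "1. Jan. 2024 08:15" -> Timestamp('2024-01-01 08:15:00')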

calculate_color(occupancy_rate)

Calculate the color of the marker based on the occupancy rate.

Parameters:

Name Type Description Default
occupancy_rate float

The occupancy rate of the parking section.

required

Returns:

Name Type Description
list

A list of RGB values representing the color of the marker.

Source code in src/streamlit_app/pages_in_dashboard/admin/parking.py
def calculate_color(occupancy_rate):
    """
    Calculate the color of the marker based on the occupancy rate.

    Args:
        occupancy_rate (float): The occupancy rate of the parking section.

    Returns:
        list: A list of RGB values representing the color of the marker.
    """
    occupancy_rate = float(occupancy_rate)

    if occupancy_rate >= 80:
        return [230, 39, 39] #red

    elif occupancy_rate >= 60:
        return [244, 251, 81] #yellow

    else:
        return [109, 249, 2] #green

get_fixed_size()

Get a fixed size value for the map markers.

Source code in src/streamlit_app/pages_in_dashboard/admin/parking.py
def get_fixed_size():
    """
    Get a fixed size value for the map markers.
    """
    return 300  

get_parking_section(processed_parking_data)

Display the parking section of the dashboard with a map showing the real-time parking occupancy and interactive metrics.

Parameters:

Name Type Description Default
processed_parking_data DataFrame

Processed parking data.

required

Returns:

Type Description

None

Source code in src/streamlit_app/pages_in_dashboard/admin/parking.py
def get_parking_section(
        processed_parking_data):
    """

    Display the parking section of the dashboard with a map showing the real-time parking occupancy 
    and interactive metrics.

    Args:
        processed_parking_data (pd.DataFrame): Processed parking data.

    Returns:
        None
    """
    st.markdown(f"### {TRANSLATIONS[st.session_state.selected_language]['real_time_parking_occupancy']}")

    # Set a fixed size for all markers
    processed_parking_data['size'] = get_fixed_size()
    processed_parking_data['color'] = processed_parking_data['current_occupancy_rate'].apply(calculate_color)

    processed_parking_data['current_occupancy_rate'] = pd.to_numeric(processed_parking_data['current_occupancy_rate'], errors='coerce')  # Convert to float
    processed_parking_data['current_occupancy_rate'] = processed_parking_data['current_occupancy_rate'].apply(lambda x: f"{x:.2f}" if pd.notna(x) else "N/A")


    # Calculate center of the map based on the average of latitudes and longitudes
    avg_latitude = processed_parking_data['latitude'].mean()
    avg_longitude = processed_parking_data['longitude'].mean()

    # PyDeck Map Configuration with adjusted view_state
    view_state = pdk.ViewState(
        latitude=avg_latitude,  # Center map at the average latitude
        longitude=avg_longitude,  # Center map at the average longitude
        zoom=10,  # Zoom level increased for a closer view
        pitch=50
    )

    layer = pdk.Layer(
        "ScatterplotLayer",
        data=processed_parking_data,
        get_position=["longitude", "latitude"],
        get_radius="size",
        get_fill_color="color",
        pickable=True
    )

    deck = pdk.Deck(
        layers=[layer],
        initial_view_state=view_state,
        tooltip={
            "text": "{location}\n" + f"{TRANSLATIONS[st.session_state.selected_language]['available_spaces']}: " + "{current_availability} 🚗\n" + f"{TRANSLATIONS[st.session_state.selected_language]['occupancy_rate']}: " + "{current_occupancy_rate}%"
        },  # Updated tooltip text with two decimal points for occupancy rate
        map_style="road"
    )
    st.pydeck_chart(deck)

    # Interactive Metrics
    selected_location = st.selectbox(
        TRANSLATIONS[st.session_state.selected_language]['select_parking_section'], 
        processed_parking_data['location'].unique(),
        key="selectbox_parking_section"
    )

    # Display selected location details
    if selected_location:
        selected_data = processed_parking_data[processed_parking_data['location'] == selected_location].iloc[0]

        col1, col2, col3 = st.columns(3)
        col1.metric(label=TRANSLATIONS[st.session_state.selected_language]['available_spaces'], value=f"{selected_data['current_availability']} 🚗")
        col2.metric(label=TRANSLATIONS[st.session_state.selected_language]['capacity'], value=f"{selected_data['current_capacity']} 🚗")
        col3.metric(label=TRANSLATIONS[st.session_state.selected_language]['occupancy_rate'], value=f"{selected_data['current_occupancy_rate']}%")

check_password()

Returns True if the user had the correct password.

Source code in src/streamlit_app/pages_in_dashboard/admin/password.py
def check_password():
    """Returns `True` if the user had the correct password."""

    def password_entered():
        """Checks whether a password entered by the user is correct."""
        if hmac.compare_digest(st.session_state["password"], st.secrets["password"]):
            st.session_state["password_correct"] = True
            del st.session_state["password"]  # Don't store the password.
        else:
            st.session_state["password_correct"] = False

    # Return True if the password is validated.
    if st.session_state.get("password_correct", False):
        return True

    # Show input for password.
    st.text_input(
        "Password", type="password", on_change=password_entered, key="password"
    )
    if "password_correct" in st.session_state:
        st.error("😕 Password incorrect")
    return False
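
The admin page presumably calls this at the top of the script and stops rendering until the check passes; a minimal sketch of that gating pattern (import path assumed from the file location shown above):

import streamlit as st

from src.streamlit_app.pages_in_dashboard.admin.password import check_password

if not check_password():
    st.stop()  # halt the script run here; nothing below renders until the password is correct

st.title("Admin area")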

visitor_prediction_graph(inference_predictions)

Display the forecasted visitor counts for a selected region as an interactive hourly bar chart.

Returns:

Type Description

None

Source code in src/streamlit_app/pages_in_dashboard/admin/visitor_count.py
@st.fragment
def visitor_prediction_graph(inference_predictions):
    """
    Display the forecasted visitor counts for a selected region as an interactive hourly bar chart.

    Args:
        inference_predictions (pd.DataFrame): The inference predictions for region-wise visitor counts.

    Returns:
        None
    """
    st.markdown(f"## {TRANSLATIONS[st.session_state.selected_language]['visitor_counts_forecasted']}")

    # do a dropdown for the all_preds
    regions_to_select = list(regions.keys())
    selected_region = st.selectbox(TRANSLATIONS[st.session_state.selected_language]['select_region'], regions_to_select)

    if selected_region:

        predictions_per_region = regions[selected_region]
        further_columns_to_show = ["Time", "day_date"]

        # Filter the DataFrame based on the selected region
        selected_region_predictions = inference_predictions[predictions_per_region + further_columns_to_show]

        # Get unique values for the day and date list
        days_list = selected_region_predictions['day_date'].unique()

        # Add a note that this is forecasted data
        st.markdown(f":green[*{TRANSLATIONS[st.session_state.selected_language]['forecasted_visitor_data']}*].")

        # Create a layout for the radio button and chart
        col1, _ = st.columns([1, 3])

        with col1:
            # Get radio button for selecting the day
            day_selected = st.radio(
                label=TRANSLATIONS[st.session_state.selected_language]['select_day'], options=days_list, index=0
            )

        # Extract the selected day for filtering (using date)
        day_df = selected_region_predictions[selected_region_predictions['day_date'] == day_selected]

        # Plot an interactive bar chart for relative traffic
        fig1 = px.bar(
            day_df,
            x='Time',  
            y=predictions_per_region,
            barmode='group',
            labels={f'traffic_{selected_region}': '', 'Time': 'Hour of Day'},
            title=f"{TRANSLATIONS[st.session_state.selected_language]['visitor_foot_traffic_for_day']} - {day_selected}",
            color_discrete_map={'red': 'red', 'blue': 'blue', 'green': 'green'}
        )

        # Customize hover text for relative traffic
        fig1.update_traces(
            hovertemplate=(
                'Traffic: %{y}<br>'  # Display the traffic value
                'Hour: %{x|%H:%M}<br>'  # Display the hour in HH:MM format
            )
        )

        weekly_max_value_per_region = selected_region_predictions[predictions_per_region].max().max()

        # Update layout for relative traffic chart
        fig1.update_yaxes(range=[0, weekly_max_value_per_region])  # Set y-axis to range from 0 to the max traffic value of the forecasted week for a region
        fig1.update_xaxes(showticklabels=True)  # Keep the x-axis tick labels visible

        fig1.update_layout(
            xaxis_title=None,  # Hide the x-axis title
            yaxis_title=None,  # Hide the y-axis title
            template='plotly_dark',
            legend_title_text=TRANSLATIONS[st.session_state.selected_language]['visitor_foot_traffic'],
            legend=dict(
                itemsizing='constant',
                traceorder="normal",
                font=dict(size=12),
                orientation="h",
                yanchor="top",
                y=-0.3,  # Position the legend below the chart
                xanchor="center",
                x=0.5  # Center the legend horizontally
            ),
            xaxis=dict(
                tickformat='%H:%M'
            )
        )

        # Display the interactive bar chart for relative traffic below the radio button
        st.plotly_chart(fig1)

convert_number_to_month_name(month)

Convert a month number (1-12) to its corresponding month name.

Parameters:

Name Type Description Default
month int

The month number.

required

Returns:

Name Type Description
str

The name of the month.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def convert_number_to_month_name(month):

    """
    Convert a month number (1-12) to its corresponding month name.

    Parameters:
        month (int): The month number.

    Returns:
        str: The name of the month.
    """

    month_dict = {1: 'January', 2: 'February', 3: 'March', 4: 'April',
                   5: 'May', 6: 'June', 7: 'July', 8: 'August', 
                   9: 'September', 10: 'October', 11: 'November', 12: 'December'}

    return month_dict[month]

create_temporal_columns(df)

Create temporal columns from the DataFrame index.

This function takes a DataFrame with a datetime index and creates additional columns for month names, year, and season based on the index.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame with a datetime index.

required

Returns:

Type Description

pd.DataFrame: The original DataFrame with added columns:
- 'month': The name of the month corresponding to the index.
- 'year': The year extracted from the index.
- 'season': The name of the season corresponding to the index.

Raises:

Type Description
ValueError

If the index of the DataFrame cannot be converted to datetime.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def create_temporal_columns(df):

    """Create temporal columns from the DataFrame index.

    This function takes a DataFrame with a datetime index and creates additional
    columns for month names, year, and season based on the index.

    Args:
        df (pd.DataFrame): The input DataFrame with a datetime index.

    Returns:
        pd.DataFrame: The original DataFrame with added columns:
            - 'month': The name of the month corresponding to the index.
            - 'year': The year extracted from the index.
            - 'season': The name of the season corresponding to the index.

    Raises:
        ValueError: If the index of the DataFrame cannot be converted to datetime.
    """

    df.index = pd.to_datetime(df.index)

    # make a new column called 'month' from the index
    df['month'] = df.index.month

    # convert the numbers in the month column to the corresponding month names
    df['month'] = df['month'].apply(convert_number_to_month_name)

    # make a new column called 'year' from the index
    df['year'] = df.index.year

    # make a new column called 'season' from the index
    df['season'] = (df.index.month%12 + 3)//3
    # convert the numbers in the season column to the season names

    df['season'] = df['season'].apply(convert_number_to_season_name)

    return df
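
A small example of the derived columns (import path assumed from the file location shown above). The season number follows (month % 12 + 3) // 3, which convert_number_to_season_name presumably maps to a season label.

import pandas as pd

from src.streamlit_app.pages_in_dashboard.data_accessibility.data_retrieval import create_temporal_columns

df = pd.DataFrame(
    {"visitors": [120, 95]},
    index=pd.to_datetime(["2024-01-15", "2024-07-01"]),
)
df = create_temporal_columns(df)
# month -> 'January' / 'July', year -> 2024 / 2024,
# season number -> (1 % 12 + 3) // 3 = 1 and (7 % 12 + 3) // 3 = 3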

extract_values_according_to_type(selected_query, type)

Extract values from a query string based on the specified query type.

This function uses regular expressions to extract relevant values from the selected_query string according to the specified type. The extracted values may include properties, sensors, dates, months, seasons, and years, depending on the type.

Parameters:

Name Type Description Default
selected_query str

The selected query string from which to extract values.

required
type str

The type of the query. Options include:
- 'type1': Query for date range.
- 'type2': Query for month and year.
- 'type3': Query for season and year.
- 'type4': Query for date range (weather category).
- 'type5': Query for month and year (weather category).
- 'type6': Query for season and year (weather category).

required

Returns:

Name Type Description
dict

A dictionary containing the extracted values, where keys are based on the field names specified in query_types, including:
- 'property'
- 'sensor'
- 'month'
- 'season'
- 'year'
- 'start_date'
- 'end_date'

Raises:

Type Description
AttributeError

If the expected regex match is not found in the selected_query.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def extract_values_according_to_type(selected_query,type):
    """Extract values from a query string based on the specified query type.

    This function uses regular expressions to extract relevant values from the 
    `selected_query` string according to the specified `type`. The extracted values
    may include properties, sensors, dates, months, seasons, and years, depending on the type.

    Args:
        selected_query (str): The selected query string from which to extract values.
        type (str): The type of the query. Options include:
            - 'type1': Query for date range.
            - 'type2': Query for month and year.
            - 'type3': Query for season and year.
            - 'type4': Query for date range (weather category).
            - 'type5': Query for month and year (weather category).
            - 'type6': Query for season and year (weather category).

    Returns:
        dict: A dictionary containing the extracted values, where keys are based on
              the field names specified in `query_types`, including:
              - 'property'
              - 'sensor'
              - 'month'
              - 'season'
              - 'year'
              - 'start_date'
              - 'end_date'

    Raises:
        AttributeError: If the expected regex match is not found in the selected_query.
    """

    if selected_query == None:
        return None

    if type == 'type1':
        property = re.search(r'What is the (.+?) value', selected_query).group(1)
        sensor = re.search(r'for the sensor (.+?) from', selected_query).group(1)
        start_date = re.search(r'from (.+?) to', selected_query).group(1)
        end_date = re.search(r'to (.+?)\?', selected_query).group(1)
        extracted_values = [property, sensor, start_date, end_date]
    elif type == 'type2':
        property = re.search(r'What is the (.+?) value', selected_query).group(1)
        sensor = re.search(r'for the sensor (.+?) for the month of', selected_query).group(1)
        month = re.search(r'for the month of (.+?) ', selected_query).group(1)
        year = re.search(r'for the year (.+?)\?', selected_query).group(1)
        extracted_values = [property, sensor]

    elif type == 'type3':
        property = re.search(r'What is the (.+?) value', selected_query).group(1)
        sensor = re.search(r'for the sensor (.+?) for the season of', selected_query).group(1)
        season = re.search(r'for the season of (.+?) for', selected_query).group(1)
        year = re.search(r'for the year (.+?)\?', selected_query).group(1)
        extracted_values = [property, sensor]

    elif type == 'type4':
        property = re.search(r'What is the (.+?) value', selected_query).group(1)
        start_date = re.search(r'from (.+?) to', selected_query).group(1)
        end_date = re.search(r'to (.+?)\?', selected_query).group(1)
        extracted_values = [property, start_date, end_date]

    elif type == 'type5':
        property = re.search(r'What is the (.+?) value', selected_query).group(1)
        month = re.search(r'for the month of (.+?) for the year', selected_query).group(1)
        year = re.search(r'for the year (.+?)\?', selected_query).group(1)
        extracted_values = [property]

    elif type == 'type6':
        property = re.search(r'What is the (.+?) value', selected_query).group(1)
        season = re.search(r'for the season of (.+?) for the year', selected_query).group(1)
        year = re.search(r'for the year (.+?)\?', selected_query).group(1)
        extracted_values = [property]


    # Get the structure from query_types for 'type1'
    type_struc = query_types[type][1] 

    # Map the extracted values to the keys in the structure
    result = {key: value for key, value in zip(type_struc, extracted_values)}

    return result
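
An example of a 'type1' query and the values the regexes pull out of it. The query wording follows the patterns above; the sensor name is a placeholder, and the key order assumes query_types['type1'] lists the fields as property, sensor, start_date, end_date. The import path is assumed from the file location shown above.

from src.streamlit_app.pages_in_dashboard.data_accessibility.data_retrieval import extract_values_according_to_type

query = ("What is the traffic value for the sensor ExampleSensor IN "
         "from 2024-07-01 to 2024-07-07?")

extract_values_according_to_type(query, 'type1')
# -> {'property': 'traffic', 'sensor': 'ExampleSensor IN',
#     'start_date': '2024-07-01', 'end_date': '2024-07-07'}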

get_data_from_query(selected_category, selected_query, selected_query_type, start_date, end_date, selected_sensors)

Retrieve data based on the selected category and query.

This function extracts values from the provided query, retrieves data from AWS based on the selected category, processes the data, and returns a DataFrame containing the queried information.

Parameters:

Name Type Description Default
selected_category str

The category of data to retrieve. Options include:
- 'visitor_sensors'
- 'parking'
- 'weather'
- 'visitor_centers'

required
selected_query str

The query string used to extract specific values.

required
selected_query_type str

The type of the query, which determines the format of the expected values.

required

Returns:

Type Description

pd.DataFrame: A DataFrame containing the filtered data based on the query.

Raises:

Type Description
ValueError

If the selected category is not recognized.

KeyError

If the expected values are not found in the query.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
@st.cache_data(max_entries=1)
def get_data_from_query(selected_category,selected_query,selected_query_type, start_date, end_date, selected_sensors):

    """Retrieve data based on the selected category and query.

    This function extracts values from the provided query, retrieves data from
    AWS based on the selected category, processes the data, and returns a DataFrame
    containing the queried information.

    Args:
        selected_category (str): The category of data to retrieve. Options include:
            - 'visitor_sensors'
            - 'parking'
            - 'weather'
            - 'visitor_centers'
        selected_query (str): The query string used to extract specific values.
        selected_query_type (str): The type of the query, which determines the 
            format of the expected values.

    Returns:
        pd.DataFrame: A DataFrame containing the filtered data based on the query.

    Raises:
        ValueError: If the selected category is not recognized.
        KeyError: If the expected values are not found in the query.
    """

    if selected_category == 'visitor_sensors':
        sensor_df = get_sensors_data()
        sensor_df = sensor_df.set_index('Time') 
        processed_category_df = create_temporal_columns(sensor_df)

    if selected_category == 'parking':
        category_df = get_parking_data_for_selected_sensor(selected_sensors)
        processed_category_df = create_temporal_columns(category_df)

    if selected_category == 'weather':
        objects = get_files_from_aws(selected_category)
        category_df = get_weather_data(objects)
        processed_category_df = create_temporal_columns(category_df)

    if selected_category == 'visitor_centers':
        objects = get_files_from_aws(selected_category)
        category_df = get_visitor_centers_data(objects)
        category_df = category_df.set_index('Datum')
        processed_category_df = create_temporal_columns(category_df)

    get_values = extract_values_according_to_type(selected_query,selected_query_type)

    processed_category_df = get_queried_df(processed_category_df, get_values,selected_query_type, selected_category, start_date, end_date)

    return processed_category_df

get_parking_data_for_selected_sensor(selected_sensor)

Fetches parking data for a specified sensor from S3.

This function builds the S3 path to the merged parking data file for the selected sensor and reads it into a DataFrame, using the 'time' column as the index.

Parameters:

Name Type Description Default
selected_sensor str

The name of the sensor whose parking data should be loaded.

required

Returns:

Type Description

pandas.DataFrame: A DataFrame containing the parking data read from the CSV file, indexed by time.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def get_parking_data_for_selected_sensor(selected_sensor):

    """Fetches parking data for a specified sensor from S3.

    This function builds the S3 path to the merged parking data file for the
    selected sensor and reads it into a DataFrame, using the 'time' column
    as the index.

    Args:
        selected_sensor (str): The name of the sensor whose parking data
            should be loaded.

    Returns:
        pandas.DataFrame: A DataFrame containing the parking data read
        from the CSV file, indexed by time.
    """
    path = f"s3://{aws_s3_bucket}/preprocessed_data/preprocessed_parking_data/merged_parking_data/{selected_sensor}.csv"
    df = wr.s3.read_csv(path)
    df.set_index("time", inplace=True)
    return df

get_queried_df(processed_category_df, get_values, type, selected_category, start_date, end_date)

Retrieve a filtered DataFrame based on the selected category and query type.

This function filters the input DataFrame processed_category_df according to the specified selected_category and type. It uses values provided in the get_values dictionary to perform the filtering.

Parameters:

Name Type Description Default
processed_category_df DataFrame

The DataFrame containing processed data.

required
get_values dict

A dictionary containing values for filtering, including:
- 'property' (str): The property to select from the DataFrame.
- 'start_date' (str): Start date for filtering (format: 'YYYY-MM-DD').
- 'end_date' (str): End date for filtering (format: 'YYYY-MM-DD').
- 'month' (int): Month for filtering.
- 'year' (int): Year for filtering.
- 'season' (str): Season for filtering (e.g., 'spring', 'summer').

required
type str

The type of query to perform. Options include:
- 'type1': Filter by date range.
- 'type2': Filter by month and year.
- 'type3': Filter by season and year.
- 'type4': Filter by date range (weather category).
- 'type5': Filter by month and year (weather category).
- 'type6': Filter by season and year (weather category).

required
selected_category str

The category to filter by. Options include:
- 'parking'
- 'weather'
- 'visitor_centers'
- 'visitor_sensors'

required

Returns:

Type Description

pd.DataFrame: A DataFrame containing the filtered data for the specified property.

Raises:

Type Description
KeyError

If 'property' is not in get_values.

ValueError

If an invalid type or selected_category is provided.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def get_queried_df(processed_category_df, get_values, type, selected_category, start_date, end_date):

    """Retrieve a filtered DataFrame based on the selected category and query type.

    This function filters the input DataFrame `processed_category_df` according to the
    specified `selected_category` and `type`. It uses values provided in the `get_values`
    dictionary to perform the filtering.

    Args:
        processed_category_df (pd.DataFrame): The DataFrame containing processed data.
        get_values (dict): A dictionary containing values for filtering, including:
            - 'property' (str): The property to select from the DataFrame.
            - 'start_date' (str): Start date for filtering (format: 'YYYY-MM-DD').
            - 'end_date' (str): End date for filtering (format: 'YYYY-MM-DD').
            - 'month' (int): Month for filtering.
            - 'year' (int): Year for filtering.
            - 'season' (str): Season for filtering (e.g., 'spring', 'summer').
        type (str): The type of query to perform. Options include:
            - 'type1': Filter by date range.
            - 'type2': Filter by month and year.
            - 'type3': Filter by season and year.
            - 'type4': Filter by date range (weather category).
            - 'type5': Filter by month and year (weather category).
            - 'type6': Filter by season and year (weather category).
        selected_category (str): The category to filter by. Options include:
            - 'parking'
            - 'weather'
            - 'visitor_centers'
            - 'visitor_sensors'

    Returns:
        pd.DataFrame: A DataFrame containing the filtered data for the specified property.

    Raises:
        KeyError: If 'property' is not in `get_values`.
        ValueError: If an invalid type or selected_category is provided.
    """

    if get_values is None:
        start_date = pd.to_datetime(start_date)
        end_date = pd.to_datetime(end_date)
        queried_df = processed_category_df[
            (processed_category_df.index.date >= start_date.date()) &
            (processed_category_df.index.date <= end_date.date())
        ]
        return queried_df

    # get the property value from the get values dictionary
    if 'property' in get_values:
        property_value = get_values['property']

    # Implement filtering logic based on selected_category and type

    if selected_category == 'parking':

        if type == 'type1':
            start_date = pd.to_datetime(get_values['start_date'])
            end_date = pd.to_datetime(get_values['end_date'])
            queried_df = processed_category_df[
                (processed_category_df.index.date >= start_date.date()) &
                (processed_category_df.index.date <= end_date.date())
            ]
            queried_df = queried_df[[property_value]]
            return queried_df  

        if type == 'type2':
            month = get_values['month']
            year = int(get_values['year'])
            queried_df = processed_category_df[
                (processed_category_df['month'] == month) &
                (processed_category_df['year'] == year)
            ]
            queried_df = queried_df[[property_value]]
            return queried_df

        if type == 'type3':
            season = get_values['season']
            year = int(get_values['year'])
            queried_df = processed_category_df[
                (processed_category_df['season'] == season) &
                (processed_category_df['year'] == year)
            ]
            queried_df = queried_df[[property_value]]
            return queried_df

    if selected_category == 'weather':
        if type == 'type4':
            start_date = pd.to_datetime(get_values['start_date'])
            end_date = pd.to_datetime(get_values['end_date'])
            queried_df = processed_category_df[
                (processed_category_df.index.date >= start_date.date()) &
                (processed_category_df.index.date <= end_date.date())
            ]
            queried_df = queried_df[[property_value]]
            return queried_df

        if type == 'type5':
            month = get_values['month']
            year = int(get_values['year'])
            queried_df = processed_category_df[
                (processed_category_df['month'] == month) &
                (processed_category_df['year'] == year)
            ]
            queried_df = queried_df[[property_value]]
            return queried_df

        if type == 'type6':
            season = get_values['season']
            year = int(get_values['year'])
            queried_df = processed_category_df[
                (processed_category_df['season'] == season) &
                (processed_category_df['year'] == year)
            ]
            queried_df = queried_df[[property_value]]
            return queried_df

    if selected_category == 'visitor_centers' or selected_category == 'visitor_sensors':
        if type == 'type4':
            start_date = pd.to_datetime(get_values['start_date'])
            end_date = pd.to_datetime(get_values['end_date'])
            queried_df = processed_category_df[
                (processed_category_df.index.date >= start_date.date()) &
                (processed_category_df.index.date <= end_date.date())
            ]
            queried_df = queried_df[[property_value]]
            return queried_df

        if type == 'type5':
            month = get_values['month']
            year = int(get_values['year'])
            queried_df = processed_category_df[
                (processed_category_df['month'] == month) &
                (processed_category_df['year'] == year)
            ]
            queried_df = queried_df[[property_value]]
            return queried_df

        if type == 'type6':
            season = get_values['season']
            year = int(get_values['year'])
            queried_df = processed_category_df[
                (processed_category_df['season'] == season) &
                (processed_category_df['year'] == year)
            ]
            queried_df = queried_df[[property_value]]
            return queried_df
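
For orientation, a minimal usage sketch for a 'type1' parking query (not part of the source; the daily index and the 'occupancy' column are illustrative assumptions, not the real parking schema):

import pandas as pd

# Hypothetical daily parking data indexed by time
idx = pd.date_range("2023-06-01", periods=10, freq="D", name="time")
parking_df = pd.DataFrame({"occupancy": range(10)}, index=idx)

get_values = {
    "property": "occupancy",
    "start_date": "2023-06-01",
    "end_date": "2023-06-02",
}
# the trailing start_date/end_date arguments are only used when get_values is None
result = get_queried_df(parking_df, get_values, "type1", "parking", None, None)
# result keeps only the 'occupancy' column, restricted to 2023-06-01 and 2023-06-02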

get_sensors_data()

Fetches the preprocessed visitor count sensor data.

This function reads the preprocessed visitor count sensor data from a fixed Parquet file in S3. It takes no arguments.

Returns:

Type Description

pandas.DataFrame: A DataFrame containing the sensor data read from the Parquet file.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def get_sensors_data():
    """Fetches sensor data from the most recently modified object.

    This function retrieves the sensor data from a specified object
    in S3 by reading a CSV file. It selects the last object from
    the provided list of objects, assuming this is the most recently
    modified.

    Args:
        objects (list): A list of S3 object paths, where the last
            object is the most recently modified.

    Returns:
        pandas.DataFrame: A DataFrame containing the sensor data read
        from the CSV file.
    """

    df = wr.s3.read_parquet(f"s3://{aws_s3_bucket}/preprocessed_data/preprocessed_visitor_count_sensors_data.parquet")
    return df

get_visitor_centers_data(objects)

Fetches visitor centers data from the most recently modified Excel file.

This function retrieves visitor centers data from a specified Excel file in S3. It selects the last object from the provided list of objects that is an Excel file (with extensions '.xlsx' or '.xls'), assuming this is the most recently modified Excel file.

Parameters:

Name Type Description Default
objects list

A list of S3 object paths, where the last object ending in '.xlsx' or '.xls' is the most recently modified Excel file.

required

Returns:

Type Description

pandas.DataFrame: A DataFrame containing the visitor centers data read from the Excel file.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def get_visitor_centers_data(objects):
    """Fetches visitor centers data from the most recently modified Excel file.

    This function retrieves visitor centers data from a specified Excel file
    in S3. It selects the last object from the provided list of objects
    that is an Excel file (with extensions '.xlsx' or '.xls'), assuming this
    is the most recently modified Excel file.

    Args:
        objects (list): A list of S3 object paths, where the last
            object ending in '.xlsx' or '.xls' is the most recently modified Excel file.

    Returns:
        pandas.DataFrame: A DataFrame containing the visitor centers
        data read from the Excel file.
    """
    # Filter the list to include only objects that are Excel files
    excel_objects = [obj for obj in objects if obj.endswith(('.xlsx', '.xls'))]

    if not excel_objects:
        raise ValueError("No visitor center data found!")

    # Select the most recently modified Excel file (i.e., the last one in the list)
    object_to_be_queried = excel_objects[-1]

    # Read the Excel file from S3, skipping the last row which is a NaN row
    df = wr.s3.read_excel(f"{object_to_be_queried}", skipfooter=1, engine="openpyxl")

    return df
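
A hedged usage sketch (not part of the source); the S3 keys below are hypothetical and only illustrate that the last Excel entry in the list is the one that gets read:

objects = [
    "s3://<bucket>/visitor_centers/export_2023.csv",   # ignored: not an Excel file
    "s3://<bucket>/visitor_centers/export_2023.xlsx",
    "s3://<bucket>/visitor_centers/export_2024.xlsx",  # last Excel file -> this one is read
]
vc_df = get_visitor_centers_data(objects)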

get_weather_data(objects)

Fetches weather data from the most recently modified object.

This function retrieves weather data from a specified object in S3 by reading a Parquet file. It selects the last object from the provided list of objects, assuming this is the most recently modified.

Parameters:

Name Type Description Default
objects list

A list of S3 object paths, where the last object is the most recently modified.

required

Returns:

Type Description

pandas.DataFrame: A DataFrame containing the weather data read from the Parquet file.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def get_weather_data(objects):

    """Fetches weather data from the most recently modified object.

    This function retrieves weather data from a specified object
    in S3 by reading a Parquet file. It selects the last object from
    the provided list of objects, assuming this is the most recently
    modified.

    Args:
        objects (list): A list of S3 object paths, where the last
            object is the most recently modified.

    Returns:
        pandas.DataFrame: A DataFrame containing the weather
        data read from the Parquet file.
    """

    # if there are multiple objects, get the last modified one
    object_to_be_queried = objects[-1]
    # Read the parquet file from S3
    df = wr.s3.read_parquet(f"{object_to_be_queried}")
    return df
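
Similarly, a hedged sketch for the weather helper (hypothetical keys; requires AWS access) showing that only the last path in the list is read:

objects = [
    "s3://<bucket>/weather/weather_2023.parquet",
    "s3://<bucket>/weather/weather_2024.parquet",  # assumed most recently modified
]
weather_df = get_weather_data(objects)  # reads weather_2024.parquet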

parse_german_dates_regex(df, date_column_name)

Parses German dates in the specified date column of the DataFrame using regex, including hours and minutes if available.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame containing the date column.

required
date_column_name str

The name of the date column.

required

Returns:

Type Description
DataFrame

pd.DataFrame: The DataFrame with parsed German dates.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/data_retrieval.py
def parse_german_dates_regex(
    df: pd.DataFrame,
    date_column_name: str
) -> pd.DataFrame:
    """
    Parses German dates in the specified date column of the DataFrame using regex,
    including hours and minutes if available.

    Args:
        df (pd.DataFrame): The DataFrame containing the date column.
        date_column_name (str): The name of the date column.

    Returns:
        pd.DataFrame: The DataFrame with parsed German dates.
    """

    # Define a mapping of German month names to their numeric values
    month_map = {
        "Jan.": "01",
        "Feb.": "02",
        "März": "03",
        "Apr.": "04",
        "Mai": "05",
        "Juni": "06",
        "Juli": "07",
        "Aug.": "08",
        "Sep.": "09",
        "Okt.": "10",
        "Nov.": "11",
        "Dez.": "12"
    }

    # Create a regex pattern for replacing months and capturing time
    pattern = re.compile(r'(\d{1,2})\.\s*(' + '|'.join(month_map.keys()) + r')\s*(\d{4})\s*(\d{2}):(\d{2})')

    # Function to replace the month in the matched string and keep the time part
    def replace_month(match):
        day = match.group(1)
        month = month_map[match.group(2)]
        year = match.group(3)
        hour = match.group(4)
        minute = match.group(5)
        return f"{year}-{month}-{day} {hour}:{minute}:00"

    # Apply regex replacement and convert to datetime
    df[date_column_name] = df[date_column_name].apply(lambda x: replace_month(pattern.search(x)) if pattern.search(x) else x)
    df[date_column_name] = pd.to_datetime(df[date_column_name], errors='coerce')

    return df
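
A self-contained example of the parser (not from the source; the column name 'Zeit' and the sample strings are invented to show the German month handling):

import re
import pandas as pd

sample = pd.DataFrame({"Zeit": ["3. Okt. 2023 14:30", "15. März 2024 08:00"]})
parsed = parse_german_dates_regex(sample, "Zeit")
print(parsed["Zeit"].tolist())
# [Timestamp('2023-10-03 14:30:00'), Timestamp('2024-03-15 08:00:00')]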

list_files_in_s3(category)

Lists files in S3 for a given category and returns only file names.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/download.py
def list_files_in_s3(category: str) -> list:
    """Lists files in S3 for a given category and returns only file names."""
    s3_prefix = f"{base_folder}/{category.replace(' ', '_')}"
    full_paths = wr.s3.list_objects(f"s3://{aws_s3_bucket}/{s3_prefix}/")
    return [path.split('/')[-1] for path in full_paths]  # Extract only the file names

load_csv_files_from_aws_s3(path, **kwargs)

Loads individual or multiple CSV files from an AWS S3 bucket.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/download.py
def load_csv_files_from_aws_s3(path: str, **kwargs) -> pd.DataFrame:
    """Loads individual or multiple CSV files from an AWS S3 bucket."""
    df = wr.s3.read_csv(path=path, **kwargs)
    return df
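
A hedged sketch combining the two download helpers (requires AWS credentials; assumes base_folder and aws_s3_bucket are configured at module level, as in the source, and that the folder layout matches the prefix built by list_files_in_s3):

files = list_files_in_s3("visitor sensors")  # hypothetical category -> prefix 'visitor_sensors'
if files:
    path = f"s3://{aws_s3_bucket}/{base_folder}/visitor_sensors/{files[0]}"
    df = load_csv_files_from_aws_s3(path)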

generate_queries(category, start_date, end_date, selected_properties, selected_sensors)

Generate queries based on the selected category and date range.

Parameters:

Name Type Description Default
category str

The category of data (e.g., 'parking', 'weather', 'visitor_sensors', 'visitor_centers').

required
start_date str

The start date for the queries.

required
end_date str

The end date for the queries.

required
selected_properties list

List of selected properties relevant to the category.

required
selected_sensors list

List of selected sensors relevant to the category.

required

Returns:

Name Type Description
dict

A dictionary containing generated queries based on the specified category and filters.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_box.py
def generate_queries(category, start_date, end_date, selected_properties, selected_sensors):

    """
    Generate queries based on the selected category and date range.

    Args:
        category (str): The category of data (e.g., 'parking', 'weather', 'visitor_sensors', 'visitor_centers').
        start_date (str): The start date for the queries.
        end_date (str): The end date for the queries.
        selected_properties (list): List of selected properties relevant to the category.
        selected_sensors (list): List of selected sensors relevant to the category.

    Returns:
        dict: A dictionary containing generated queries based on the specified category and filters.
    """

    if category == 'parking':
        queries = get_queries_for_parking(start_date, end_date,selected_properties, selected_sensors)
    if category == 'weather':
        queries = get_queries_for_weather(start_date, end_date, selected_properties)
    if category == 'visitor_sensors':
        queries = get_queries_for_visitor_sensors(start_date, end_date, selected_sensors)
    if category == 'visitor_centers':
        queries = get_queries_for_visitor_centers(start_date, end_date, selected_sensors)
    return queries
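
A small illustration of the dictionary this dispatcher returns for the parking category (sensor, property, and dates are examples; the dates follow the "MM-DD-YYYY" strings produced by select_date):

queries = generate_queries(
    category="parking",
    start_date="06-01-2023",
    end_date="06-30-2023",
    selected_properties=["occupancy_rate"],
    selected_sensors="parkplatz-graupsaege-1",
)
# {'type1': ['What is the occupancy_rate value for the sensor parkplatz-graupsaege-1 from 06-01-2023 to 06-30-2023?']}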

get_queries_for_parking(start_date, end_date, selected_properties, selected_sensors)

Generate queries for parking data based on selected date range, properties, and sensors.

Parameters:

Name Type Description Default
start_date str

The start date for the query.

required
end_date str

The end date for the query.

required
selected_properties list

List of parking properties to include in the query (e.g., occupancy, capacity).

required
selected_sensors list

List of parking sensors to query data for.

required

Returns:

Name Type Description
dict

A dictionary with keys "type1", "type2", and "type3" containing queries for the date range.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_box.py
def get_queries_for_parking(start_date, end_date, selected_properties, selected_sensors):

    """
    Generate queries for parking data based on selected date range, properties, and sensors.

    Args:
        start_date (str): The start date for the query.
        end_date (str): The end date for the query.
        selected_properties (list): List of parking properties to include in the query (e.g., occupancy, capacity).
        selected_sensors (list): List of parking sensors to query data for.

    Returns:
        dict: A dictionary with keys "type1", "type2", and "type3" containing queries for the date range.
    """

    queries = {}

    if selected_sensors:
        for property in selected_properties:
            queries.setdefault("type1", []).append(
                f"What is the {property} value for the sensor {selected_sensors} from {start_date} to {end_date}?"
            )

    return queries

get_queries_for_visitor_centers(start_date, end_date, selected_sensors)

Generate queries for visitor center data based on the selected date range and sensors.

Parameters:

Name Type Description Default
start_date str

The start date for the query.

required
end_date str

The end date for the query.

required
selected_sensors list

List of visitor center sensors to query data for.

required

Returns:

Name Type Description
dict

A dictionary with keys "type4", "type5", and "type6" containing queries for the date range.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_box.py
def get_queries_for_visitor_centers(start_date, end_date, selected_sensors):

    """
    Generate queries for visitor center data based on the selected date range and sensors.

    Args:
        start_date (str): The start date for the query.
        end_date (str): The end date for the query.
        selected_sensors (list): List of visitor center sensors to query data for.

    Returns:
        dict: A dictionary with keys "type4", "type5", and "type6" containing queries for the date range.
    """

    queries = {}

    # Queries for the date range (type4)
    for sensor in selected_sensors:
        queries.setdefault("type4", []).append(
            f"What is the {sensor} value from {start_date} to {end_date}?"
        )

    return queries

get_queries_for_visitor_sensors(start_date, end_date, selected_sensors)

Generate queries for visitor sensor data based on selected date range and sensors.

Parameters:

Name Type Description Default
start_date str

The start date for the query.

required
end_date str

The end date for the query.

required
selected_sensors list

List of visitor sensors to query data for.

required

Returns:

Name Type Description
dict

A dictionary with keys "type4", "type5", and "type6" containing queries for the date range.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_box.py
def get_queries_for_visitor_sensors(start_date, end_date, selected_sensors):

    """
    Generate queries for visitor sensor data based on selected date range and sensors.

    Args:
        start_date (str): The start date for the query.
        end_date (str): The end date for the query.
        selected_sensors (list): List of visitor sensors to query data for.

    Returns:
        dict: A dictionary with keys "type4", "type5", and "type6" containing queries for the date range.
    """

    queries = {}

    # Queries for the date range (type4)
    for sensor in selected_sensors:
        queries.setdefault("type4", []).append(
            f"What is the {sensor} value from {start_date} to {end_date}?"
        )

    return queries

get_query_section()

Get the query section for data selection and execution.

This function displays a user interface for selecting data categories, date ranges, and additional filters. It allows users to generate specific queries and execute them to retrieve data.

Returns:

Name Type Description
None

This function does not return any values but updates the Streamlit UI with the selected query results and visualizations.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_box.py
def get_query_section():

    """
    Get the query section for data selection and execution.

    This function displays a user interface for selecting data categories, date ranges, 
    and additional filters. It allows users to generate 
    specific queries and execute them to retrieve data.

    Returns:
        None: This function does not return any values but updates the Streamlit UI
              with the selected query results and visualizations.
    """
    # display the query box
    st.markdown("## Data query")

    col1, col2 = st.columns((1,1))

    with col1:
        selected_category = select_category()
        print(selected_category)

    with col2:
        start_date, end_date = select_date()
        print(start_date, end_date)

    selected_properties, selected_sensors = select_filters(selected_category, start_date, end_date)

    # Give options to select your queries in form of a dropdown
    queries_dict = generate_queries(selected_category, start_date, end_date, selected_properties,selected_sensors)

    # get all the values of the all the keys in the dictionary queries

    queries = [query for query_list in queries_dict.values() for query in query_list]

    selected_query_type = None

    with st.form("Select a query"):

        selected_query = st.selectbox("Select a query", queries)

        # get the type of the selected query from the queries dictionary
        for key, value in queries_dict.items():
            if selected_query in value:
                selected_query_type = key

        submitted = st.form_submit_button(":green[Run Query]")
    if submitted:
        # get_data_from_query(selected_query,selected_query_type,selected_category)
        queried_df = get_data_from_query(selected_category,selected_query,selected_query_type, start_date, end_date, selected_sensors)

        # handle error if the queried df is an empty dataframe
        if queried_df.empty:
            st.error("Error: The query returned an empty dataframe. Please try again.")
            st.stop()
        else:
            st.write("Query executed successfully!")

            # get visualization for the queried data
            get_visualization_section(queried_df)

select_category()

Select the category of data to access using st.selectbox from Streamlit.

Returns:

Name Type Description
category str

The category selected by the user.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_box.py
def select_category():
    """
    Select the category of data to access using st.selectbox from Streamlit.

    Returns: 
        category (str): The category selected by the user.
    """
    # select the dropdown for the category
    category = st.selectbox("Select data category", 
                            ["weather", "visitor_sensors", "parking", "visitor_centers"],
                            index=0)

    return category

select_date()

Select the start and end date for data access using date inputs in Streamlit.

Returns:

Name Type Description
tuple

The selected start and end date in the format "MM-DD-YYYY".

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_box.py
def select_date():
    """
    Select the start and end date for data access using date inputs in Streamlit.

    Returns: 
        tuple: The selected start and end date in the format "MM-DD-YYYY".
    """

    # Define the default start and end dates: 01.01.2023 to 31.12.2023
    default_start = datetime.datetime(2023, 1, 1)
    default_end = datetime.datetime(2023, 12, 31)

    # Create the date input widget with start date
    d = st.date_input(
        "Select the start date",
        default_start,
        format="DD.MM.YYYY",
    )
    # Create the date input widget with end date
    e = st.date_input(
        "Select the end date",
        default_end,
        format="DD.MM.YYYY",
    )
    # capture the selected dates as formatted strings
    start_date = d.strftime("%m-%d-%Y")
    end_date = e.strftime("%m-%d-%Y")

    # prompt if the end date is chosen before the start date
    # (compare the date objects, not the "MM-DD-YYYY" strings, to avoid a lexicographic comparison)
    if d > e:
        st.error("Error: End date must fall after start date.")
        st.stop()

    return start_date, end_date
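
A standalone illustration of why the range check above compares the date objects rather than the "MM-DD-YYYY" strings:

import datetime

start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 1, 15)
print(start.strftime("%m-%d-%Y") > end.strftime("%m-%d-%Y"))  # True  -- lexicographic and misleading
print(start > end)                                            # False -- the intended comparison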

select_filters(category, start_date, end_date)

Select additional filters such as sensors, weather values, or parking values.

Parameters:

Name Type Description Default
category str

The category selected by the user. Can be one of: "weather", "parking", "visitor_sensors", "visitor_centers".

required

Returns:

Name Type Description
tuple

A tuple containing selected_properties and selected_sensors.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_box.py
def select_filters(category, start_date, end_date):

    """
    Select additional filters such as sensors, weather values, or parking values.

    Args:
        category (str): The category selected by the user. Can be one of:
                        "weather", "parking", "visitor_sensors", "visitor_centers".

    Returns:
        tuple: A tuple containing selected_properties and selected_sensors.
    """

    st.markdown("### More Filters")

    # Select the sensors or weather values or parking values
    category_based_filters = {
        "weather" : [
            'Temperature (°C)', 'Precipitation (mm)', 'Wind Speed (km/h)', 'Relative Humidity (%)', 'Sunshine Duration (min)'
            ],
        "parking" : {'sensors':[
                                'p-r-spiegelau-1','parkplatz-fredenbruecke-1','parkplatz-graupsaege-1',
                                'parkplatz-nationalparkzentrum-falkenstein-2','parkplatz-nationalparkzentrum-lusen-p2',
                                'parkplatz-skisportzentrum-finsterau-1','parkplatz-waldhaeuser-ausblick-1',
                                'parkplatz-waldhaeuser-kirche-1','parkplatz-zwieslerwaldhaus-1',
                                'parkplatz-zwieslerwaldhaus-nord-1','scheidt-bachmann-parkplatz-1',
                                'skiwanderzentrum-zwieslerwaldhaus-2'],

                     'properties':[
                         'occupancy', 'capacity', 'occupancy_rate'],
                    },

        'visitor_sensors'  :[
                                        'Bayerisch Eisenstein', 'Bayerisch Eisenstein IN',
                                        'Bayerisch Eisenstein OUT', 'Bayerisch Eisenstein Fußgänger',
                                        'Bayerisch Eisenstein Fahrräder',
                                        'Bayerisch Eisenstein Fußgänger IN',
                                        'Bayerisch Eisenstein Fußgänger OUT',
                                        'Bayerisch Eisenstein Fahrräder IN',
                                        'Bayerisch Eisenstein Fahrräder OUT', 'Brechhäuslau IN',
                                        'Brechhäuslau OUT', 'Brechhäuslau', 'Brechhäuslau Fußgänger IN',
                                        'Brechhäuslau Fußgänger OUT', 'Bucina_Multi', 'Bucina_Multi OUT',
                                        'Bucina_Multi Fußgänger', 'Bucina_Multi Fahrräder',
                                        'Bucina_Multi Fußgänger IN', 'Bucina_Multi Fahrräder IN',
                                        'Bucina_Multi Fahrräder OUT', 'Bucina_Multi Fußgänger OUT',
                                        'Deffernik', 'Deffernik IN', 'Deffernik OUT',
                                        'Deffernik Fußgänger', 'Deffernik Fahrräder',
                                        'Deffernik Fahrräder IN', 'Deffernik Fahrräder OUT',
                                        'Deffernik Fußgänger IN', 'Deffernik Fußgänger OUT',
                                        'Diensthüttenstraße', 'Diensthüttenstraße Fußgänger IN',
                                        'Diensthüttenstraße Fußgänger OUT', 'Felswandergebiet',
                                        'Felswandergebiet IN', 'Felswandergebiet OUT', 'Ferdinandsthal',
                                        'Ferdinandsthal IN', 'Ferdinandsthal OUT', 'Fredenbrücke',
                                        'Fredenbrücke Fußgänger IN', 'Fredenbrücke Fußgänger OUT', 'Gfäll',
                                        'Gfäll Fußgänger IN', 'Gfäll Fußgänger OUT', 'Gsenget',
                                        'Gsenget IN', 'Gsenget OUT', 'Gsenget Fußgänger',
                                        'Gsenget Fahrräder', 'Gsenget IN.1', 'Gsenget OUT.1',
                                        'Gsenget Fahrräder IN', 'Gsenget Fahrräder OUT',
                                        'Klingenbrunner Wald', 'Klingenbrunner Wald IN',
                                        'Klingenbrunner Wald OUT', 'Klingenbrunner Wald Fußgänger',
                                        'Klingenbrunner Wald Fahrräder',
                                        'Klingenbrunner Wald Fußgänger IN',
                                        'Klingenbrunner Wald Fußgänger OUT',
                                        'Klingenbrunner Wald Fahrräder IN',
                                        'Klingenbrunner Wald Fahrräder OUT', 'Klosterfilz',
                                        'Klosterfilz IN', 'Klosterfilz OUT', 'Klosterfilz Fußgänger',
                                        'Klosterfilz Fahrräder', 'Klosterfilz Fußgänger IN',
                                        'Klosterfilz Fußgänger OUT', 'Klosterfilz Fahrräder IN',
                                        'Klosterfilz Fahrräder OUT', 'Racheldiensthütte',
                                        'Racheldiensthütte IN', 'Racheldiensthütte OUT',
                                        'Racheldiensthütte Fußgänger', 'Racheldiensthütte Fahrräder',
                                        'Racheldiensthütte Fahrräder IN', 'Racheldiensthütte Cyclist OUT',
                                        'Racheldiensthütte Pedestrian IN',
                                        'Racheldiensthütte Pedestrian OUT', 'Sagwassersäge',
                                        'Sagwassersäge Fußgänger IN', 'Sagwassersäge Fußgänger OUT',
                                        'Scheuereck', 'Scheuereck IN', 'Scheuereck OUT', 'Schillerstraße',
                                        'Schillerstraße IN', 'Schillerstraße OUT', 'Schwarzbachbrücke',
                                        'Schwarzbachbrücke Fußgänger IN',
                                        'Schwarzbachbrücke Fußgänger OUT', 'TFG_Falkenstein_1',
                                        'TFG_Falkenstein_1 Fußgänger zum Parkplatz',
                                        'TFG_Falkenstein_1 Fußgänger zum HZW', 'TFG_Falkenstein_2',
                                        'TFG_Falkenstein_2 Fußgänger In Richtung Parkplatz',
                                        'TFG_Falkenstein_2 Fußgänger In Richtung TFG', 'TFG_Lusen_1',
                                        'TFG_Lusen_1 Fußgänger Richtung TFG',
                                        'TFG_Lusen_1 Fußgänger Richtung Parkplatz', 'TFG_Lusen_2',
                                        'TFG_Lusen_2 Fußgänger Richtung Vögel am Waldrand',
                                        'TFG_Lusen_2 Fußgänger Richtung Parkplatz', 'TFG_Lusen_3',
                                        'TFG_Lusen_3 TFG Lusen 3 IN', 'TFG_Lusen_3 TFG Lusen 3 OUT',
                                        'Trinkwassertalsperre_MULTI', 'Trinkwassertalsperre_MULTI IN',
                                        'Trinkwassertalsperre_MULTI OUT',
                                        'Trinkwassertalsperre_MULTI Fußgänger',
                                        'Trinkwassertalsperre_MULTI Fußgänger IN',
                                        'Trinkwassertalsperre_MULTI Fußgänger OUT',
                                        'Trinkwassertalsperre_MULTI Fahrräder',
                                        'Trinkwassertalsperre_MULTI Fahrräder IN',
                                        'Trinkwassertalsperre_MULTI Fahrräder OUT', 'Waldhausreibe',
                                        'Waldhausreibe IN', 'Waldhausreibe OUT', 'Waldspielgelände_1',
                                        'Waldspielgelände_1 IN', 'Waldspielgelände_1 OUT', 'Wistlberg',
                                        'Wistlberg Fußgänger IN', 'Wistlberg Fußgänger OUT'],


        'visitor_centers' : [
                            'Besuchszahlen_HEH',
                            'Besuchszahlen_HZW', 'Besuchszahlen_WGM', 'Parkpl_HEH_PKW',
                            'Parkpl_HEH_BUS', 'Parkpl_HZW_PKW', 'Parkpl_HZW_BUS'
                            ]
    }
    if category == "weather":
        selected_properties = st.multiselect("Select the weather properties", category_based_filters[category], default=None)
        selected_sensors = None

    elif category == "parking":
        selected_properties = st.multiselect("Select the parking values", category_based_filters[category]['properties'], default=None)  
        selected_sensors = st.selectbox("Select the parking sensor you want to find the values for?", category_based_filters[category]['sensors'])

    elif category == "visitor_sensors":
        visitor_sensors_data = get_data_from_query(
            selected_category=category,
            selected_query=None,
            selected_query_type=None,
            start_date=start_date,
            end_date=end_date,
            selected_sensors=None)

        visitor_sensor_options = list(set(visitor_sensors_data.columns.tolist()) - set(["month", "year", "season"]))

        selected_sensors = st.multiselect("Select the visitor sensor you want to find the count for?", visitor_sensor_options, default=None)
        selected_properties = None

    elif category == "visitor_centers":
        selected_sensors = st.multiselect("Select the visitor center you want to find the count for?", category_based_filters[category], default=None)
        selected_properties = None

    else:
        selected_properties = None
        selected_sensors = None

    return selected_properties, selected_sensors

get_visualization_section(retrieved_df)

Get the visualization section.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/query_viz_and_download.py
def get_visualization_section(retrieved_df):
    """
    Get the visualization section.
    """
    st.markdown("# Data visualization")

    st.dataframe(retrieved_df)

    # Generate Pandas Profiling report
    pr = ProfileReport(retrieved_df,minimal=True)
    st_profile_report(pr)

generate_file_name(category, upload_timestamp)

Generates a file name based on the category.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/upload.py
def generate_file_name(category: str, upload_timestamp: str) -> str:
    """Generates a file name based on the category."""
    return f"{category.replace(' ', '_')}_uploaded_{upload_timestamp}.csv"

write_csv_file_to_aws_s3(df, path, **kwargs)

Writes a CSV file to AWS S3.

Source code in src/streamlit_app/pages_in_dashboard/data_accessibility/upload.py
def write_csv_file_to_aws_s3(df: pd.DataFrame, path: str, **kwargs) -> None:
    """Writes a CSV file to AWS S3."""
    wr.s3.to_csv(df, path=path, **kwargs)
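
A hedged sketch of combining the two upload helpers (the bucket path, category, and timestamp format are assumptions; requires AWS credentials):

from datetime import datetime
import pandas as pd

df = pd.DataFrame({"value": [1, 2, 3]})
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
file_name = generate_file_name("visitor sensors", timestamp)
# e.g. 'visitor_sensors_uploaded_2024-07-01_12-00-00.csv'
write_csv_file_to_aws_s3(df, f"s3://<bucket>/uploads/{file_name}", index=False)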

get_other_information()

Get the other information section.

Source code in src/streamlit_app/pages_in_dashboard/visitors/other_information.py
def get_other_information():

    """
    Get the other information section.
    """
    st.markdown(f"### {TRANSLATIONS[st.session_state.selected_language]['other_information']}")


    with st.expander(f":green[{TRANSLATIONS[st.session_state.selected_language]['visitor_centers']}]"):
        st.markdown(f":green[{TRANSLATIONS[st.session_state.selected_language]['visitor_centers_description']}]")
        st.markdown(f"[{TRANSLATIONS[st.session_state.selected_language]['learn_more']}]({TRANSLATIONS[st.session_state.selected_language]['visitor_centers_link']})")

    with st.expander(f":green[{TRANSLATIONS[st.session_state.selected_language]['popular_entrances']}]"):
        st.markdown(f":green[{TRANSLATIONS[st.session_state.selected_language]['entrances_description']}]")
        st.markdown(":green[**1 - Falkenstein**]")
        st.markdown(f"[{TRANSLATIONS[st.session_state.selected_language]['learn_more']}](https://www.nationalpark-bayerischer-wald.bayern.de/besucher/einrichtungen/npz_falkenstein/index.htm)")

        st.markdown(":green[**2 - Lusen**]")
        st.markdown(f"[{TRANSLATIONS[st.session_state.selected_language]['learn_more']}](https://www.nationalpark-bayerischer-wald.bayern.de/besucher/einrichtungen/npz_lusen/index.htm)")

    with st.expander(f":green[{TRANSLATIONS[st.session_state.selected_language]['best_way_to_get_there']}]"):
        st.markdown(f":green[{TRANSLATIONS[st.session_state.selected_language]['getting_there_description']}]")
        st.markdown(f"[{TRANSLATIONS[st.session_state.selected_language]['learn_more']}](https://www.nationalpark-bayerischer-wald.bayern.de/service/anreise/)")

get_page_layout()

Set the page layout for the Streamlit app.

Returns:

Type Description

col1, col2: The two columns of the page layout.

Source code in src/streamlit_app/pages_in_dashboard/visitors/page_layout_config.py
def get_page_layout():
    """
    Set the page layout for the Streamlit app.

    Returns:
        col1, col2: The two columns of the page layout.
    """
    st.set_page_config(
    page_title='Plan your trip🌲',
    page_icon="🌲",
    layout="wide",
    initial_sidebar_state="expanded")

    alt.themes.enable("dark")

    # Define the app layout

    col1, col2 = st.columns((2.2,1), gap='medium')

    return col1, col2

calculate_color_based_on_occupancy_rate(occupancy_rate)

Calculate the color of the marker based on the occupancy rate. Returns a dictionary with a list of RGB values for the map markers and a CSS color name for the occupancy bar.

Parameters:

Name Type Description Default
occupancy_rate float

The occupancy rate of the parking section.

required

Returns:

Name Type Description
dict

A dictionary with 'color_markers_map_visualization' (a list of RGB values for the map marker) and 'color_bar_occupancy_rate' (a CSS color name for the occupancy bar).

Source code in src/streamlit_app/pages_in_dashboard/visitors/parking.py
def calculate_color_based_on_occupancy_rate(occupancy_rate) -> dict:
    """
    Calculate the color of the marker based on the occupancy rate.
    Returns a named tuple with the RGB values and a CSS gradient color value.



    Args:
        occupancy_rate (float): The occupancy rate of the parking section.

    Returns:
        list: A list of RGB values representing the color of the marker.
    """
    occupancy_rate = float(occupancy_rate)

    if occupancy_rate >= 80:
        return {"color_markers_map_visualization": [230, 39, 39],
                "color_bar_occupancy_rate": "red"} # red
    elif occupancy_rate >= 60:
        return {"color_markers_map_visualization": [250, 232, 8],
                "color_bar_occupancy_rate": "yellow"} # yellow
    else:
        return {"color_markers_map_visualization": [109, 249, 2],
                "color_bar_occupancy_rate": "green"} # green

get_fixed_size()

Get a fixed size value for the map markers.

Source code in src/streamlit_app/pages_in_dashboard/visitors/parking.py
def get_fixed_size():
    """
    Get a fixed size value for the map markers.
    """
    return 450  

get_occupancy_status(occupancy_rate)

Get the occupancy status (High, Medium, Low) based on the occupancy rate.

Parameters:

Name Type Description Default
occupancy_rate float

The occupancy rate of the parking section.

required

Returns:

Name Type Description
str

The occupancy status ("High", "Medium", "Low").

Source code in src/streamlit_app/pages_in_dashboard/visitors/parking.py
def get_occupancy_status(occupancy_rate):
    """
    Get the occupancy status (High, Medium, Low) based on the occupancy rate.

    Args:
        occupancy_rate (float): The occupancy rate of the parking section.

    Returns:
        str: The occupancy status ("High", "Medium", "Low").
    """
    if occupancy_rate >= 80:
        return TRANSLATIONS[st.session_state.selected_language]["parking_status_high"]
    elif occupancy_rate >= 60:
        return TRANSLATIONS[st.session_state.selected_language]["parking_status_moderate"]
    else:
        return TRANSLATIONS[st.session_state.selected_language]["parking_status_low"]

get_parking_section()

Display the parking section of the dashboard with a map showing the real-time parking occupancy and interactive metrics. The parking data is sourced and preprocessed inside the function, so it takes no arguments.

Returns:

Type Description

None

Source code in src/streamlit_app/pages_in_dashboard/visitors/parking.py
@st.fragment(run_every="15min")
def get_parking_section():
    """
    Display the parking section of the dashboard with a map showing the real-time parking occupancy 
    and interactive metrics.

    Args:
        processed_parking_data (pd.DataFrame): Processed parking data.

    Returns:
        None
    """

    print("Rendering parking section for the visitor dashboard...")

    def get_current_15min_interval():
        """
        Get the current 15-minute interval in the format "HH:MM:00".

        Returns:
            str: The current 15-minute interval in the format "HH:MM:00".
        """
        current_time = datetime.now(pytz.timezone('Europe/Berlin'))
        minutes = (current_time.minute // 15) * 15

        # Replace the minute value with the truncated value and set seconds and microseconds to 0
        timestamp_latest_parking_data_fetch = current_time.replace(minute=minutes, second=0, microsecond=0)

        # If you want to format it as a string in the "%Y-%m-%d %H:%M:%S" format
        timestamp_latest_parking_data_fetch_str = timestamp_latest_parking_data_fetch.strftime("%Y-%m-%d %H:%M:%S")

        return timestamp_latest_parking_data_fetch_str

    timestamp_latest_parking_data_fetch = get_current_15min_interval()

    # Source and preprocess the parking data
    processed_parking_data = source_and_preprocess_realtime_parking_data(timestamp_latest_parking_data_fetch)

    st.markdown(f"### {TRANSLATIONS[st.session_state.selected_language]['real_time_parking_occupancy']}")

    # Set a fixed size for all markers
    processed_parking_data['size'] = get_fixed_size()
    processed_parking_data['color'] = processed_parking_data['current_occupancy_rate'].apply(lambda occupancy_rate: calculate_color_based_on_occupancy_rate(occupancy_rate)["color_markers_map_visualization"])

    # Convert the occupancy rate to numeric and handle errors
    processed_parking_data['current_occupancy_rate'] = pd.to_numeric(processed_parking_data['current_occupancy_rate'], errors='coerce')

    # Map occupancy rate to status (High, Medium, Low)
    processed_parking_data['occupancy_status'] = processed_parking_data['current_occupancy_rate'].apply(get_occupancy_status)

    # Calculate center of the map based on the average of latitudes and longitudes
    avg_latitude = processed_parking_data['latitude'].mean()
    avg_longitude = processed_parking_data['longitude'].mean()

    # PyDeck Map Configuration with adjusted view_state
    view_state = pdk.ViewState(
        latitude=avg_latitude,  # Center map at the average latitude
        longitude=avg_longitude,  # Center map at the average longitude
        zoom=10,  # Zoom level increased for a closer view
        pitch=50
    )

    layer = pdk.Layer(
        "ScatterplotLayer",
        data=processed_parking_data,
        get_position=["longitude", "latitude"],
        get_radius="size",
        get_fill_color="color",
        pickable=True,
    )

    deck = pdk.Deck(
        layers=[layer],
        initial_view_state=view_state,
        tooltip={
            "text": "{location}\n" + f"{TRANSLATIONS[st.session_state.selected_language]['occupancy_status']}: " + "{occupancy_status}"
        },
        map_style="road"
    )
    st.pydeck_chart(deck)

    # Interactive Metrics
    selected_location = st.selectbox(
        TRANSLATIONS[st.session_state.selected_language]['select_parking_section'], 
        processed_parking_data['location'].unique(),
        key="selectbox_parking_section"
    )

    # Display selected location details
    if selected_location:
        selected_data = processed_parking_data[processed_parking_data['location'] == selected_location].iloc[0]

        col1, col2, col3 = st.columns(3)
        col1.metric(label=TRANSLATIONS[st.session_state.selected_language]['capacity'], value=f"{selected_data['current_capacity']} 🚗")

        # Display occupancy status and bar
        with col2:
            st.metric(label = TRANSLATIONS[st.session_state.selected_language]['occupancy_status'], value=f"{selected_data['occupancy_status']}")
        with col3:
            st.markdown(f"**{TRANSLATIONS[st.session_state.selected_language]['occupancy_rate']}**")
            render_occupancy_bar(selected_data['current_occupancy_rate'])

render_occupancy_bar(occupancy_rate)

Render a color bar representing the occupancy rate using HTML and CSS.

Parameters:

Name Type Description Default
occupancy_rate float

The occupancy rate of the parking section.

required

Returns:

Type Description

None

Source code in src/streamlit_app/pages_in_dashboard/visitors/parking.py
def render_occupancy_bar(occupancy_rate):
    """
    Render a color bar representing the occupancy rate using HTML and CSS.

    Args:
        occupancy_rate (float): The occupancy rate of the parking section.

    Returns:
        None
    """
    # Ensure occupancy rate is between minimum value and 100
    minimum_value_of_occupancy = 5
    occupancy_rate = min(max(float(occupancy_rate), minimum_value_of_occupancy), 100)

    # Define the color based on occupancy
    bar_color = calculate_color_based_on_occupancy_rate(occupancy_rate)["color_bar_occupancy_rate"]

    # Create an HTML div with the appropriate width based on occupancy rate
    st.markdown(f"""
    <div style="width: 100%; background-color: lightgrey; border-radius: 5px; padding: 3px;">
        <div style="width: {occupancy_rate}%; background-color: {bar_color}; height: 25px; border-radius: 5px;"></div>
    </div>
    """, unsafe_allow_html=True)

get_recreation_section()

Get the recreational activities section for the Bavarian Forest National Park.

Returns: None

Source code in src/streamlit_app/pages_in_dashboard/visitors/recreational_activities.py
def get_recreation_section():
    """
    Get the recreational activities section for the Bavarian Forest National Park.

    Args:
        None
    Returns:
        None
    """

    st.markdown(f"### {TRANSLATIONS[st.session_state.selected_language]['recreational_activities']}")


    activities = {
        TRANSLATIONS[st.session_state.selected_language]['hiking']: {
            "emoji": "🥾",
            "description": TRANSLATIONS[st.session_state.selected_language]['hiking_description'],
            "link": TRANSLATIONS[st.session_state.selected_language]['hiking_link']
        },
        TRANSLATIONS[st.session_state.selected_language]['cycling']: {
            "emoji": "🚴‍♂️",
            "description": TRANSLATIONS[st.session_state.selected_language]['cycling_description'],
            "link": TRANSLATIONS[st.session_state.selected_language]['cycling_link']
        },
        TRANSLATIONS[st.session_state.selected_language]['camping']: {
            "emoji": "🏕️",
            "description": TRANSLATIONS[st.session_state.selected_language]['camping_description'],
            "link": TRANSLATIONS[st.session_state.selected_language]['camping_link']
        },
        TRANSLATIONS[st.session_state.selected_language]['snowshoeing']: {
            "emoji": "🌨️",
            "description": TRANSLATIONS[st.session_state.selected_language]['snowshoeing_description'],
            "link": TRANSLATIONS[st.session_state.selected_language]['snowshoeing_link']
        },
        TRANSLATIONS[st.session_state.selected_language]['skiing']: {
            "emoji": "🎿",
            "description": TRANSLATIONS[st.session_state.selected_language]['skiing_description'],
            "link": TRANSLATIONS[st.session_state.selected_language]['skiing_link']
        },
    }

    for activity, info in activities.items():
        st.markdown(f"""
            <div style="
                padding: 5px 10px;
                margin-bottom: 8px;
                background-color: #215202;
                border-radius: 5px;
                text-align: left;">
                <h5 style="color: #fff; margin: 0;">{info['emoji']} {activity}</h5>
                <p style="color: #ccc; margin: 3px 0; font-size: 0.9em;">{info['description']}</p>
                <a href="{info['link']}" target="_blank" style="color: #00a0ff; font-size: 0.9em; text-decoration: none;f">{TRANSLATIONS[st.session_state.selected_language]['learn_more']}</a>
            </div>
        """, unsafe_allow_html=True)

get_visitor_counts_section(inference_predictions)

Get the visitor counts section with the forecasted region-wise visitor foot traffic.

Returns:

Type Description

None

Source code in src/streamlit_app/pages_in_dashboard/visitors/visitor_count.py
@st.fragment
def get_visitor_counts_section(inference_predictions):
    """
    Get the visitor counts section with the highest occupancy rate.

    Args:
        None

    Returns:
        None
    """
    st.markdown(f"## {TRANSLATIONS[st.session_state.selected_language]['visitor_counts_forecasted']}")

    # do a dropdown for the all_preds
    regions_to_select = list(regions.keys())
    selected_region = st.selectbox(TRANSLATIONS[st.session_state.selected_language]['select_region'], regions_to_select)

    if selected_region:

        # Filter the DataFrame based on the selected region
        selected_region_predictions = inference_predictions[["Time", "day_date", selected_region, f"weekly_relative_traffic_{selected_region}", f"traffic_color_{selected_region}"]]

        # Get unique values for the day and date list
        days_list = selected_region_predictions['day_date'].unique()

        # Add a note that this is forecasted data
        st.markdown(f":green[*{TRANSLATIONS[st.session_state.selected_language]['forecasted_visitor_data']}*].")

        # Create a layout for the radio button and chart
        col1, _ = st.columns([1, 3])

        with col1:
            # Get radio button for selecting the day
            day_selected = st.radio(
                label=TRANSLATIONS[st.session_state.selected_language]['select_day'], options=days_list, index=0
            )

        # Extract the selected day for filtering (using date)
        day_df = selected_region_predictions[selected_region_predictions['day_date'] == day_selected]

        # Plot an interactive bar chart for relative traffic
        fig1 = px.bar(
            day_df,
            x='Time',  
            y=f'weekly_relative_traffic_{selected_region}',
            color=f'traffic_color_{selected_region}',  # Use the traffic color column
            labels={f'weekly_relative_traffic_{selected_region}': '', 'Time': 'Hour of Day'},
            title=f"{TRANSLATIONS[st.session_state.selected_language]['visitor_foot_traffic_for_day']} - {day_selected}",
            color_discrete_map={'red': 'red', 'blue': 'blue', 'green': 'green'}
        )

        # Customize hover text for relative traffic
        fig1.update_traces(
            hovertemplate=(
                'Hour: %{x|%H:%M}<br>'  # Display the hour in HH:MM format
            )
        )

        # Update layout for relative traffic chart
        fig1.update_yaxes(range=[0, 1], showticklabels=False)  # Set y-axis to range from 0 to 1 and hide tick labels
        fig1.update_xaxes(showticklabels=True)  # Keep the x-axis tick labels visible

        fig1.update_layout(
            xaxis_title=None,  # Hide the x-axis title
            yaxis_title=None,  # Hide the y-axis title
            template='plotly_dark',
            legend_title_text=TRANSLATIONS[st.session_state.selected_language]['visitor_foot_traffic'],
            legend=dict(
                itemsizing='constant',
                traceorder="normal",
                font=dict(size=12),
                orientation="h",
                yanchor="top",
                y=-0.3,  # Position the legend below the chart
                xanchor="center",
                x=0.5  # Center the legend horizontally
            ),
            xaxis=dict(
                tickformat='%H:%M'
            )
        )

        # Update the legend names
        fig1.for_each_trace(
            lambda t: t.update(name={
                'red': TRANSLATIONS[st.session_state.selected_language]['peak_traffic'], 'green': TRANSLATIONS[st.session_state.selected_language]['low_traffic'], 'blue': TRANSLATIONS[st.session_state.selected_language]['moderate_traffic']}[t.name])
        )

        # Display the interactive bar chart for relative traffic below the radio button
        st.plotly_chart(fig1)

find_peaks(data)

Find peaks in the data.

Parameters:

Name Type Description Default
data Series

The data to find peaks in.

required

Returns:

Name Type Description
list

A list of indices where peaks occur.

Source code in src/streamlit_app/pages_in_dashboard/visitors/weather.py
def find_peaks(data):
    """
    Find peaks in the data.

    Args:
        data (pd.Series): The data to find peaks in.

    Returns:
        list: A list of indices where peaks occur.
    """
    peaks = []
    for i in range(1, len(data) - 1):
        # use positional access so the check also works for non-integer (e.g. datetime) indexes
        if data.iloc[i] > data.iloc[i - 1] and data.iloc[i] > data.iloc[i + 1]:
            peaks.append(i)
    return peaks
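
A standalone example of the peak finder on a small series (values invented for illustration):

import pandas as pd

temps = pd.Series([10, 14, 12, 11, 15, 13])
print(find_peaks(temps))  # [1, 4]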

get_graph(forecast_data)

Display a line graph of the temperature forecast in the same plot, with clear day labels on the x-axis and properly formatted hover info.

Parameters:

Name Type Description Default
forecast_data DataFrame

The forecast data to plot.

required

Returns:

Type Description

plotly.graph_objects.Figure: The plotly figure object.

Source code in src/streamlit_app/pages_in_dashboard/visitors/weather.py
def get_graph(forecast_data):
    """
    Display a line graph of the temperature forecast in the same plot,
    with clear day labels on the x-axis and properly formatted hover info.

    Args:
        forecast_data (pd.DataFrame): The forecast data to plot.

    Returns:
        plotly.graph_objects.Figure: The plotly figure object.
    """

    forecast_data.set_index('time', inplace=True)

    fig = go.Figure()

    # Add the temperature line with improved styling
    fig.add_trace(go.Scatter(
        x=forecast_data.index, 
        y=forecast_data['temp'], 
        mode='lines', 
        name=TRANSLATIONS[st.session_state.selected_language]['temperature'],
        line=dict(color='orange', width=2),  # Smoother line with better color
        hovertemplate=f'{TRANSLATIONS[st.session_state.selected_language]["date"]}: ' +  '%{x|%d-%m-%Y, %H:%M}<br>' + f'{TRANSLATIONS[st.session_state.selected_language]["temperature"]}: ' + ' %{y}°C<extra></extra>'
    ))

    # Find peak indices for temperature
    peak_indices = find_peaks(forecast_data['temp'])

    # Create a scatter trace for peaks
    peak_points_trace = go.Scatter(
        x=forecast_data.index[peak_indices],
        y=forecast_data['temp'][peak_indices],
        mode='markers',
        marker=dict(
            color=['red' if temp > 26 else 'green' for temp in forecast_data['temp'][peak_indices]], 
            size=10,
            symbol='circle-open'
        ),
        name=TRANSLATIONS[st.session_state.selected_language]['peaks'],
        text=forecast_data['temp'][peak_indices].astype(str) + "°C",
        textposition='top center',
        hoverinfo='none'  # Disable hover for peaks to avoid overlapping
    )

    # Add peak points trace to the figure
    fig.add_trace(peak_points_trace)

    fig.update_layout(
    title=TRANSLATIONS[st.session_state.selected_language]['7_day_hourly_weather'],
    xaxis_title=TRANSLATIONS[st.session_state.selected_language]['date'],
    yaxis_title=TRANSLATIONS[st.session_state.selected_language]['temperature'],
    xaxis=dict(
        tickformat='%d-%m',  # Format x-axis as 'Day, Month Date'
        dtick=24 * 60 * 60 * 1000,  # Tick every day
        tickangle=-45,  # Rotate the labels to make them more readable
        color='white',  # Ensure labels are visible on the dark background
        showgrid=False,  # Hide vertical grid lines for better clarity
    ),
    yaxis=dict(
        color='white'  # Ensure y-axis labels are visible
    ),
    legend=dict(
        orientation="h",  # Horizontal legend
        yanchor="top",
        y=-0.4,  # Move the legend further down below the plot
        xanchor="center",
        x=0.5
    ),
    margin=dict(
        l=50, r=50, t=50, b=100  # Increase bottom margin to make space for the x-axis labels
    ),
    template='plotly_dark',
    hovermode="x unified"  # Unified hover to show temperature together
    )


    return fig

get_weather_section()

Display the weather section of the dashboard. This function takes no arguments; the forecast data is sourced and preprocessed inside the function.

Returns:

Type Description

None

Source code in src/streamlit_app/pages_in_dashboard/visitors/weather.py
@st.fragment(run_every="1h")
def get_weather_section():
    """
    Display the weather section of the dashboard.

    Args:
        processed_weather_data (pd.DataFrame): Processed weather data.

    Returns:
        None
    """

    print("Fetching the latest weather forecast data for the current hour...")

    def get_current_hour():
        """
        Get the current time in Europe/Berlin, truncated to the start of the hour.

        Returns:
            datetime: The current time with minutes, seconds and microseconds set to zero.
        """
        current_time = datetime.now(pytz.timezone('Europe/Berlin'))

        # Truncate to the hour: zero out the minutes, seconds and microseconds
        current_hour = current_time.replace(minute=0, second=0, microsecond=0)

        return current_hour

    timestamp_latest_weather_data_fetch = get_current_hour()

    processed_weather_data = source_and_preprocess_forecasted_weather_data(timestamp_latest_weather_data_fetch)


    st.markdown(f"### {TRANSLATIONS[st.session_state.selected_language]['weather_forecast']}")
    st.markdown(f"{TRANSLATIONS[st.session_state.selected_language]['weather_data_last_updated']} {timestamp_latest_weather_data_fetch}")


    fig  = get_graph(processed_weather_data)

    st.plotly_chart(fig)
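
The hourly refresh hinges on truncating the current Berlin time to the start of the hour; a standalone sketch of that step (pytz and datetime only, no Streamlit required):

from datetime import datetime
import pytz

now_berlin = datetime.now(pytz.timezone('Europe/Berlin'))
current_hour = now_berlin.replace(minute=0, second=0, microsecond=0)
print(current_hour)  # e.g. 2024-07-21 14:00:00+02:00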

get_historical_data_for_location(location_id, location_slug, data_type, api_endpoint_suffix, column_name, save_file_path='outputs')

Fetch historical data from the BayernCloud API and return it as a DataFrame.

Parameters:

Name Type Description Default
location_id str

The ID of the location for which the data is to be fetched.

required
location_slug str

A slug (a URL-friendly string) representing the location.

required
data_type str

The type of data being fetched (e.g., 'occupancy', 'occupancy_rate', 'capacity').

required
api_endpoint_suffix str

The specific suffix of the API endpoint for the data type (e.g., 'dcls_occupancy', 'dcls_occupancy_rate').

required
column_name str

The name of the column to store the fetched data in the DataFrame.

required
save_file_path str

The base directory where the CSV file will be saved (default is 'outputs').

'outputs'

Returns:

Name Type Description
historical_df DataFrame

A Pandas DataFrame containing the historical data for a location.

Source code in src/prediction_pipeline/sourcing_data/source_historic_parking_data.py
def get_historical_data_for_location(
    location_id: str,
    location_slug: str,
    data_type: str,
    api_endpoint_suffix: str,
    column_name: str,
    save_file_path: str = 'outputs'
):
    """
    Fetch historical data from the BayernCloud API and return it as a DataFrame.

    Args:
        location_id (str): The ID of the location for which the data is to be fetched.
        location_slug (str): A slug (a URL-friendly string) representing the location.
        data_type (str): The type of data being fetched (e.g., 'occupancy', 'occupancy_rate', 'capacity').
        api_endpoint_suffix (str): The specific suffix of the API endpoint for the data type (e.g., 'dcls_occupancy', 'dcls_occupancy_rate').
        column_name (str): The name of the column to store the fetched data in the DataFrame.
        save_file_path (str, optional): The base directory where the CSV file will be saved (default is 'outputs').

    Returns:
        historical_df (pd.DataFrame): A Pandas DataFrame containing the historical data for a location.
    """
    # Construct the API endpoint URL
    API_endpoint = f'https://data.bayerncloud.digital/api/v4/things/{location_id}/{api_endpoint_suffix}/'

    # Set request parameters
    request_params = {
        'token': BAYERN_CLOUD_API_KEY
    }

    # Send the GET request to the API
    response = requests.get(API_endpoint, params=request_params)
    response_json = response.json()

    # Convert the response to a Pandas DataFrame and preprocess it
    historical_df = pd.DataFrame(response_json['data'], columns=['time', column_name])
    historical_df["time"] = pd.to_datetime(historical_df["time"])
    # historical_df.set_index("time", inplace=True)

    return historical_df
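
A hypothetical call (the location ID and slug below are placeholders; a valid BAYERN_CLOUD_API_KEY and network access are required):

occupancy_df = get_historical_data_for_location(
    location_id="00000000-0000-0000-0000-000000000000",  # placeholder ID, not a real sensor
    location_slug="p-example",                            # hypothetical slug
    data_type="occupancy",
    api_endpoint_suffix="dcls_occupancy",
    column_name="occupancy",
)
print(occupancy_df.head())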

process_all_locations(parking_sensors)

Process and fetch all types of historical data for each location in the parking sensors dictionary.

Parameters:

Name Type Description Default
parking_sensors dict

Dictionary containing location slugs as keys and location IDs as values.

required
Source code in src/prediction_pipeline/sourcing_data/source_historic_parking_data.py
def process_all_locations(parking_sensors):
    """
    Process and fetch all types of historical data for each location in the parking sensors dictionary.

    Args:
        parking_sensors (dict): Dictionary containing location slugs as keys and location IDs as values.
    """

    data_types = [
        ('occupancy', 'dcls_occupancy', 'occupancy'),
        ('occupancy_rate', 'dcls_occupancy_rate', 'occupancy_rate'),
        ('capacity', 'dcls_capacity', 'capacity')
    ]

    for key, value in parking_sensors.items():
        historical_data = []
        for data_type, api_suffix, column_name in data_types:
            print(f"Loading historical {data_type} data for location: {key} with location_id: {value}")

            parking_df  = get_historical_data_for_location(
                location_id=value,
                location_slug=key,
                data_type=data_type,
                api_endpoint_suffix=api_suffix,
                column_name=column_name
            )
            historical_data.append(parking_df)
        merged_df = reduce(lambda x, y: pd.merge(x, y, on='time'), historical_data)

        # Create a filename based on the location name
        filename = f"{key}_historical_parking_data.csv"

        # make the output directory if it doesn't exist
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        output_path = os.path.join(OUTPUT_DIR, filename)

        merged_df.to_csv(output_path, index=False)

        print(f"Saved historical parking data for location: {key} to {output_path}")

source_historic_visitor_count()

Source historic visitor count data from AWS S3.

Source code in src/prediction_pipeline/sourcing_data/source_historic_visitor_count.py
def source_historic_visitor_count():
    """Source historic visitor count data from AWS S3."""

    # Load visitor count data from AWS S3
    visitor_counts = wr.s3.read_csv(
        path=f"s3://{aws_s3_bucket}/{raw_data_folder}/{visitor_counts_folder}/*.csv",
        skiprows=2,
        usecols=common_columns
    )

    return visitor_counts

source_data_from_aws_s3(path, **kwargs)

Loads an individual Excel file (or multiple Excel files) from an AWS S3 bucket.

Args:
path (str): The path to the Excel file(s) on AWS S3.
**kwargs: Additional arguments to pass to the read_excel function.

Returns:
pd.DataFrame: The DataFrame containing the data from the file(s).

Source code in src/prediction_pipeline/sourcing_data/source_visitor_center_data.py
def source_data_from_aws_s3(path: str, **kwargs) -> pd.DataFrame:
    """Loads individual or multiple CSV files from an AWS S3 bucket.
    Args:
        path (str): The path to the CSV files on AWS S3.
        **kwargs: Additional arguments to pass to the read_csv function.
    Returns:
        pd.DataFrame: The DataFrame containing the data from the CSV files.
    """
    df = wr.s3.read_excel(path=path, **kwargs)
    return df

source_preprocessed_hourly_visitor_center_data()

Load the preprocessed hourly visitor center data from AWS S3.

Source code in src/prediction_pipeline/sourcing_data/source_visitor_center_data.py
def source_preprocessed_hourly_visitor_center_data():

    """
    Load the preprocessed hourly visitor center data from AWS S3.
    """

    # Load visitor count data from AWS S3
    preprocessed_hourly_visitor_center_data = wr.s3.read_parquet(
        path=f"s3://{aws_s3_bucket}/preprocessed_data/visitor_centers_hourly.parquet"
    )

    return preprocessed_hourly_visitor_center_data

get_hourly_data(region, start_time, end_time)

Fetch hourly weather data for a specified region and date range.

This function retrieves hourly weather data from the Meteostat API or another defined source, returning it as a pandas DataFrame.

Parameters:

Name Type Description Default
region Point

A Point object representing the geographical location (latitude, longitude).

required
start_time datetime

The start date for data retrieval.

required
end_time datetime

The end date for data retrieval.

required

Returns:

Type Description

pandas.DataFrame: A DataFrame containing hourly weather data with the following columns:
- time: Datetime of the record.
- temp: Temperature in degrees Celsius.
- dwpt: Dew point in degrees Celsius.
- prcp: Precipitation in millimeters.
- wdir: Wind direction in degrees.
- wspd: Wind speed in km/h.
- wpgt: Wind gust in km/h.
- pres: Sea-level air pressure in hPa.
- tsun: Sunshine duration in minutes.
- snow: Snowfall in millimeters.
- rhum: Relative humidity in percent.
- coco: Weather condition code.

Source code in src/prediction_pipeline/sourcing_data/source_weather.py
def get_hourly_data(region, start_time, end_time):
    """
    Fetch hourly weather data for a specified region and date range.

    This function retrieves hourly weather data from the Meteostat API or another defined source,
    returning it as a pandas DataFrame.

    Args:
        region (Point): A `Point` object representing the geographical location (latitude, longitude).
        start_time (datetime): The start date for data retrieval.
        end_time (datetime): The end date for data retrieval.

    Returns:
        pandas.DataFrame: A DataFrame containing hourly weather data with the following columns:
            - time: Datetime of the record.
            - temp: Temperature in degrees Celsius.
            - dwpt: Dew point in degrees Celsius.
            - prcp: Precipitation in millimeters.
            - wdir: Wind direction in degrees.
            - wspd: Wind speed in km/h.
            - wpgt: Wind gust in km/h.
            - pres: Sea-level air pressure in hPa.
            - tsun: Sunshine duration in minutes.
            - snow: Snowfall in millimeters.
            - rhum: Relative humidity in percent.
            - coco: Weather condition code.
    """
    # Fetch hourly data
    data = Hourly(region, start_time, end_time).fetch()

    # Reset the index
    data.reset_index(inplace=True)
    return data
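
A self-contained sketch using the Meteostat client directly (the coordinates are rough values for the park area chosen for illustration, not the module's LATITUDE/LONGITUDE constants):

from datetime import datetime
from meteostat import Point, Hourly

park = Point(48.94, 13.40)  # approximate latitude/longitude, for illustration only
start, end = datetime(2024, 7, 1), datetime(2024, 7, 2)

hourly = Hourly(park, start, end).fetch()
hourly.reset_index(inplace=True)
print(hourly[["time", "temp", "rhum", "wspd"]].head())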

process_hourly_data(data)

Process raw hourly weather data by cleaning and formatting.

This function drops unnecessary columns, renames the remaining columns to more descriptive names, and converts the 'time' column to a datetime format.

Parameters:

Name Type Description Default
data DataFrame

A DataFrame containing raw hourly weather data.

required

Returns:

Type Description

pandas.DataFrame: A DataFrame containing the processed hourly weather data with the following columns:
- Time: Datetime of the record.
- Temperature (°C): Temperature in degrees Celsius.
- Wind Speed (km/h): Wind speed in km/h.
- Relative Humidity (%): Relative humidity in percent.
- coco_2: Weather condition code.

Source code in src/prediction_pipeline/sourcing_data/source_weather.py
def process_hourly_data(data):
    """
    Process raw hourly weather data by cleaning and formatting.

    This function drops unnecessary columns, renames the remaining columns to more descriptive names,
    and converts the 'time' column to a datetime format.

    Args:
        data (pandas.DataFrame): A DataFrame containing raw hourly weather data.

    Returns:
        pandas.DataFrame: A DataFrame containing the processed hourly weather data with the following columns:
            - Time: Datetime of the record.
            - Temperature (°C): Temperature in degrees Celsius.
            - Wind Speed (km/h): Wind speed in km/h.
            - Relative Humidity (%): Relative humidity in percent.
            - coco_2: Weather condition code.
    """
    # Drop unnecessary columns
    data = data.drop(columns=['dwpt', 'wdir', 'wpgt', 'pres','snow', 'tsun', 'prcp'])

    # Rename columns for clarity
    data = data.rename(columns={
        'time': 'Time',
        'temp': 'Temperature (°C)',
        'wspd': 'Wind Speed (km/h)',
        'rhum': 'Relative Humidity (%)',
        'coco': 'coco_2'
    })


    # Convert the 'Time' column to datetime format
    data['Time'] = pd.to_datetime(data['Time'])
    # Map weather condition codes to new codes
    data['coco_2'] = data['coco_2'].map(coco_to_coco_2_mapping)

    return data

source_weather_data(start_time, end_time)

This function creates a point over the Bavarian Forest National Park, retrieves hourly weather data for the specified time period, processes the data to extract the necessary weather parameters, and returns the processed data.

Source code in src/prediction_pipeline/sourcing_data/source_weather.py
@st.cache_data(max_entries=1)
def source_weather_data(start_time, end_time):
    """
    This function creates a point over the Bavarian Forest National Park, retrieves hourly weather data
    for the specified time period, processes the data to extract the necessary weather parameters,
    and returns the processed data.
    """
    print(f"Sourcing weather data for {start_time} to {end_time} at {datetime.now()}...")

    # Create a Point object for the Bavarian Forest National Park entry
    bavarian_forest = Point(lat=LATITUDE, lon=LONGITUDE)

    # Fetch hourly data for the location
    hourly_data = get_hourly_data(bavarian_forest, start_time, end_time)

    # Process the hourly data to extract and format necessary weather parameters
    sourced_hourly_data = process_hourly_data(hourly_data)


    return sourced_hourly_data

add_daily_max_values(df, columns)

Add columns to the DataFrame that show the maximum daily value for each weather characteristic.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with a 'Time' datetime column and multiple weather-related columns.

required
columns list of str

List of column names to compute the daily maximum values for.

required

Returns:

Type Description

pd.DataFrame: DataFrame with new columns that contain the maximum values for each day, repeated for every hour.

Source code in src/prediction_pipeline/pre_processing/features_zscoreweather_distanceholidays.py
def add_daily_max_values(df, columns):
    """
    Add columns to the DataFrame that show the maximum daily value for each weather characteristic.

    Args:
        df (pd.DataFrame): DataFrame with a 'Time' datetime column and multiple weather-related columns.
        columns (list of str): List of column names to compute the daily maximum values for.

    Returns:
        pd.DataFrame: DataFrame with new columns that contain the maximum values for each day,
                      repeated for every hour.
    """
    # Ensure the Time column is in datetime format
    df['Time'] = pd.to_datetime(df['Time'])

    # Extract the date from the Time column
    df['Date'] = df['Time'].dt.date

    # Create a DataFrame to store daily max values
    daily_max_df = df.groupby('Date')[columns].max().reset_index()

    # Rename columns to indicate they are daily maximum values
    daily_max_df = daily_max_df.rename(columns={col: f'Daily_Max_{col}' for col in columns})

    # Merge the daily max values back into the original DataFrame
    df = df.merge(daily_max_df, on='Date', how='left')

    return df
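
A toy example, assuming add_daily_max_values from above is in scope (all six rows fall on the same day, so the daily maximum is repeated for every hour):

import pandas as pd

df = pd.DataFrame({
    "Time": pd.date_range("2024-07-01", periods=6, freq="h"),
    "Temperature (°C)": [18.0, 21.5, 24.0, 19.0, 17.5, 16.0],
})
out = add_daily_max_values(df, ["Temperature (°C)"])
print(out[["Time", "Temperature (°C)", "Daily_Max_Temperature (°C)"]])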

add_moving_z_scores(df, columns, window_size)

Add moving z-score columns for weather characteristics based on their daily maximum values.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with 'Time' and daily maximum columns. - 'Time': Datetime column with timestamps.

required
columns list of str

List of column names to compute the moving z-scores for.

required
window_size int

Size of the moving window in days.

required

Returns:

Type Description

pd.DataFrame: DataFrame with new columns that contain the moving z-scores for each column.

Source code in src/prediction_pipeline/pre_processing/features_zscoreweather_distanceholidays.py
def add_moving_z_scores(df, columns, window_size):
    """
    Add moving z-score columns for weather characteristics based on their daily maximum values.

    Args:
        df (pd.DataFrame): DataFrame with 'Time' and daily maximum columns.
            - 'Time': Datetime column with timestamps.
        columns (list of str): List of column names to compute the moving z-scores for.
        window_size (int): Size of the moving window in days.

    Returns:
        pd.DataFrame: DataFrame with new columns that contain the moving z-scores for each column.
    """
    # Ensure the Time column is in datetime format
    df['Time'] = pd.to_datetime(df['Time'])

    # Extract unique dates with daily max values
    daily_df = df[['Date'] + [f'Daily_Max_{col}' for col in columns]].drop_duplicates()

    # Calculate rolling mean and standard deviation for daily max values
    for col in columns:
        daily_max_col = f'Daily_Max_{col}'

        # Calculate rolling mean and standard deviation over the specified window size
        daily_df[f'Rolling_Mean_{daily_max_col}'] = daily_df[daily_max_col].rolling(window=window_size, min_periods=window_size).mean()
        daily_df[f'Rolling_Std_{daily_max_col}'] = daily_df[daily_max_col].rolling(window=window_size, min_periods=window_size).std()

        # Calculate the z-score
        daily_df[f'ZScore_{daily_max_col}'] = (
            (daily_df[daily_max_col] - daily_df[f'Rolling_Mean_{daily_max_col}']) /
            daily_df[f'Rolling_Std_{daily_max_col}']
        )

        # Drop the rolling mean and std columns as they are intermediate calculations
        daily_df.drop(columns=[f'Rolling_Mean_{daily_max_col}', f'Rolling_Std_{daily_max_col}'], inplace=True)

    # Merge the z-scores back into the original hourly DataFrame
    df = df.merge(daily_df[['Date'] + [f'ZScore_Daily_Max_{col}' for col in columns]], on='Date', how='left')

    # List of columns to drop (daily max columns)
    daily_max_columns = [f'Daily_Max_{col}' for col in columns]

    # Drop the daily max columns from the main DataFrame
    df.drop(columns=daily_max_columns, inplace=True)

    return df
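
The z-score step is a rolling standardisation of the daily maxima; a minimal standalone sketch of the same calculation on a made-up series of daily maximum temperatures:

import pandas as pd

daily_max = pd.Series([22.0, 24.5, 27.0, 31.0, 19.5, 23.0, 26.5])
window = 5
rolling_mean = daily_max.rolling(window, min_periods=window).mean()
rolling_std = daily_max.rolling(window, min_periods=window).std()
zscore = (daily_max - rolling_mean) / rolling_std
print(zscore)  # NaN for the first window-1 days, then the standardised maxima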

add_nearest_holiday_distance(df)

Add columns to the DataFrame calculating the distance to the nearest holiday for both 'Feiertag_Bayern' and 'Feiertag_CZ'.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with 'Time', 'Feiertag_Bayern', and 'Feiertag_CZ' columns.
- 'Time': Datetime column with timestamps.
- 'Feiertag_Bayern': Boolean column indicating if the date is a holiday in Bayern.
- 'Feiertag_CZ': Boolean column indicating if the date is a holiday in CZ.

required

Returns:

Type Description

pd.DataFrame: DataFrame with two new columns:
- 'Distance_to_Nearest_Holiday_Bayern': Distance in days to the nearest holiday in Bayern for each day/row.
- 'Distance_to_Nearest_Holiday_CZ': Distance in days to the nearest holiday in CZ for each day/row.

Source code in src/prediction_pipeline/pre_processing/features_zscoreweather_distanceholidays.py
def add_nearest_holiday_distance(df):
    """
    Add columns to the DataFrame calculating the distance to the nearest holiday for both 'Feiertag_Bayern' and 'Feiertag_CZ'.

    Args:
        df (pd.DataFrame): DataFrame with 'Time', 'Feiertag_Bayern', and 'Feiertag_CZ' columns.
            - 'Time': Datetime column with timestamps.
            - 'Feiertag_Bayern': Boolean column indicating if the date is a holiday in Bayern.
            - 'Feiertag_CZ': Boolean column indicating if the date is a holiday in CZ.

    Returns:
        pd.DataFrame: DataFrame with two new columns:
            - 'Distance_to_Nearest_Holiday_Bayern': Distance in days to the nearest holiday in Bayern for each day/row.
            - 'Distance_to_Nearest_Holiday_CZ': Distance in days to the nearest holiday in CZ for each day/row.
    """

    # Ensure the Time column is in datetime format
    df['Time'] = pd.to_datetime(df['Time'])

    # Extract date from Time column
    df['Date'] = df['Time'].dt.date

    # Extract unique dates for holidays
    bayern_holidays = df[df['Feiertag_Bayern']]['Date'].unique()
    cz_holidays = df[df['Feiertag_CZ']]['Date'].unique()

    # Create a DataFrame with unique dates
    dates_df = pd.DataFrame({'Date': df['Date'].unique()})


    def get_nearest_holiday_distance(date, holidays):
        """
        Calculate the distance in days to the nearest holiday.

        Args:
            date (pd.Timestamp): The date for which to calculate the distance.
            holidays (np.ndarray): Array of holiday dates.

        Returns:
            float: Distance in days to the nearest holiday, or NaN if no holidays are provided.
        """
        if len(holidays) == 0:
            return np.nan
        nearest_holiday = min(abs((date - pd.to_datetime(holidays)).days))
        return nearest_holiday

    # Apply the function to calculate distances for both sets of holidays
    dates_df['Distance_to_Nearest_Holiday_Bayern'] = dates_df['Date'].apply(
        lambda x: get_nearest_holiday_distance(pd.to_datetime(x), bayern_holidays)
    )
    dates_df['Distance_to_Nearest_Holiday_CZ'] = dates_df['Date'].apply(
        lambda x: get_nearest_holiday_distance(pd.to_datetime(x), cz_holidays)
    )
    # Merge the distances back with the original DataFrame
    df = df.merge(dates_df, on='Date', how='left')


    return df
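
The per-date helper boils down to a minimum absolute day difference; a toy sketch with hypothetical holiday dates:

import pandas as pd

holidays = pd.to_datetime(["2024-08-15", "2024-10-03"])
date = pd.Timestamp("2024-08-20")

nearest = min(abs((date - holidays).days))
print(nearest)  # 5 -> five days to the nearest holiday (2024-08-15)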

load_csv_files_from_aws_s3(path, **kwargs)

Loads individual or multiple CSV files from an AWS S3 bucket.

Args:
path (str): The path to the CSV files on AWS S3.
**kwargs: Additional arguments to pass to the read_csv function.

Returns:
pd.DataFrame: The DataFrame containing the data from the CSV files.

Source code in src/prediction_pipeline/pre_processing/features_zscoreweather_distanceholidays.py
def load_csv_files_from_aws_s3(path: str, **kwargs) -> pd.DataFrame:
    """Loads individual or multiple CSV files from an AWS S3 bucket.
    Args:
        path (str): The path to the CSV files on AWS S3.
        **kwargs: Additional arguments to pass to the read_csv function.
    Returns:
        pd.DataFrame: The DataFrame containing the data from the CSV files.
    """
    df = wr.s3.read_csv(path=path, low_memory=False, **kwargs) #the low_memory parameter is set to False to avoid an error with dtypes in some of the files
    return df

slice_at_first_non_null(df)

Slices the DataFrame starting at the first non-null value in the 'Feiertag_Bayern' column.

We don't have data for holidays in 2016, so the function finds the index of the first non-null value in the 'Feiertag_Bayern' column and returns the DataFrame sliced from that index onward.

Parameters:

Name Type Description Default
df DataFrame

DataFrame containing the 'Feiertag_Bayern' column.

required

Returns:

Type Description

pandas.DataFrame: The sliced DataFrame starting from the first non-null value in 'Feiertag_Bayern'.

Source code in src/prediction_pipeline/pre_processing/features_zscoreweather_distanceholidays.py
def slice_at_first_non_null(df):
    """
    Slices the DataFrame starting at the first non-null value in the 'Feiertag_Bayern' column.

    We don't have data for holidays in 2016, so the function finds the index of the first non-null 
    value in the 'Feiertag_Bayern' column and returns the DataFrame sliced from that index onward.

    Args:
        df (pandas.DataFrame): DataFrame containing the 'Feiertag_Bayern' column.

    Returns:
        pandas.DataFrame: The sliced DataFrame starting from the first non-null value in 'Feiertag_Bayern'.
    """
    # Find the index of the first non-null value in the 'Feiertag_Bayern' column
    first_non_null_index = df['Feiertag_Bayern'].first_valid_index()

    # Slice the DataFrame from the first non-null index onward and create a copy to avoid warnings
    df = df.loc[first_non_null_index:].copy()

    return df
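
A toy example of the slicing behaviour, assuming slice_at_first_non_null from above is in scope:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "Feiertag_Bayern": [np.nan, np.nan, False, True, False],
    "traffic_abs": [10, 12, 15, 30, 14],
})
print(slice_at_first_non_null(df))  # keeps rows 2-4, where the holiday data starts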

write_csv_file_to_aws_s3(df, path, **kwargs)

Writes an individual CSV file to AWS S3.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to write.

required
path str

The path to the CSV files on AWS S3.

required
**kwargs

Additional arguments to pass to the to_csv function.

{}
Source code in src/prediction_pipeline/pre_processing/features_zscoreweather_distanceholidays.py
def write_csv_file_to_aws_s3(df: pd.DataFrame, path: str, **kwargs) -> None:
    """Writes an individual CSV file to AWS S3.

    Args:
        df (pd.DataFrame): The DataFrame to write.
        path (str): The path to the CSV files on AWS S3.
        **kwargs: Additional arguments to pass to the to_csv function.
    """

    wr.s3.to_csv(df, path=path, **kwargs)
    return

check_data_quality(data, sensor)

Check data quality: flag rows where the occupancy is greater than the capacity of the parking space.

Source code in src/prediction_pipeline/pre_processing/impute_missing_parking_data.py
def check_data_quality(data,sensor):
    """
    Check data quality - if the occupancy is greater than the capacity of the parking space

    """
    # Check if the capacity is always smaller than the occupancy
    if data['occupancy'].max() > data['capacity'].max():
        # remove the rows where the occupancy is greater than the capacity
        # print the row numbers where the occupancy is greater than the capacity
        print(f"Data quality issue for {sensor}")
        print('---------------------------------')
        print("The occupancy is greater than the capacity")
        print(f"Rows where the occupancy is greater than the capacity:")
        print(data[data['occupancy']>data['capacity']])
        # impute_occupancy_values(data)
    else:
        print(f"No data quality issue for {sensor}")


    return data

check_data_quality_occupancy_rate(data, sensor)

Check data quality: flag rows where the occupancy rate is greater than 100.

Source code in src/prediction_pipeline/pre_processing/impute_missing_parking_data.py
def check_data_quality_occupancy_rate(data,sensor):

    """
    Check data quality - if the occupancy rate is greater than 100

    """
    # Check if the occupancy rate is greater than 100
    if data['occupancy_rate'].max() > 100.00:
        print(f"Data quality issue for {sensor}")
        print('---------------------------------')
        print("The occupancy rate is greater than 100")
        print(f"Rows where the occupancy rate is greater than 100:")
        print(data[data['occupancy_rate']>100])
        save_higher_occupancy_rate(data[data['occupancy_rate']>100],sensor)

    else:
        print(f"No data quality issue for {sensor}")

check_missing_data_per_sensor(data, sensor)

Check missing data per sensor

Source code in src/prediction_pipeline/pre_processing/impute_missing_parking_data.py
def check_missing_data_per_sensor(data,sensor):
    """
    Check missing data per sensor

    """
    missing_data = data.isnull().sum()
    print(f"Missing data for {sensor}:")
    print('---------------------------------')
    print(missing_data)

    if missing_data.sum() == 0:
        print(f"No missing data for {sensor}")
    else:
        data = fill_missing_values(data)

    return data

fill_missing_values(data)

Fill missing values in the data

Source code in src/prediction_pipeline/pre_processing/impute_missing_parking_data.py
def fill_missing_values(data):
    """
    Fill missing values in the data

    """
    # Fill all the missing values with linear interpolation
    data.interpolate(method='linear', inplace=True)
    return data

impute_occupancy_values(data)

Impute occupancy values where the occupancy is greater than the capacity

Source code in src/prediction_pipeline/pre_processing/impute_missing_parking_data.py
def impute_occupancy_values(data):
    """
    Impute occupancy values where the occupancy is greater than the capacity

    """
    # Impute the occupancy values where the occupancy is greater than the capacity
    data['occupancy'] = data['occupancy'].apply(lambda x: x if x < data['capacity'].max() else data['capacity'].max())
    print('---------------------------------')
    print("Imputed occupancy values:")
    print(data[data['occupancy']>data['capacity']])
    return data

main()

Main function to run the script

Source code in src/prediction_pipeline/pre_processing/impute_missing_parking_data.py
def main():
    """
    Main function to run the script

    """
    for sensor, path in zip(parking_sensors, paths_to_parking_data):
        data = pd.read_csv(path)
        data['time'] = pd.to_datetime(data['time'])

        # Missing data for each sensor
        print(f"Checking missing data for {sensor}....................")
        check_missing_data_per_sensor(data,sensor)

        # check if the occupancy rate ever exceeds 100
        check_data_quality_occupancy_rate(data,sensor)

save_higher_occupancy_rate(data, sensor)

Save the rows where the occupancy rate is greater than 100

Source code in src/prediction_pipeline/pre_processing/impute_missing_parking_data.py
def save_higher_occupancy_rate(data,sensor):
    """
    Save the rows where the occupancy rate is greater than 100

    """
    save_path = os.path.join('./outputs','parking_data_final','data_quality_issues',f'{sensor}_higher_occupancy_rate.csv')

    # make the output directory if it doesn't exist
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    # Save the rows where the occupancy rate is greater than 100
    data.to_csv(save_path,index=False)
    print(f"Saved the rows where the occupancy rate is greater than 100 for {sensor}")

create_datetimeindex(df)

Prepare a DataFrame by converting its 'Time' column to datetime and setting it as the index, ensuring the result is a DateTimeIndex.

Parameters:
- df: DataFrame containing the data, with a "Time" timestamp column to convert and set as the index.

Returns:
- df: DataFrame indexed by a DateTimeIndex.

Source code in src/prediction_pipeline/pre_processing/join_sensor_weather_visitorcenter.py
def create_datetimeindex(df):
    """
    Prepare a DataFrame by converting its 'Time' column to datetime and setting it
    as the index, ensuring the result is a DateTimeIndex.

    Parameters:
    - df: DataFrame containing the data, with a "Time" timestamp column to convert and set as the index.

    Returns:
    - df: DataFrame indexed by a DateTimeIndex.
    """
    # Ensure the timestamp column is converted to datetime if it's not already the index

    df["Time"] = pd.to_datetime(df["Time"])
    df.set_index("Time", inplace=True)

    # Ensure the index is a DateTimeIndex
    if not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError("Index must be a DateTimeIndex.")

    return df

get_joined_dataframe(weather_data, visitor_count_data, visitorcenter_data)

Main function to run the data joining pipeline.

This function takes the already preprocessed weather, visitor count and visitor center data and joins them into one dataframe.

Returns:

Type Description
DataFrame

pd.DataFrame: The joined data.

Source code in src/prediction_pipeline/pre_processing/join_sensor_weather_visitorcenter.py
def get_joined_dataframe(weather_data, visitor_count_data, visitorcenter_data) -> pd.DataFrame:
    """
    Main function to run the data joining pipeline.

    This function takes the already preprocessed weather, visitor count and visitor center data and joins them into one dataframe.

    Returns:
        pd.DataFrame: The joined data.
    """
    df_list = [weather_data, visitor_count_data, visitorcenter_data]
    for df in df_list:
        create_datetimeindex(df)

    joined_data = join_dataframes(df_list)

    return joined_data

join_dataframes(df_list)

Joins a list of DataFrames using an outer join along the columns.

Parameters:

Name Type Description Default
df_list list of pd.DataFrame

A list of pandas DataFrames to join.

required

Returns:

Type Description
DataFrame

pd.DataFrame: A single DataFrame resulting from concatenating all input DataFrames along columns.

Source code in src/prediction_pipeline/pre_processing/join_sensor_weather_visitorcenter.py
def join_dataframes(df_list) -> pd.DataFrame:
    """
    Joins a list of DataFrames using an outer join along the columns.

    Args:
        df_list (list of pd.DataFrame): A list of pandas DataFrames to join.

    Returns:
        pd.DataFrame: A single DataFrame resulting from concatenating all input DataFrames along columns.
    """
    return reduce(lambda left, right: pd.concat([left, right], axis=1, join='outer'), df_list)
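
A toy example of the outer column-wise join on a shared DateTimeIndex (hours missing from one frame simply become NaN in its columns):

from functools import reduce
import pandas as pd

idx_a = pd.date_range("2024-07-01 10:00", periods=3, freq="h")
idx_b = pd.date_range("2024-07-01 11:00", periods=3, freq="h")

weather = pd.DataFrame({"Temperature (°C)": [21.0, 22.5, 23.0]}, index=idx_a)
counts = pd.DataFrame({"traffic_abs": [120, 150, 90]}, index=idx_b)

joined = reduce(lambda left, right: pd.concat([left, right], axis=1, join='outer'), [weather, counts])
print(joined)  # four hourly rows covering the union of both indexes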

Clean historic sensor data from 2016 to 2024. The docstring of every function describes what it does and the assumptions that were made.

Usage: - Change the global variables section if needed - Fill in your AWS credentials

Output: - Returns the preprocessed data

calculate_traffic_metrics_abs(df)

This function calculates several traffic metrics and adds them to the DataFrame:
- traffic_abs: The sum of all INs and OUTs for every sensor.
- sum_IN_abs: The sum of all columns containing 'IN' in their names.
- sum_OUT_abs: The sum of all columns containing 'OUT' in their names.
- diff_abs: The difference between sum_IN_abs and sum_OUT_abs.
- occupancy_abs: The cumulative sum of diff_abs, representing the occupancy over time.

Parameters:

Name Type Description Default
df DataFrame

DataFrame containing traffic data.

required

Returns:

Type Description

pandas.DataFrame: The DataFrame with additional columns for absolute traffic metrics.

Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py
def calculate_traffic_metrics_abs(df):
    """
      This function calculates several traffic metrics and adds them to the DataFrame:
    - `traffic_abs`: The sum of all INs and OUTs for every sensor
    - `sum_IN_abs`: The sum of all columns containing 'IN' in their names.
    - `sum_OUT_abs`: The sum of all columns containing 'OUT' in their names.
    - `diff_abs`: The difference between `sum_IN_abs` and `sum_OUT_abs`.
    - `occupancy_abs`: The cumulative sum of `diff_abs`, representing the occupancy over time.

    Args:
        df (pandas.DataFrame): DataFrame containing traffic data.

    Returns:
        pandas.DataFrame: The DataFrame with additional columns for absolute traffic metrics.
    """
    # Calculate total traffic
    df["traffic_abs"] = df.filter(regex='IN|OUT').sum(axis=1)

    # Calculate sum of 'IN' columns
    df["sum_IN_abs"] = df.filter(like='IN').sum(axis=1)

    # Calculate sum of 'OUT' columns
    df["sum_OUT_abs"] = df.filter(like='OUT').sum(axis=1)

    # Difference between INs and OUTs, and the cumulative occupancy over time (as described in the docstring)
    df["diff_abs"] = df["sum_IN_abs"] - df["sum_OUT_abs"]
    df["occupancy_abs"] = df["diff_abs"].cumsum()

    return df
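
A toy example with two sensors, assuming calculate_traffic_metrics_abs from above is in scope:

import pandas as pd

df = pd.DataFrame({
    "Lusen 2 IN": [5, 8, 3],
    "Lusen 2 OUT": [2, 6, 7],
    "Gfäll IN": [1, 0, 2],
    "Gfäll OUT": [0, 1, 3],
})
print(calculate_traffic_metrics_abs(df))  # adds the absolute traffic metric columns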

correct_and_impute_times(df)

Corrects repeated timestamps caused by a 2-hour interval, which indicates a daylight saving time change.

The function operates under the following assumptions:
1. By default, every interval should be 1 hour.
2. If an interval differs from this, the repeated timestamp is corrected by subtracting one hour.
3. The data values for the corrected timestamp are then imputed from the next available row.
4. 2017 is an odd year where the null row is not the one with the 2-hour interval but the one with a 0-hour interval. We fixed this manually for those specific rows.

Parameters:

Name Type Description Default
df DataFrame

A DataFrame containing a 'Time' column with datetime-like values and other associated data columns.

required

Returns:

Type Description

pandas.DataFrame: The corrected DataFrame with timestamps set as the index and sorted chronologically.

Raises:

Type Description
ValueError

If the 'Time' column is missing from the DataFrame.

KeyError

If an index out of range occurs due to imputation attempts beyond the DataFrame bounds.

Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py
def correct_and_impute_times(df):

    """
    Corrects repeated timestamps caused by a 2-hour interval, which indicates a daylight saving time change.

    The function operates under the following assumptions:
    1. By default, every interval should be 1 hour.
    2. If an interval differs from this, the repeated timestamp is corrected by subtracting one hour.
    3. The data values for the corrected timestamp are then imputed from the next available row.
    4. 2017 is an odd year where the null row is not the one with the 2-hour interval but the one with a 0-hour interval. We fixed this manually for those specific rows.

    Args:
        df (pandas.DataFrame): A DataFrame containing a 'Time' column with datetime-like values and other associated data columns.

    Returns:
        pandas.DataFrame: The corrected DataFrame with timestamps set as the index and sorted chronologically.

    Raises:
        ValueError: If the 'Time' column is missing from the DataFrame.
        KeyError: If an index out of range occurs due to imputation attempts beyond the DataFrame bounds.
    """
    # Swap values of specific rows to correct data misalignment
    df.iloc[[54603, 54602]] = df.iloc[[54602, 54603]].values

    # Sort DataFrame by 'Time'
    df.sort_values("Time", ascending=True, inplace=True)

    # Identify intervals where there is a 2 hours gap
    intervals = df.Time.diff().dropna()
    index_wrong_time = intervals[intervals == "0 days 02:00:00"].index

    # Impute values from the next row and adjust 'Time' column
    for idx in index_wrong_time:
        df.loc[idx, 'Time'] = df.loc[idx, 'Time'] - pd.Timedelta(hours=1)  # Adjust for daylight saving
        df.loc[idx, df.columns != 'Time'] = df.loc[idx + 1, df.columns != 'Time']  # Impute values from the next row

    # Set 'Time' as index and sort by index
    df = df.set_index('Time').sort_index()

    return df

correct_non_replaced_sensors(df)

Replaces data with NaN for non-replaced sensors in the DataFrame based on specified timestamps. A dictionary is provided where keys are timestamps as strings and values are lists of column names that should be set to NaN if the index is earlier than the timestamp.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to be corrected.

required

Returns:

Type Description

pd.DataFrame: The DataFrame with corrected sensor data.

Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py
def correct_non_replaced_sensors(df):
    """
    Replaces data with NaN for non-replaced sensors in the DataFrame based on specified timestamps. A dictionary is provided where keys are timestamps as strings and values are lists of column names that should be set to NaN if the index is earlier than the timestamp.

    Args:
        df (pd.DataFrame): The DataFrame to be corrected.

    Returns:
        pd.DataFrame: The DataFrame with corrected sensor data.
    """

    dict_non_replaced = {'2020-07-30 00:00:00' : ['Lusen 1 PYRO IN', 'Lusen 1 PYRO OUT'],
                     '2022-12-20 00:00:00' : ['Lusen 3 IN', 'Lusen 3 OUT'],
                     '2022-10-12 00:00:00' : ['Gsenget IN', 'Gsenget OUT']}


    # Iterate over the dictionary of non-replaced sensors
    for key, columns in dict_non_replaced.items():
        # Convert the timestamp key from string to datetime object
        timestamp = pd.to_datetime(key)

        # Set values to NaN for specified columns where the index is earlier than the given timestamp
        df.loc[df.index < timestamp, columns] = np.nan

    print("Out of place values were turn to NaN for Lusen 1 PYRO, Lusen 3 and Gsenget")    

    return df

correct_overlapping_sensor_data(df)

Corrects overlapping sensor data by setting specific values to NaN based on replacement dates. Also filters the DataFrame to include only rows with an index timestamp on or after "2016-05-10 03:00:00", i.e. 3 a.m. following the installation date of the first working sensor.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame containing sensor data to be corrected.

required

Returns:

Type Description

pd.DataFrame: The DataFrame with corrected sensor data.

Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py
def correct_overlapping_sensor_data(df):
    """
    Corrects overlapping sensor data by setting specific values to NaN based on replacement dates. Also filters the DataFrame to include only rows with an index timestamp on or after "2016-05-10 03:00:00", i.e. 3 a.m. following the installation date of the first working sensor.

    Args:
        df (pd.DataFrame): The DataFrame containing sensor data to be corrected.

    Returns:
        pd.DataFrame: The DataFrame with corrected sensor data.
    """
    # Define the replacement dates and columns for different sensor types
    replacement_dates = {
        'trinkwassertalsperre': '2021-06-18 00:00:00',
        'bucina': '2021-05-28 00:00:00',
        'falkenstein 1': '2022-12-22 12:00:00'
    }

    multi_columns_dict = {
        'trinkwassertalsperre': [
            'Trinkwassertalsperre_MULTI Fußgänger IN',
            'Trinkwassertalsperre_MULTI Fußgänger OUT',
            'Trinkwassertalsperre_MULTI Fahrräder IN',
            'Trinkwassertalsperre_MULTI Fahrräder OUT',
            'Trinkwassertalsperre_MULTI IN',
            'Trinkwassertalsperre_MULTI OUT'
        ],
        'bucina': [
            'Bucina_Multi OUT',
            'Bucina_Multi Fußgänger IN',
            'Bucina_Multi Fahrräder IN',
            'Bucina_Multi Fahrräder OUT',
            'Bucina_Multi Fußgänger OUT',
            'Bucina_Multi IN'
        ],
        'falkenstein 1': [
            'Falkenstein 1 OUT',
            'Falkenstein 1 IN'
        ]
    }

    pyro_columns_dict = {
        'trinkwassertalsperre': [
            'Trinkwassertalsperre PYRO IN',
            'Trinkwassertalsperre PYRO OUT'
        ],
        'bucina': [
            'Bucina PYRO IN',
            'Bucina PYRO OUT'
        ],
        'falkenstein 1': [
            'Falkenstein 1 PYRO IN',
            'Falkenstein 1 PYRO OUT'
        ]
    }

    # Process each sensor type based on the predefined dictionaries
    for sensor_type in replacement_dates:
        replacement_date = pd.to_datetime(replacement_dates[sensor_type])
        multi_columns = multi_columns_dict.get(sensor_type, [])
        pyro_columns = pyro_columns_dict.get(sensor_type, [])

        # Set to NaN the values in 'multi_columns' for dates on or before the replacement date
        if multi_columns:
            df.loc[df.index <= replacement_date, multi_columns] = np.nan

        # Set to NaN the values in 'pyro_columns' for dates after the replacement date
        if pyro_columns:
            df.loc[df.index > replacement_date, pyro_columns] = np.nan

    # Keep only rows from 2016-05-10 03:00:00 onward; no sensors were installed before that date
    df = df[df.index >= "2016-05-10 03:00:00"]


    print("Fixed overlapping values for replaced sensors")
    return df

fix_columns_names(df)

Processes the given DataFrame by renaming columns, dropping duplicated columns, and creating a new 'Bucina_Multi IN' column by summing the 'Bucina_Multi Fahrräder IN' and 'Bucina_Multi Fußgänger IN' columns. The rename mapping, the columns to drop, and the created column are all defined inside the function.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to be modified.

required

Returns:

Type Description

pd.DataFrame: The modified DataFrame with the specified changes applied.

Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py
def fix_columns_names(df):
    """
    Processes the given DataFrame by renaming columns, dropping specified columns, and creating a new column for Bucina_Multi IN by summing the Bucina_Multi Fahrräder IN and Bucina_Multi Fußgänger IN columns. .

    Args:
        df (pd.DataFrame): The DataFrame to be modified.
        rename (dict): A dictionary where the keys are existing column names and the values are the new column names.
        drop (list): A list of column names that should be removed from the DataFrame.
        create (str): The name of the new column that will be created by summing the "Bucina_Multi Fahrräder IN" 
                      and "Bucina_Multi Fußgänger IN" columns.

    Returns:
        pd.DataFrame: The modified DataFrame with the specified changes applied.
    """

    #lists and dictionaries for columns that need to be dropped or renamed

    drop = ['Brechhäuslau Fußgänger IN', 'Brechhäuslau Fußgänger OUT', 'Waldhausreibe Channel 1 IN', 'Waldhausreibe Channel 2 OUT'] #Waldhausreibe Channel 1 (IN and OUT) had a total sum of values of 10 and 13. Brechhäuslau columns were duplicated.

    rename = {'Bucina IN': 'Bucina PYRO IN',
          'Bucina OUT': 'Bucina PYRO OUT',
          'Gsenget IN.1': 'Gsenget Fußgänger IN',
          'Gsenget OUT.1': 'Gsenget Fußgänger OUT',
          'Gfäll Fußgänger IN' : 'Gfäll IN',
          'Gfäll Fußgänger OUT': 'Gfäll OUT',
          'Fredenbrücke Fußgänger IN' : 'Fredenbrücke IN',
          'Fredenbrücke Fußgänger OUT': 'Fredenbrücke OUT',
          'Diensthüttenstraße Fußgänger IN': 'Diensthüttenstraße IN' ,
          'Diensthüttenstraße Fußgänger OUT': 'Diensthüttenstraße OUT',
          'Racheldiensthütte Cyclist OUT' : 'Racheldiensthütte Fahrräder OUT',
          'Racheldiensthütte Pedestrian IN' : 'Racheldiensthütte Fußgänger IN',
          'Racheldiensthütte Pedestrian OUT' : 'Racheldiensthütte Fußgänger OUT',
          'Sagwassersäge Fußgänger IN' : 'Sagwassersäge IN',
          'Sagwassersäge Fußgänger OUT': 'Sagwassersäge OUT',
          'Schwarzbachbrücke Fußgänger IN' : 'Schwarzbachbrücke IN',
          'Schwarzbachbrücke Fußgänger OUT' : 'Schwarzbachbrücke OUT',
          'NPZ_Falkenstein IN' : 'Falkenstein 1 PYRO IN',
          'NPZ_Falkenstein OUT' : 'Falkenstein 1 PYRO OUT',
          'TFG_Falkenstein_1 Fußgänger zum Parkplatz' : 'Falkenstein 1 OUT',
          'TFG_Falkenstein_1 Fußgänger zum HZW' : 'Falkenstein 1 IN',
          'TFG_Falkenstein_2 Fußgänger In Richtung Parkplatz' : 'Falkenstein 2 OUT',
          'TFG_Falkenstein_2 Fußgänger In Richtung TFG' : 'Falkenstein 2 IN',
          'TFG_Lusen IN' : 'Lusen 1 PYRO IN',
          'TFG_Lusen OUT' : 'Lusen 1 PYRO OUT',
          'TFG_Lusen_1 Fußgänger Richtung TFG': 'Lusen 1 EVO IN',
          'TFG_Lusen_1 Fußgänger Richtung Parkplatz' : 'Lusen 1 EVO OUT',
          'TFG_Lusen_2 Fußgänger Richtung Vögel am Waldrand': 'Lusen 2 IN',
          'TFG_Lusen_2 Fußgänger Richtung Parkplatz' : 'Lusen 2 OUT',
          'TFG_Lusen_3 TFG Lusen 3 IN': 'Lusen 3 IN',
          'TFG_Lusen_3 TFG Lusen 3 OUT': 'Lusen 3 OUT',
          'Waldspielgelände_1 IN': 'Waldspielgelände IN',
          'Waldspielgelände_1 OUT': 'Waldspielgelände OUT',
          'Wistlberg Fußgänger IN' : 'Wistlberg IN',
          'Wistlberg Fußgänger OUT' : 'Wistlberg OUT',
          'Trinkwassertalsperre IN' : 'Trinkwassertalsperre PYRO IN', 
          'Trinkwassertalsperre OUT' : 'Trinkwassertalsperre PYRO OUT'
          }


    # Rename columns according to the provided mapping

    df.rename(columns=rename, inplace=True)
    print(len(rename), ' columns were renamed')

    # Remove the specified columns from the DataFrame
    df.drop(columns=drop, inplace=True)
    print(len(drop), ' repeated columns were dropped')

    # Add Bucina_Multi IN column by summing Fahrraeder and Fussgaenger columns
    df['Bucina_Multi IN'] = df["Bucina_Multi Fahrräder IN"] + df["Bucina_Multi Fußgänger IN"]
    print('Bucina_Multi IN column was created')

    return df

handle_outliers(df)

Set every value higher than 800 to NaN. During exploration we found that values above 800 are outliers; there were only 6 rows with any count over 800.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with values to be turned to NaN.

required

Returns:

Type Description

pandas.DataFrame: The modified DataFrame with values over 800 turned to NaN

Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py
def handle_outliers(df):
    """
    Set every value higher than 800 to NaN. During exploration we found that values above 800 are outliers; there were only 6 rows with any count over 800.

    Args:
        df (pandas.DataFrame): DataFrame with values to be turned to NaN.

    Returns:
        pandas.DataFrame: The modified DataFrame with values over 800 turned to NaN
    """

    df[df > 800] = np.nan

    return df

merge_columns(df)

Merges columns from replaced sensors in the DataFrame into new combined columns based on a predefined mapping and drops the original columns after merging. Additionally, drops columns with names containing "Fahrräder" or "Fußgänger" as we will not use that distinction.

Parameters:

Name Type Description Default
df DataFrame

A DataFrame containing columns to be merged.

required

Returns:

Type Description

pandas.DataFrame: The modified DataFrame with the new merged columns, original columns removed, and Fahrräder or Fußgänger columns dropped.

Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py
def merge_columns(df):
    """
    Merges columns from replaced sensors in the DataFrame into new combined columns based on a predefined mapping
    and drops the original columns after merging. Additionally, drops columns with names containing "Fahrräder" or "Fußgänger" as we will not use that distinction.

    Args:
        df (pandas.DataFrame): A DataFrame containing columns to be merged.

    Returns:
        pandas.DataFrame: The modified DataFrame with the new merged columns, original columns removed, and Fahrräder or Fußgänger columns dropped.
    """
    merge_dict = {
        'Bucina MERGED IN': ['Bucina PYRO IN', 'Bucina_Multi IN'],
        'Bucina MERGED OUT': ['Bucina PYRO OUT', 'Bucina_Multi OUT'],
        'Falkenstein 1 MERGED IN': ['Falkenstein 1 PYRO IN', 'Falkenstein 1 IN'],
        'Falkenstein 1 MERGED OUT': ['Falkenstein 1 PYRO OUT', 'Falkenstein 1 OUT'],
        'Lusen 1 MERGED IN': ['Lusen 1 PYRO IN', 'Lusen 1 EVO IN'],
        'Lusen 1 MERGED OUT': ['Lusen 1 PYRO OUT', 'Lusen 1 EVO OUT'],
        'Trinkwassertalsperre MERGED IN': ['Trinkwassertalsperre PYRO IN', 'Trinkwassertalsperre_MULTI IN'],
        'Trinkwassertalsperre MERGED OUT': ['Trinkwassertalsperre PYRO OUT', 'Trinkwassertalsperre_MULTI OUT']
    }

    # Iterate over each item in the dictionary to merge columns
    for new_col, cols in merge_dict.items():
        # Combine the two columns into one using the first non-null value
        df[new_col] = df[cols[0]].combine_first(df[cols[1]])

    # Drop the original columns used for merging
    cols_to_drop = [col for cols in merge_dict.values() for col in cols]
    df = df.drop(columns=cols_to_drop)

    # Drop columns with names containing "Fahrräder" or "Fußgänger"
    df = df.loc[:, ~df.columns.str.contains("Fahrräder|Fußgänger")]

    return df
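
The per-pair merge rests on pandas' combine_first: the value from the first column in each pair is kept, and the second column only fills its gaps. A small standalone sketch of that behaviour, with illustrative values:

import pandas as pd

pyro = pd.Series([10.0, None, 7.0])    # e.g. 'Bucina PYRO IN'
multi = pd.Series([None, 4.0, 99.0])   # e.g. 'Bucina_Multi IN'

merged = pyro.combine_first(multi)
print(merged.tolist())  # [10.0, 4.0, 7.0], the first series wins wherever it has a value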

parse_german_dates(df, date_column_name)

Parses German dates in the specified date column of the DataFrame using regex, including hours and minutes if available.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame containing the date column.

required
date_column_name str

The name of the date column.

required

Returns:

Type Description
DataFrame

pd.DataFrame: The DataFrame with parsed German dates.

Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py, lines 41–89
def parse_german_dates(
    df: pd.DataFrame,
    date_column_name: str
) -> pd.DataFrame:
    """
    Parses German dates in the specified date column of the DataFrame using regex,
    including hours and minutes if available.

    Args:
        df (pd.DataFrame): The DataFrame containing the date column.
        date_column_name (str): The name of the date column.

    Returns:
        pd.DataFrame: The DataFrame with parsed German dates.
    """

    # Define a mapping of German month names to their numeric values
    month_map = {
        "Jan.": "01",
        "Feb.": "02",
        "März": "03",
        "Apr.": "04",
        "Mai": "05",
        "Juni": "06",
        "Juli": "07",
        "Aug.": "08",
        "Sep.": "09",
        "Okt.": "10",
        "Nov.": "11",
        "Dez.": "12"
    }

    # Create a regex pattern for replacing months and capturing time
    pattern = re.compile(r'(\d{1,2})\.\s*(' + '|'.join(month_map.keys()) + r')\s*(\d{4})\s*(\d{2}):(\d{2})')

    # Function to replace the month in the matched string and keep the time part
    def replace_month(match):
        day = match.group(1)
        month = month_map[match.group(2)]
        year = match.group(3)
        hour = match.group(4)
        minute = match.group(5)
        return f"{year}-{month}-{day} {hour}:{minute}:00"

    # Apply regex replacement and convert to datetime
    df[date_column_name] = df[date_column_name].apply(lambda x: replace_month(pattern.search(x)) if pattern.search(x) else x)
    df[date_column_name] = pd.to_datetime(df[date_column_name], errors='coerce')

    return df
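
A small usage sketch, assuming the function can be imported from the module path shown above; the column name 'Time' is illustrative:

import pandas as pd

from src.prediction_pipeline.pre_processing.preprocess_historic_visitor_count_data import parse_german_dates

df = pd.DataFrame({'Time': ['21. Juli 2024 13:30', '1. Okt. 2023 08:00']})
df = parse_german_dates(df, date_column_name='Time')
print(df['Time'].tolist())
# [Timestamp('2024-07-21 13:30:00'), Timestamp('2023-10-01 08:00:00')]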

write_csv_file_to_aws_s3(df, path, **kwargs)

Writes an individual CSV file to AWS S3.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to write.

required
path str

The path to the CSV files on AWS S3.

required
**kwargs

Additional arguments to pass to the to_csv function.

{}
Source code in src/prediction_pipeline/pre_processing/preprocess_historic_visitor_count_data.py, lines 431–441
def write_csv_file_to_aws_s3(df: pd.DataFrame, path: str, **kwargs) -> None:
    """Writes an individual CSV file to AWS S3.

    Args:
        df (pd.DataFrame): The DataFrame to write.
        path (str): The path to the CSV files on AWS S3.
        **kwargs: Additional arguments to pass to the to_csv function.
    """

    wr.s3.to_csv(df, path=path, **kwargs)
    return

add_and_translate_day_of_week(df)

Create a new column 'Wochentag' that represents the day of the week in German.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Datum' column with date information.

Returns: pandas.DataFrame: DataFrame with updated 'Wochentag' column in German.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 215–249
def add_and_translate_day_of_week(df):
    """
    Create a new column 'Wochentag' that represents the day of the week in German.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Datum' column with date information.

    Returns:
    pandas.DataFrame: DataFrame with updated 'Wochentag' column in German.
    """
    # Create a new column 'Wochentag2' with the day of the week in English
    df['Wochentag2'] = df['Datum'].dt.day_name()
    df['Wochentag2'] = df['Wochentag2'].astype('category')

    # Define the translation mapping from English to German
    translation_map = {
        'Monday': 'Montag',
        'Tuesday': 'Dienstag',
        'Wednesday': 'Mittwoch',
        'Thursday': 'Donnerstag',
        'Friday': 'Freitag',
        'Saturday': 'Samstag',
        'Sunday': 'Sonntag'
    }

    # Replace the English day names in the 'Wochentag2' column with German names
    df['Wochentag2'] = df['Wochentag2'].replace(translation_map)

    # Remove the 'Wochentag' column from the DataFrame
    df = df.drop(columns=['Wochentag'], errors='ignore')

    # Rename 'Wochentag2' to 'Wochentag'
    df = df.rename(columns={'Wochentag2': 'Wochentag'})

    return df
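
A quick usage sketch, assuming the function is importable from the module path shown above:

import pandas as pd

from src.prediction_pipeline.pre_processing.preprocess_visitor_center_data import add_and_translate_day_of_week

df = pd.DataFrame({'Datum': pd.to_datetime(['2024-07-20', '2024-07-22'])})  # a Saturday and a Monday
df = add_and_translate_day_of_week(df)
print(df['Wochentag'].tolist())  # ['Samstag', 'Montag']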

add_date_variables(df)

Create new columns for day, month, and year from a date column in the DataFrame.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Datum' column with date information.

Returns: pandas.DataFrame: DataFrame with additional columns for day, month, and year.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 166–189
def add_date_variables(df):
    """
    Create new columns for day, month, and year from a date column in the DataFrame.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Datum' column with date information.

    Returns:
    pandas.DataFrame: DataFrame with additional columns for day, month, and year.
    """
    # Convert 'Datum' column to datetime format
    df['Datum'] = pd.to_datetime(df['Datum'])

    # Add new columns for day, month, and year
    df['Tag'] = df['Datum'].dt.day
    df['Monat'] = df['Datum'].dt.month
    df['Jahr'] = df['Datum'].dt.year

    # Change data types for modeling purposes
    df['Tag'] = df['Tag'].astype('Int64')
    df['Monat'] = df['Monat'].astype('category')
    df['Jahr'] = df['Jahr'].astype('Int64')

    return df
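
A brief usage sketch, assuming the function is importable from the module path shown above:

import pandas as pd

from src.prediction_pipeline.pre_processing.preprocess_visitor_center_data import add_date_variables

df = pd.DataFrame({'Datum': ['2024-01-15', '2024-10-03']})
df = add_date_variables(df)
print(df[['Tag', 'Monat', 'Jahr']].to_dict('list'))
# {'Tag': [15, 3], 'Monat': [1, 10], 'Jahr': [2024, 2024]}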

add_season_variable(df)

Create a new column 'Jahreszeit' in the DataFrame based on the month variable.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Monat' column with month information.

Returns: pandas.DataFrame: DataFrame with an additional 'Jahreszeit' column representing the season.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 191–213
def add_season_variable(df):
    """
    Create a new column 'Jahreszeit' in the DataFrame based on the month variable.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Monat' column with month information.

    Returns:
    pandas.DataFrame: DataFrame with an additional 'Jahreszeit' column representing the season.
    """
    # Define the seasons based on the month
    df['Jahreszeit'] = df['Monat'].apply(
        lambda x: 'Frühling' if x in [3, 4, 5] else
                  'Sommer' if x in [6, 7, 8] else
                  'Herbst' if x in [9, 10, 11] else
                  'Winter' if x in [12, 1, 2] else
                  np.nan
    )

    # Convert the 'Jahreszeit' column to category type
    df['Jahreszeit'] = df['Jahreszeit'].astype('category')

    return df
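
A brief usage sketch, assuming the function is importable from the module path shown above (in the pipeline 'Monat' is a categorical column, but plain integers map to the seasons the same way):

import pandas as pd

from src.prediction_pipeline.pre_processing.preprocess_visitor_center_data import add_season_variable

df = pd.DataFrame({'Monat': [1, 4, 7, 10]})
df = add_season_variable(df)
print(df['Jahreszeit'].tolist())  # ['Winter', 'Frühling', 'Sommer', 'Herbst']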

add_weekend_variable(df)

Create a new binary column 'Wochenende' indicating whether the day is a weekend.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Wochentag' column with German day names.

Returns: pandas.DataFrame: DataFrame with an additional 'Wochenende' column indicating weekend status.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 251–267
def add_weekend_variable(df):
    """
    Create a new binary column 'Wochenende' indicating whether the day is a weekend.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Wochentag' column with German day names.

    Returns:
    pandas.DataFrame: DataFrame with an additional 'Wochenende' column indicating weekend status.
    """
    # Create a new binary column 'Wochenende' where True represents weekend days (Saturday, Sunday)
    df['Wochenende'] = df['Wochentag'].apply(lambda x: x in ['Samstag', 'Sonntag'])

    # Convert the 'Wochenende' column to boolean type
    df['Wochenende'] = df['Wochenende'].astype(bool)

    return df

correct_and_convert_schulferien(df_visitcenters)

Corrects a typo in the 'Schulferien_Bayern' column and converts it to boolean type.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Schulferien_Bayern' column.

Returns: pandas.DataFrame: DataFrame with corrected 'Schulferien_Bayern' values and converted to boolean type.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 49–65
def correct_and_convert_schulferien(df_visitcenters):
    """
    Corrects a typo in the 'Schulferien_Bayern' column and converts it to boolean type.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Schulferien_Bayern' column.

    Returns:
    pandas.DataFrame: DataFrame with corrected 'Schulferien_Bayern' values and converted to boolean type.
    """
    # Correct the typo in specific value for column 'Schulferien_Bayern' (from `10` to `0`)
    df_visitcenters.loc[df_visitcenters['Datum'] == '2017-04-30', 'Schulferien_Bayern'] = 0

    # Change 'Schulferien_Bayern' to bool type
    df_visitcenters['Schulferien_Bayern'] = df_visitcenters['Schulferien_Bayern'].astype(bool)

    return df_visitcenters

correct_and_convert_wgm_geoeffnet(df)

Corrects the 'WGM_geoeffnet' column by replacing the value 11 with 1. Converts the column to boolean type.

Parameters: df (pandas.DataFrame): DataFrame containing the 'WGM_geoeffnet' column.

Returns: pandas.DataFrame: DataFrame with 'WGM_geoeffnet' corrected and converted to boolean type.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 102–119
def correct_and_convert_wgm_geoeffnet(df):
    """
    Corrects the 'WGM_geoeffnet' column by replacing the value 11 with 1.
    Converts the column to boolean type.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'WGM_geoeffnet' column.

    Returns:
    pandas.DataFrame: DataFrame with 'WGM_geoeffnet' corrected and converted to boolean type.
    """
    # Replace single value of 11 with 1 in 'WGM_geoeffnet' column
    df['WGM_geoeffnet'] = df['WGM_geoeffnet'].replace(11, 1)

    # Convert 'WGM_geoeffnet' column to boolean type
    df['WGM_geoeffnet'] = df['WGM_geoeffnet'].astype(bool)

    return df

correct_besuchszahlen_heh(df)

Corrects the 'Besuchszahlen_HEH' column by rounding up values with non-zero fractional parts to the nearest whole number. Converts the column to Int64 type to retain NaN values.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Besuchszahlen_HEH' column.

Returns: pandas.DataFrame: DataFrame with 'Besuchszahlen_HEH' corrected and converted to Int64 type.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def correct_besuchszahlen_heh(df):
    """
    Corrects the 'Besuchszahlen_HEH' column by rounding up values with non-zero fractional parts to the nearest whole number.
    Converts the column to Int64 type to retain NaN values.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Besuchszahlen_HEH' column.

    Returns:
    pandas.DataFrame: DataFrame with 'Besuchszahlen_HEH' corrected and converted to Int64 type.
    """
    # Apply np.ceil() to round up values with non-zero fractional parts to nearest whole number
    df['Besuchszahlen_HEH'] = df['Besuchszahlen_HEH'].apply(
        lambda x: np.ceil(x) if pd.notna(x) and x % 1 != 0 else x
    )

    # Convert 'Besuchszahlen_HEH' to Int64 to retain NaN values
    df['Besuchszahlen_HEH'] = df['Besuchszahlen_HEH'].astype('Int64')

    return df
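
A brief usage sketch, assuming the function is importable from the module path shown above; the fractional value is rounded up and the missing value survives the Int64 cast:

import pandas as pd

from src.prediction_pipeline.pre_processing.preprocess_visitor_center_data import correct_besuchszahlen_heh

df = pd.DataFrame({'Besuchszahlen_HEH': [250.0, 312.4, None]})
df = correct_besuchszahlen_heh(df)
print(df['Besuchszahlen_HEH'].tolist())  # [250, 313, <NA>]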

create_hourly_dataframe(df)

Expands the daily data in the DataFrame to an hourly level by duplicating each day into 24 hourly rows.

Parameters: df (pandas.DataFrame): DataFrame containing daily data with a 'Datum' column representing dates.

Returns: pandas.DataFrame: New DataFrame with an hourly level where each day is expanded into 24 hourly rows.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 382–402
def create_hourly_dataframe(df):
    """
    Expands the daily data in the DataFrame to an hourly level by duplicating each day into 24 hourly rows.

    Parameters:
    df (pandas.DataFrame): DataFrame containing daily data with a 'Datum' column representing dates.

    Returns:
    pandas.DataFrame: New DataFrame with an hourly level where each day is expanded into 24 hourly rows.
    """
    # Generate a new DataFrame where each day is expanded into 24 rows (one per hour)
    df_hourly = df.loc[df.index.repeat(24)].copy()

    # Create the hourly timestamps by adding hours to the 'Datum' column
    df_hourly['Datum'] = df_hourly['Datum'] + pd.to_timedelta(df_hourly.groupby(df_hourly.index).cumcount(), unit='h')

    # Rename columns for clarity
    df_hourly = df_hourly.rename(columns=lambda x: x.strip())
    df_hourly = df_hourly.rename(columns={'Datum': 'Time'})

    return df_hourly
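
A brief usage sketch, assuming the function is importable from the module path shown above; two daily rows become 48 hourly rows:

import pandas as pd

from src.prediction_pipeline.pre_processing.preprocess_visitor_center_data import create_hourly_dataframe

daily = pd.DataFrame({
    'Datum': pd.to_datetime(['2024-07-01', '2024-07-02']),
    'Besuchszahlen_HEH': [250, 310],
})
hourly = create_hourly_dataframe(daily)
print(len(hourly))             # 48
print(hourly['Time'].iloc[1])  # 2024-07-01 01:00:00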

detect_outliers_std(df, column, num_sd=7)

Detect outliers in a specific column of the DataFrame using the standard deviation method.

Parameters: df (pandas.DataFrame): DataFrame containing the column to check. column (str): Name of the column to check for outliers. num_sd (int): Number of standard deviations to define the outlier bounds (default is 7).

Returns: pandas.DataFrame: DataFrame containing rows with outliers in the specified column.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 315–336
def detect_outliers_std(df, column, num_sd=7):
    """
    Detect outliers in a specific column of the DataFrame using the standard deviation method.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the column to check.
    column (str): Name of the column to check for outliers.
    num_sd (int): Number of standard deviations to define the outlier bounds (default is 7).

    Returns:
    pandas.DataFrame: DataFrame containing rows with outliers in the specified column.
    """
    mean = df[column].mean()
    std_dev = df[column].std()

    # Define the bounds for outliers
    lower_bound = mean - num_sd * std_dev
    upper_bound = mean + num_sd * std_dev

    # Identify outliers
    outliers_mask = (df[column] < lower_bound) | (df[column] > upper_bound)
    return df[outliers_mask][['Datum', column]]
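
A brief usage sketch, assuming the function is importable from the module path shown above; num_sd is lowered to 2 so the toy outlier is actually flagged (with the default of 7 and so few rows, nothing would be):

import pandas as pd

from src.prediction_pipeline.pre_processing.preprocess_visitor_center_data import detect_outliers_std

df = pd.DataFrame({
    'Datum': pd.date_range('2024-01-01', periods=6, freq='D'),
    'Parkpl_HEH_PKW': [120, 130, 125, 118, 122, 9999],  # 9999 is an obvious outlier
})
print(detect_outliers_std(df, 'Parkpl_HEH_PKW', num_sd=2))
# returns the single row whose value lies outside mean ± 2 standard deviations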

handle_outliers(df, num_sd=7)

Detect and handle outliers for a list of columns by replacing them with NaN.

Parameters: df (pandas.DataFrame): DataFrame containing the columns to check. num_sd (int): Number of standard deviations to define the outlier bounds (default is 7).

Returns: pandas.DataFrame: DataFrame with outliers replaced by NaN in the specified columns.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 338–372
def handle_outliers(df, num_sd=7):
    """
    Detect and handle outliers for a list of columns by replacing them with NaN.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the columns to check.
    num_sd (int): Number of standard deviations to define the outlier bounds (default is 7).

    Returns:
    pandas.DataFrame: DataFrame with outliers replaced by NaN in the specified columns.
    """
    columns = [
    'Besuchszahlen_HEH',
    'Besuchszahlen_HZW',
    'Besuchszahlen_WGM',
    'Parkpl_HEH_PKW',
    'Parkpl_HEH_BUS',
    'Parkpl_HZW_PKW',
    'Parkpl_HZW_BUS']

    #outliers = {}

    # Detect outliers and store in dictionary
    #for column in columns:
        #outliers[column] = detect_outliers_std(df, column, num_sd)

    # Handle outliers by replacing with NaN
    for column in columns:
        mean = df[column].mean()
        std_dev = df[column].std()
        lower_bound = mean - num_sd * std_dev
        upper_bound = mean + num_sd * std_dev
        df.loc[(df[column] < lower_bound) | (df[column] > upper_bound), column] = np.nan

    return df

remove_last_row_if_needed(df)

Removes the last row from the DataFrame if it has 2923 rows.

Parameters: df (pandas.DataFrame): DataFrame to be checked and modified.

Returns: pandas.DataFrame: Updated DataFrame with the last row removed if the initial length was 2923.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 121–136
def remove_last_row_if_needed(df):
    """
    Removes the last row from the DataFrame if it has 2923 rows.

    Parameters:
    df (pandas.DataFrame): DataFrame to be checked and modified.

    Returns:
    pandas.DataFrame: Updated DataFrame with the last row removed if the initial length was 2923.
    """
    # Check if the DataFrame has exactly 2923 rows
    if len(df) == 2923:
        # Drop the last row
        df = df.iloc[:-1]

    return df

rename_and_set_time_as_index(df)

Rename columns, convert 'time' column to datetime, and set 'time' as the index.

Parameters: df (pandas.DataFrame): DataFrame containing data with a 'Datum' column to be renamed and converted.

Returns: pandas.DataFrame: The cleaned DataFrame with 'Datum' renamed to 'time', converted to datetime, and 'time' set as index.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 404–424
def rename_and_set_time_as_index(df):
    """
    Rename columns, convert 'time' column to datetime, and set 'time' as the index.

    Parameters:
    df (pandas.DataFrame): DataFrame containing data with a 'Datum' column to be renamed and converted.

    Returns:
    pandas.DataFrame: The cleaned DataFrame with 'Datum' renamed to 'time', converted to datetime, and 'time' set as index.
    """
    # Rename 'Datum' column to 'time'
    df.rename(columns={'Datum': 'time'}, inplace=True)
    df.index=pd.to_datetime(df.index)

    # Convert 'time' column to datetime
    #df['time'] = pd.to_datetime(df['time'])

    # Set 'time' column as index
    #df.set_index('time', inplace=True)

    return df

reorder_columns(df)

Reorder columns in the DataFrame to place date-related variables together.

Parameters: df (pandas.DataFrame): DataFrame with various columns including date-related variables.

Returns: pandas.DataFrame: DataFrame with columns reordered to place date-related variables next to each other.

Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 269–294
def reorder_columns(df):
    """
    Reorder columns in the DataFrame to place date-related variables together.

    Parameters:
    df (pandas.DataFrame): DataFrame with various columns including date-related variables.

    Returns:
    pandas.DataFrame: DataFrame with columns reordered to place date-related variables next to each other.
    """
    # Define the desired order of columns
    column_order = [
        'Datum', 'Tag', 'Monat', 'Jahr', 'Wochentag', 'Wochenende', 'Jahreszeit', 'Laubfärbung',
        'Besuchszahlen_HEH', 'Besuchszahlen_HZW', 'Besuchszahlen_WGM', 
        'Parkpl_HEH_PKW', 'Parkpl_HEH_BUS', 'Parkpl_HZW_PKW', 'Parkpl_HZW_BUS', 
        'Schulferien_Bayern', 'Schulferien_CZ', 'Feiertag_Bayern', 'Feiertag_CZ', 
        'HEH_geoeffnet', 'HZW_geoeffnet', 'WGM_geoeffnet', 'Lusenschutzhaus_geoeffnet', 
        'Racheldiensthuette_geoeffnet', 'Waldschmidthaus_geoeffnet', 
        'Falkensteinschutzhaus_geoeffnet', 'Schwellhaeusl_geoeffnet', 'Temperatur', 
        'Niederschlagsmenge', 'Schneehoehe', 'GS mit', 'GS max'
    ]

    # Reorder columns in the DataFrame
    df = df[column_order]

    return df

write_parquet_file_to_aws_s3(df, path, **kwargs)

Writes an individual Parquet file to AWS S3.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to write.

required
path str

The path to the Parquet files on AWS S3.

required
**kwargs

Additional arguments to pass to the to_parquet function.

{}
Source code in src/prediction_pipeline/pre_processing/preprocess_visitor_center_data.py, lines 426–439
def write_parquet_file_to_aws_s3(df: pd.DataFrame, path: str, **kwargs) -> None:
    """Writes an individual Parquet file to AWS S3.

    Args:
        df (pd.DataFrame): The DataFrame to write.
        path (str): The path to the Parquet files on AWS S3.
        **kwargs: Additional arguments to pass to the to_parquet function.
    """
    try:
        wr.s3.to_parquet(df, path=path, **kwargs)
        print(f"DataFrame successfully written to {path}")
    except Exception as e:
        logging.error(f"Failed to write DataFrame to S3. Error: {e}")
    return

fill_missing_values(data, parameters)

Fill missing values in the weather data using linear interpolation or zero values.

Parameters:

Name Type Description Default
data DataFrame

Processed hourly weather data.

required
parameters list

List of column names to process.

required

Returns:

Type Description

pandas.DataFrame: DataFrame with missing values filled.

Source code in src/prediction_pipeline/pre_processing/preprocess_weather_data.py, lines 21–70
def fill_missing_values(data, parameters):
    """
    Fill missing values in the weather data using linear interpolation or zero values.

    Args:
        data (pandas.DataFrame): Processed hourly weather data.
        parameters (list): List of column names to process.

    Returns:
        pandas.DataFrame: DataFrame with missing values filled.
    """
    total_rows = data.shape[0]

    for parameter in parameters:
        # Calculate missing values and their percentage
        missing_values = data[parameter].isnull().sum()
        missing_percentage = (missing_values / total_rows) * 100

        # Calculate zero values and their percentage
        zero_values = data[parameter].eq(0).sum()
        zero_percentage = (zero_values / total_rows) * 100

        # Check for missing values in the 'Time' column
        if parameter == 'Time' and missing_values > 0:
            print(f'Missing values in Time column: {missing_percentage:.2f}%')
            print('Please check the missing values in the Time column')
            exit()

        if missing_values == 0:
            print(f'No missing values in {parameter} column')
        else:
            print(f'Missing values in {parameter} column: {missing_percentage:.2f}%')

            if zero_percentage > 60:
                # Fill missing values with 0.0 if zero values are significant
                print(f'Zero values in {parameter} column: {zero_percentage:.2f}%')
                data[parameter].fillna(0.0, inplace=True)
                print(f'Missing values in {parameter} column filled with 0.0')
            else:
                if data[parameter].dtype == 'category':
                    # Fill categorical gaps with the previous row's value
                    data[parameter].fillna(method='pad', inplace=True)
                else:
                    # Use linear interpolation to fill missing values
                    data[parameter].interpolate(method='linear', inplace=True)
                    # Round the interpolated values to 2 decimal places
                    data[parameter] = data[parameter].round(2)
                    print(f'Missing values in {parameter} column filled using linear interpolation')

    return data
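
A small, illustrative call, assuming the function is importable from the module path shown above; the column names are made up for the example. Columns that are mostly zeros get their gaps filled with 0.0, other numeric columns are linearly interpolated, and the function prints a short report of what it did:

import numpy as np
import pandas as pd

from src.prediction_pipeline.pre_processing.preprocess_weather_data import fill_missing_values

data = pd.DataFrame({
    'Temperature (°C)': [10.0, np.nan, 14.0],    # one gap, handled by the interpolation branch
    'Precipitation (mm)': [0.0, np.nan, 0.0],    # mostly zeros, handled by the zero-fill branch
})
filled = fill_missing_values(data, parameters=list(data.columns))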

process_weather_data(sourced_df)

Processes the sourced hourly weather data for the Bavarian Forest National Park: casts the weather condition code column ('coco_2') to a categorical type and fills missing values for all weather parameters. Saving the processed data to a CSV file is currently disabled (commented out).

Source code in src/prediction_pipeline/pre_processing/preprocess_weather_data.py, lines 74–92
def process_weather_data(sourced_df):
    """
    Processes the sourced hourly weather data for the Bavarian Forest National Park:
    casts the weather condition code column ('coco_2') to a categorical type and fills
    missing values for all weather parameters. Saving to CSV is currently disabled.
    """
    sourced_df["coco_2"] = sourced_df["coco_2"].astype("category")
    # Get the list of columns to process
    parameters = sourced_df.columns.to_list()

    # Fill missing values in the weather data
    imputed_data = fill_missing_values(sourced_df, parameters)

    # # Uncomment the following line to save the processed data to a CSV file
    # # Save the processed data to a CSV file
    # save_data_to_csv(imputed_data, 'outputs/weather_data_final/processed_weather_data_2016-24_forecasted_imputed.csv')


    return imputed_data

add_and_translate_day_of_week(df)

Create a new column 'Wochentag' that represents the day of the week in German.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Datum' column with date information.

Returns: pandas.DataFrame: DataFrame with updated 'Wochentag' column in German.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 233–267
def add_and_translate_day_of_week(df):
    """
    Create a new column 'Wochentag' that represents the day of the week in German.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Datum' column with date information.

    Returns:
    pandas.DataFrame: DataFrame with updated 'Wochentag' column in German.
    """
    # Create a new column 'Wochentag2' with the day of the week in English
    df['Wochentag2'] = df['Datum'].dt.day_name()
    df['Wochentag2'] = df['Wochentag2'].astype('category')

    # Define the translation mapping from English to German
    translation_map = {
        'Monday': 'Montag',
        'Tuesday': 'Dienstag',
        'Wednesday': 'Mittwoch',
        'Thursday': 'Donnerstag',
        'Friday': 'Freitag',
        'Saturday': 'Samstag',
        'Sunday': 'Sonntag'
    }

    # Replace the English day names in the 'Wochentag2' column with German names
    df['Wochentag2'] = df['Wochentag2'].replace(translation_map)

    # Remove the 'Wochentag' column from the DataFrame
    df = df.drop(columns=['Wochentag'], errors='ignore')

    # Rename 'Wochentag2' to 'Wochentag'
    df = df.rename(columns={'Wochentag2': 'Wochentag'})

    return df

add_date_variables(df)

Create new columns for day, month, and year from a date column in the DataFrame.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Datum' column with date information.

Returns: pandas.DataFrame: DataFrame with additional columns for day, month, and year.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 184–207
def add_date_variables(df):
    """
    Create new columns for day, month, and year from a date column in the DataFrame.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Datum' column with date information.

    Returns:
    pandas.DataFrame: DataFrame with additional columns for day, month, and year.
    """
    # Convert 'Datum' column to datetime format
    df['Datum'] = pd.to_datetime(df['Datum'])

    # Add new columns for day, month, and year
    df['Tag'] = df['Datum'].dt.day
    df['Monat'] = df['Datum'].dt.month
    df['Jahr'] = df['Datum'].dt.year

    # Change data types for modeling purposes
    df['Tag'] = df['Tag'].astype('Int64')
    df['Monat'] = df['Monat'].astype('category')
    df['Jahr'] = df['Jahr'].astype('Int64')

    return df

add_season_variable(df)

Create a new column 'Jahreszeit' in the DataFrame based on the month variable.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Monat' column with month information.

Returns: pandas.DataFrame: DataFrame with an additional 'Jahreszeit' column representing the season.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 209–231
def add_season_variable(df):
    """
    Create a new column 'Jahreszeit' in the DataFrame based on the month variable.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Monat' column with month information.

    Returns:
    pandas.DataFrame: DataFrame with an additional 'Jahreszeit' column representing the season.
    """
    # Define the seasons based on the month
    df['Jahreszeit'] = df['Monat'].apply(
        lambda x: 'Frühling' if x in [3, 4, 5] else
                  'Sommer' if x in [6, 7, 8] else
                  'Herbst' if x in [9, 10, 11] else
                  'Winter' if x in [12, 1, 2] else
                  np.nan
    )

    # Convert the 'Jahreszeit' column to category type
    df['Jahreszeit'] = df['Jahreszeit'].astype('category')

    return df

add_weekend_variable(df)

Create a new binary column 'Wochenende' indicating whether the day is a weekend.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Wochentag' column with German day names.

Returns: pandas.DataFrame: DataFrame with an additional 'Wochenende' column indicating weekend status.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 269–285
def add_weekend_variable(df):
    """
    Create a new binary column 'Wochenende' indicating whether the day is a weekend.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Wochentag' column with German day names.

    Returns:
    pandas.DataFrame: DataFrame with an additional 'Wochenende' column indicating weekend status.
    """
    # Create a new binary column 'Wochenende' where True represents weekend days (Saturday, Sunday)
    df['Wochenende'] = df['Wochentag'].apply(lambda x: x in ['Samstag', 'Sonntag'])

    # Convert the 'Wochenende' column to boolean type
    df['Wochenende'] = df['Wochenende'].astype(bool)

    return df

correct_and_convert_schulferien(df_visitcenters)

Corrects a typo in the 'Schulferien_Bayern' column and converts it to boolean type.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Schulferien_Bayern' column.

Returns: pandas.DataFrame: DataFrame with corrected 'Schulferien_Bayern' values and converted to boolean type.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 67–83
def correct_and_convert_schulferien(df_visitcenters):
    """
    Corrects a typo in the 'Schulferien_Bayern' column and converts it to boolean type.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Schulferien_Bayern' column.

    Returns:
    pandas.DataFrame: DataFrame with corrected 'Schulferien_Bayern' values and converted to boolean type.
    """
    # Correct the typo in specific value for column 'Schulferien_Bayern' (from `10` to `0`)
    df_visitcenters.loc[df_visitcenters['Datum'] == '2017-04-30', 'Schulferien_Bayern'] = 0

    # Change 'Schulferien_Bayern' to bool type
    df_visitcenters['Schulferien_Bayern'] = df_visitcenters['Schulferien_Bayern'].astype(bool)

    return df_visitcenters

correct_and_convert_wgm_geoeffnet(df)

Corrects the 'WGM_geoeffnet' column by replacing the value 11 with 1. Converts the column to boolean type.

Parameters: df (pandas.DataFrame): DataFrame containing the 'WGM_geoeffnet' column.

Returns: pandas.DataFrame: DataFrame with 'WGM_geoeffnet' corrected and converted to boolean type.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 120–137
def correct_and_convert_wgm_geoeffnet(df):
    """
    Corrects the 'WGM_geoeffnet' column by replacing the value 11 with 1.
    Converts the column to boolean type.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'WGM_geoeffnet' column.

    Returns:
    pandas.DataFrame: DataFrame with 'WGM_geoeffnet' corrected and converted to boolean type.
    """
    # Replace single value of 11 with 1 in 'WGM_geoeffnet' column
    df['WGM_geoeffnet'] = df['WGM_geoeffnet'].replace(11, 1)

    # Convert 'WGM_geoeffnet' column to boolean type
    df['WGM_geoeffnet'] = df['WGM_geoeffnet'].astype(bool)

    return df

correct_besuchszahlen_heh(df)

Corrects the 'Besuchszahlen_HEH' column by rounding up values with non-zero fractional parts to the nearest whole number. Converts the column to Int64 type to retain NaN values.

Parameters: df (pandas.DataFrame): DataFrame containing the 'Besuchszahlen_HEH' column.

Returns: pandas.DataFrame: DataFrame with 'Besuchszahlen_HEH' corrected and converted to Int64 type.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def correct_besuchszahlen_heh(df):
    """
    Corrects the 'Besuchszahlen_HEH' column by rounding up values with non-zero fractional parts to the nearest whole number.
    Converts the column to Int64 type to retain NaN values.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the 'Besuchszahlen_HEH' column.

    Returns:
    pandas.DataFrame: DataFrame with 'Besuchszahlen_HEH' corrected and converted to Int64 type.
    """
    # Apply np.ceil() to round up values with non-zero fractional parts to nearest whole number
    df['Besuchszahlen_HEH'] = df['Besuchszahlen_HEH'].apply(
        lambda x: np.ceil(x) if pd.notna(x) and x % 1 != 0 else x
    )

    # Convert 'Besuchszahlen_HEH' to Int64 to retain NaN values
    df['Besuchszahlen_HEH'] = df['Besuchszahlen_HEH'].astype('Int64')

    return df

create_hourly_dataframe(df)

Expands the daily data in the DataFrame to an hourly level by duplicating each day into 24 hourly rows.

Parameters: df (pandas.DataFrame): DataFrame containing daily data with a 'Datum' column representing dates.

Returns: pandas.DataFrame: New DataFrame with an hourly level where each day is expanded into 24 hourly rows.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 400–420
def create_hourly_dataframe(df):
    """
    Expands the daily data in the DataFrame to an hourly level by duplicating each day into 24 hourly rows.

    Parameters:
    df (pandas.DataFrame): DataFrame containing daily data with a 'Datum' column representing dates.

    Returns:
    pandas.DataFrame: New DataFrame with an hourly level where each day is expanded into 24 hourly rows.
    """
    # Generate a new DataFrame where each day is expanded into 24 rows (one per hour)
    df_hourly = df.loc[df.index.repeat(24)].copy()

    # Create the hourly timestamps by adding hours to the 'Datum' column
    df_hourly['Datum'] = df_hourly['Datum'] + pd.to_timedelta(df_hourly.groupby(df_hourly.index).cumcount(), unit='h')

    # Rename columns for clarity
    df_hourly = df_hourly.rename(columns=lambda x: x.strip())
    df_hourly = df_hourly.rename(columns={'Datum': 'Time'})

    return df_hourly

detect_outliers_std(df, column, num_sd=7)

Detect outliers in a specific column of the DataFrame using the standard deviation method.

Parameters: df (pandas.DataFrame): DataFrame containing the column to check. column (str): Name of the column to check for outliers. num_sd (int): Number of standard deviations to define the outlier bounds (default is 7).

Returns: pandas.DataFrame: DataFrame containing rows with outliers in the specified column.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 333–354
def detect_outliers_std(df, column, num_sd=7):
    """
    Detect outliers in a specific column of the DataFrame using the standard deviation method.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the column to check.
    column (str): Name of the column to check for outliers.
    num_sd (int): Number of standard deviations to define the outlier bounds (default is 7).

    Returns:
    pandas.DataFrame: DataFrame containing rows with outliers in the specified column.
    """
    mean = df[column].mean()
    std_dev = df[column].std()

    # Define the bounds for outliers
    lower_bound = mean - num_sd * std_dev
    upper_bound = mean + num_sd * std_dev

    # Identify outliers
    outliers_mask = (df[column] < lower_bound) | (df[column] > upper_bound)
    return df[outliers_mask][['Datum', column]]

handle_outliers(df, num_sd=7)

Detect and handle outliers for a list of columns by replacing them with NaN.

Parameters: df (pandas.DataFrame): DataFrame containing the columns to check. num_sd (int): Number of standard deviations to define the outlier bounds (default is 7).

Returns: pandas.DataFrame: DataFrame with outliers replaced by NaN in the specified columns.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 356–390
def handle_outliers(df, num_sd=7):
    """
    Detect and handle outliers for a list of columns by replacing them with NaN.

    Parameters:
    df (pandas.DataFrame): DataFrame containing the columns to check.
    num_sd (int): Number of standard deviations to define the outlier bounds (default is 7).

    Returns:
    pandas.DataFrame: DataFrame with outliers replaced by NaN in the specified columns.
    """
    columns = [
    'Besuchszahlen_HEH',
    'Besuchszahlen_HZW',
    'Besuchszahlen_WGM',
    'Parkpl_HEH_PKW',
    'Parkpl_HEH_BUS',
    'Parkpl_HZW_PKW',
    'Parkpl_HZW_BUS']

    #outliers = {}

    # Detect outliers and store in dictionary
    #for column in columns:
        #outliers[column] = detect_outliers_std(df, column, num_sd)

    # Handle outliers by replacing with NaN
    for column in columns:
        mean = df[column].mean()
        std_dev = df[column].std()
        lower_bound = mean - num_sd * std_dev
        upper_bound = mean + num_sd * std_dev
        df.loc[(df[column] < lower_bound) | (df[column] > upper_bound), column] = np.nan

    return df

remove_last_row_if_needed(df)

Removes the last row from the DataFrame if it has 2923 rows.

Parameters: df (pandas.DataFrame): DataFrame to be checked and modified.

Returns: pandas.DataFrame: Updated DataFrame with the last row removed if the initial length was 2923.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 139–154
def remove_last_row_if_needed(df):
    """
    Removes the last row from the DataFrame if it has 2923 rows.

    Parameters:
    df (pandas.DataFrame): DataFrame to be checked and modified.

    Returns:
    pandas.DataFrame: Updated DataFrame with the last row removed if the initial length was 2923.
    """
    # Check if the DataFrame has exactly 2923 rows
    if len(df) == 2923:
        # Drop the last row
        df = df.iloc[:-1]

    return df

rename_and_set_time_as_index(df)

Rename columns, convert 'time' column to datetime, and set 'time' as the index.

Parameters: df (pandas.DataFrame): DataFrame containing data with a 'Datum' column to be renamed and converted.

Returns: pandas.DataFrame: The cleaned DataFrame with 'Datum' renamed to 'time', converted to datetime, and 'time' set as index.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 422–442
def rename_and_set_time_as_index(df):
    """
    Rename columns, convert 'time' column to datetime, and set 'time' as the index.

    Parameters:
    df (pandas.DataFrame): DataFrame containing data with a 'Datum' column to be renamed and converted.

    Returns:
    pandas.DataFrame: The cleaned DataFrame with 'Datum' renamed to 'time', converted to datetime, and 'time' set as index.
    """
    # Rename 'Datum' column to 'time'
    df.rename(columns={'Datum': 'time'}, inplace=True)
    df.index=pd.to_datetime(df.index)

    # Convert 'time' column to datetime
    #df['time'] = pd.to_datetime(df['time'])

    # Set 'time' column as index
    #df.set_index('time', inplace=True)

    return df

reorder_columns(df)

Reorder columns in the DataFrame to place date-related variables together.

Parameters: df (pandas.DataFrame): DataFrame with various columns including date-related variables.

Returns: pandas.DataFrame: DataFrame with columns reordered to place date-related variables next to each other.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 287–312
def reorder_columns(df):
    """
    Reorder columns in the DataFrame to place date-related variables together.

    Parameters:
    df (pandas.DataFrame): DataFrame with various columns including date-related variables.

    Returns:
    pandas.DataFrame: DataFrame with columns reordered to place date-related variables next to each other.
    """
    # Define the desired order of columns
    column_order = [
        'Datum', 'Tag', 'Monat', 'Jahr', 'Wochentag', 'Wochenende', 'Jahreszeit', 'Laubfärbung',
        'Besuchszahlen_HEH', 'Besuchszahlen_HZW', 'Besuchszahlen_WGM', 
        'Parkpl_HEH_PKW', 'Parkpl_HEH_BUS', 'Parkpl_HZW_PKW', 'Parkpl_HZW_BUS', 
        'Schulferien_Bayern', 'Schulferien_CZ', 'Feiertag_Bayern', 'Feiertag_CZ', 
        'HEH_geoeffnet', 'HZW_geoeffnet', 'WGM_geoeffnet', 'Lusenschutzhaus_geoeffnet', 
        'Racheldiensthuette_geoeffnet', 'Waldschmidthaus_geoeffnet', 
        'Falkensteinschutzhaus_geoeffnet', 'Schwellhaeusl_geoeffnet', 'Temperatur', 
        'Niederschlagsmenge', 'Schneehoehe', 'GS mit', 'GS max'
    ]

    # Reorder columns in the DataFrame
    df = df[column_order]

    return df

source_data_from_aws_s3(path, **kwargs)

Loads an Excel file from an AWS S3 bucket. Args: path (str): The path to the Excel file on AWS S3. **kwargs: Additional arguments to pass to the read_excel function. Returns: pd.DataFrame: The DataFrame containing the data from the Excel file.

Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 27–36
def source_data_from_aws_s3(path: str, **kwargs) -> pd.DataFrame:
    """Loads individual or multiple CSV files from an AWS S3 bucket.
    Args:
        path (str): The path to the CSV files on AWS S3.
        **kwargs: Additional arguments to pass to the read_csv function.
    Returns:
        pd.DataFrame: The DataFrame containing the data from the CSV files.
    """
    df = wr.s3.read_excel(path=path, **kwargs)
    return df

write_parquet_file_to_aws_s3(df, path, **kwargs)

Writes an individual Parquet file to AWS S3.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to write.

required
path str

The path to the Parquet files on AWS S3.

required
**kwargs

Additional arguments to pass to the to_parquet function.

{}
Source code in src/prediction_pipeline/pre_processing/visitor_center_processing_script.py, lines 444–457
def write_parquet_file_to_aws_s3(df: pd.DataFrame, path: str, **kwargs) -> None:
    """Writes an individual Parquet file to AWS S3.

    Args:
        df (pd.DataFrame): The DataFrame to write.
        path (str): The path to the Parquet files on AWS S3.
        **kwargs: Additional arguments to pass to the to_parquet function.
    """
    try:
        wr.s3.to_parquet(df, path=path, **kwargs)
        print(f"DataFrame successfully written to {path}")
    except Exception as e:
        logging.error(f"Failed to write DataFrame to S3. Error: {e}")
    return

load_latest_models(bucket_name, folder_prefix, models_names)

Load the latest files from an S3 folder based on the model names, and dynamically create variables with 'loaded_' as prefix.

Parameters: - bucket_name (str): The name of the S3 bucket. - folder_prefix (str): The folder path within the bucket. - models_names (list): List of model names; each name is combined with the folder prefix and '.pkl' to build the S3 key.

Returns: - dict: A dictionary containing the loaded models with keys prefixed by 'loaded_'.

Source code in src/prediction_pipeline/modeling/create_inference_dfs.py, lines 28–64
@st.cache_resource(max_entries=1)
def load_latest_models(bucket_name, folder_prefix, models_names):
    """
    Load the latest files from an S3 folder based on the model names, 
    and dynamically create variables with 'loaded_' as prefix.

    Parameters:
    - bucket_name (str): The name of the S3 bucket.
    - folder_prefix (str): The folder path within the bucket.
    - models_names (list): List of model names; each name is combined with folder_prefix and '.pkl' to build the S3 key.

    Returns:
    - dict: A dictionary containing the loaded models with keys prefixed by 'loaded_'.
    """

    # Dictionary to store loaded models
    loaded_models = {}

    # Loop through each model to get the latest pickle (.pkl) file
    for model in models_names:

        # Create an S3 client
        s3 = boto3.client('s3')

        s3_key = folder_prefix + model + '.pkl'
        print(f"Retrieving the trained model {model} saved under AWS S3 in bucket {bucket_name} with key {s3_key}")

        # Get the object from S3
        response = s3.get_object(Bucket=bucket_name, Key=s3_key)

        # Load the pickled model from the response object using joblib
        loaded_regressor_model = joblib.load(io.BytesIO(response['Body'].read()))

        # Store the loaded model in the dictionary
        loaded_models[f'{model}'] = loaded_regressor_model

    return loaded_models
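
A hedged usage sketch: the bucket name, folder prefix, and model names below are hypothetical placeholders, valid AWS credentials are required, and each name must correspond to a '<name>.pkl' object under the prefix:

from src.prediction_pipeline.modeling.create_inference_dfs import load_latest_models

# Hypothetical values; replace with the project's actual bucket, prefix and model names
loaded = load_latest_models(
    bucket_name='my-example-bucket',
    folder_prefix='models/',
    models_names=['extra_trees_Lusen', 'extra_trees_Falkenstein'],
)
print(list(loaded))  # ['extra_trees_Lusen', 'extra_trees_Falkenstein']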

predict_with_models(loaded_models, df_features)

Given a dictionary of models and a DataFrame of features, this function predicts the target values using each model and saves the inference predictions to AWS S3 (to be further loaded from Streamlit).

Parameters: - loaded_models (dict): A dictionary of models where keys are model names and values are the trained models. - df_features (pd.DataFrame): A DataFrame containing the features to make predictions on.

Returns: - pd.DataFrame: A DataFrame containing the predictions of all models per region.

Source code in src/prediction_pipeline/modeling/create_inference_dfs.py, lines 68–111
def predict_with_models(loaded_models, df_features):
    """
    Given a dictionary of models and a DataFrame of features, this function predicts the target
    values using each model and saves the inference predictions to AWS S3 (to be further loaded from Streamlit).

    Parameters:
    - loaded_models (dict): A dictionary of models where keys are model names and values are the trained models.
    - df_features (pd.DataFrame): A DataFrame containing the features to make predictions on.

    Returns:
    - pd.DataFrame: A DataFrame containing the predictions of all models per region.
    """

    overall_predictions = pd.DataFrame()

    # Iterate through the loaded models
    for model_name, model in loaded_models.items():
        # Check if the model has a predict method
        if hasattr(model, 'predict'):
            # Make predictions
            predictions = model.predict(df_features)

            # Create a new DataFrame for the predictions with the time column
            df_predictions = pd.DataFrame(predictions, columns=['predictions'])

            # Make the index column 'Time'
            df_predictions['Time'] = df_features.index

            # Make sure predictions are integers and not floats
            df_predictions['predictions'] = df_predictions['predictions'].astype(int)

            # save the prediction dataframe as a parquet file in aws
            wr.s3.to_parquet(df_predictions,path = f"s3://{aws_s3_bucket}/models/inference_data_outputs/{model_name}.parquet")

            print(f"Predictions for {model_name} stored successfully")
            df_predictions["region"] = model_name.split('extra_trees_')[1].split('.parquet')[0]

            # Append the predictions to the overall_predictions DataFrame
            overall_predictions = pd.concat([overall_predictions, df_predictions])

        else:
           print(f"Error: {model_name} is not a valid model. It is of type {type(model)}")

    return overall_predictions

join_inference_data(weather_data_inference, visitor_centers_data)

Merge weather data with visitor centers data.

Parameters:

Name Type Description Default
weather_data_inference DataFrame

DataFrame containing weather data.

required
visitor_centers_data DataFrame

DataFrame containing visitor centers data.

required

Returns:

Type Description

pd.DataFrame: Merged DataFrame with selected columns from visitor centers data.

Source code in src/prediction_pipeline/modeling/preprocess_inference_features.py, lines 13–33
def join_inference_data(weather_data_inference, visitor_centers_data):

    """Merge weather data with visitor centers data.

    Args:
        weather_data_inference (pd.DataFrame): DataFrame containing weather data.
        visitor_centers_data (pd.DataFrame): DataFrame containing visitor centers data.

    Returns:
        pd.DataFrame: Merged DataFrame with selected columns from visitor centers data.
    """

    # Define the columns you want to bring from visitor_centers_data
    columns_to_add = ['Time','Tag', 'Hour', 'Monat','Wochentag',  'Wochenende',  'Jahreszeit',  'Laubfärbung',
                    'Schulferien_Bayern', 'Schulferien_CZ','Feiertag_Bayern',  'Feiertag_CZ',
                    'HEH_geoeffnet',  'HZW_geoeffnet',  'WGM_geoeffnet', 'Lusenschutzhaus_geoeffnet',  'Racheldiensthuette_geoeffnet', 'Falkensteinschutzhaus_geoeffnet', 'Schwellhaeusl_geoeffnet']  

    # Perform the merge, keep the min and max values of the visitor center data
    merged_data = visitor_centers_data[columns_to_add].merge(weather_data_inference, on='Time', how='left')

    return merged_data

source_preprocess_inference_data(weather_data_inference, hourly_visitor_center_data, start_time, end_time)

Source and preprocess inference data from weather and visitor center sources.

This function fetches weather and visitor center data, merges them, and computes additional features such as nearest holiday distance, daily max values, and moving z-scores.

Returns:

Type Description

pd.DataFrame: DataFrame containing preprocessed inference data.

Source code in src/prediction_pipeline/modeling/preprocess_inference_features.py, lines 35–79
@st.cache_data(max_entries=1)
def source_preprocess_inference_data(weather_data_inference, hourly_visitor_center_data, start_time, end_time):

    """Source and preprocess inference data from weather and visitor center sources.

    This function fetches weather and visitor center data, merges them, and computes additional features
    such as nearest holiday distance, daily max values, and moving z-scores.

    Returns:
        pd.DataFrame: DataFrame containing preprocessed inference data.
    """
    print(f"Sourcing and preprocessing inference data at {datetime.now()}...")    

    join_df = join_inference_data(weather_data_inference, hourly_visitor_center_data)


    # Get z scores for the weather columns
    inference_data_with_distances = add_nearest_holiday_distance(join_df)


    inference_data_with_daily_max = add_daily_max_values(inference_data_with_distances, weather_columns_for_zscores)

    inference_data_with_new_features = add_moving_z_scores(inference_data_with_daily_max, 
                                                           weather_columns_for_zscores, 
                                                           window_size_for_zscores)



    # Apply the same cyclic and categorical transformations that were used on the training dataset
    inference_data_with_coco_encoding = process_transformations(inference_data_with_new_features)

    # Slice the data to keep only rows within the next 10 days
    inference_data_with_coco_encoding = inference_data_with_coco_encoding[
        (inference_data_with_coco_encoding["Time"] >= start_time) & 
        (inference_data_with_coco_encoding["Time"] < end_time)

        ]

    #set Time column as index   
    inference_data_with_coco_encoding = inference_data_with_coco_encoding.set_index('Time')
    #drop column named Date
    inference_data_with_coco_encoding = inference_data_with_coco_encoding.drop(columns=['Date'])


    return inference_data_with_coco_encoding

run_inference(preprocessed_hourly_visitor_center_data)

Run the inference pipeline. Fetches the latest weather forecasts, preprocesses data, and makes predictions.

Parameters:

Name Type Description Default
preprocessed_hourly_visitor_center_data DataFrame

The preprocessed hourly visitor center data.

required

Returns:

Type Description

DataFrame

pd.DataFrame: The region-wise visitor count predictions for the inference window.

Source code in src/prediction_pipeline/modeling/run_inference.py, lines 12–54
@st.fragment(run_every="3h")
def run_inference(preprocessed_hourly_visitor_center_data):

    """
    Run the inference pipeline. Fetches the latest weather forecasts, preprocesses data, and makes predictions.

    Args:
        preprocessed_hourly_visitor_center_data (pd.DataFrame): The preprocessed hourly visitor center data.

    Returns:
        pd.DataFrame: The region-wise visitor count predictions for the inference window.
    """

    # get the weather data for inference
    def get_today_midnight_berlin():
        # Set the timezone to Berlin (CET or CEST)
        berlin_tz = pytz.timezone('Europe/Berlin')

        # Get the current time in Berlin
        now_berlin = datetime.now(berlin_tz)

        # Replace the hour, minute, second, and microsecond with 0 to get today at 00:00
        day_today_berlin = now_berlin.date()

        # Convert day_today_berlin to datetime
        day_today_berlin = datetime.combine(day_today_berlin, datetime.min.time())

        return day_today_berlin

    today = get_today_midnight_berlin()
    start_inference_time = today - pd.Timedelta(days=10)
    end_inference_time = today + pd.Timedelta(days=7)
    print(f"Running inference part from {start_inference_time} to {end_inference_time}...")

    weather_data_inference = source_weather_data(start_time=start_inference_time, end_time=end_inference_time)

    # preprocess the inference data
    inference_df = source_preprocess_inference_data(weather_data_inference, preprocessed_hourly_visitor_center_data, start_time=today, end_time=end_inference_time)

    # make predictions
    overall_visitor_predictions = visitor_predictions(inference_df) 

    return overall_visitor_predictions
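
A minimal usage sketch, assuming the Streamlit app has already built the hourly visitor center DataFrame (the variable name processed_vc_df_hourly below is illustrative). Because of the @st.fragment(run_every="3h") decorator, Streamlit re-runs this call roughly every three hours while the page is open.

# Hypothetical call site inside the Streamlit app
inference_predictions = run_inference(processed_vc_df_hourly)
# inference_predictions covers today up to 7 days ahead and can be
# passed on to the dashboard rendering functions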

change_datatypes(df, dtype_dict)

Change column datatypes based on dtype_dict and set 'Time' as the index.

Source code in src/prediction_pipeline/modeling/source_and_feature_selection.py
def change_datatypes(df: pd.DataFrame, dtype_dict: dict) -> pd.DataFrame:
    """Change column datatypes based on dtype_dict and set 'Time' as the index."""
    for dtype, columns in dtype_dict.items():
        df[columns] = df[columns].astype(dtype)

    # set time as the index
    df = df.set_index('Time')
    return df
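
A small, self-contained sketch of how dtype_dict is interpreted. The column names, dtypes, and import path below are illustrative, not part of the pipeline configuration.

import pandas as pd

# Illustrative import; adjust to how src/ is exposed on your PYTHONPATH
from src.prediction_pipeline.modeling.source_and_feature_selection import change_datatypes

df = pd.DataFrame({
    'Time': pd.date_range('2024-07-01', periods=3, freq='h'),
    'Temperature (°C)': ['21.5', '22.0', '23.1'],   # stored as strings
    'Jahreszeit': ['Sommer', 'Sommer', 'Sommer'],
})

# Keys are target dtypes, values are the columns that should receive them
dtype_dict = {
    'float64': ['Temperature (°C)'],
    'category': ['Jahreszeit'],
}

df = change_datatypes(df, dtype_dict)   # also sets 'Time' as the index
print(df.dtypes)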

filter_features_for_modelling(df)

Filter the features for modelling.

Source code in src/prediction_pipeline/modeling/source_and_feature_selection.py
def filter_features_for_modelling(df: pd.DataFrame) -> pd.DataFrame:
    """Filter the features for modelling."""
    # Filter the features for modelling
    df = df[numeric_features_for_modelling + categorical_features_for_modelling + target_vars_et]

    return df

get_regionwise_IN_and_OUT_columns(df)

Preprocess the data by summing IN and OUT columns for each region.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame containing the data to preprocess.

required

Returns:

Type Description
DataFrame

pd.DataFrame: The preprocessed DataFrame.

Source code in src/prediction_pipeline/modeling/source_and_feature_selection.py
def get_regionwise_IN_and_OUT_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Preprocess the data by summing IN and OUT columns for each region.

    Args:
        df (pd.DataFrame): The DataFrame containing the data to preprocess.

    Returns:
        pd.DataFrame: The preprocessed DataFrame.
    """

    # Iterate over the regionwise sensor mapping
    for region, sensors in regionwise_sensor_mapping.items():
        # Sum the values for all IN columns of the current region
        if sensors['IN']:
            df[f'{region} IN'] = df[sensors['IN']].sum(axis=1, min_count=1)

        # Sum the values for all OUT columns of the current region
        if sensors['OUT']:
            df[f'{region} OUT'] = df[sensors['OUT']].sum(axis=1, min_count=1)

    return df
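
To illustrate the aggregation, here is a self-contained sketch that mirrors the loop body with a hypothetical sensor mapping; the real regionwise_sensor_mapping and its sensor column names live in the pipeline configuration.

import pandas as pd

# Hypothetical mapping in the shape the function expects
regionwise_sensor_mapping = {
    'Rachel-Spiegelau': {
        'IN':  ['Sensor A IN', 'Sensor B IN'],
        'OUT': ['Sensor A OUT'],
    },
}

df = pd.DataFrame({
    'Sensor A IN':  [10, None, None],
    'Sensor B IN':  [2, 3, None],
    'Sensor A OUT': [7, 1, 0],
})

# Same aggregation as above: min_count=1 keeps the result NaN only when
# every sensor value for that row is missing
for region, sensors in regionwise_sensor_mapping.items():
    if sensors['IN']:
        df[f'{region} IN'] = df[sensors['IN']].sum(axis=1, min_count=1)
    if sensors['OUT']:
        df[f'{region} OUT'] = df[sensors['OUT']].sum(axis=1, min_count=1)

print(df[['Rachel-Spiegelau IN', 'Rachel-Spiegelau OUT']])
# Row 0: 12.0 IN, row 1: 3.0 IN, row 2: NaN IN (all IN sensors missing)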

load_csv_files_from_aws_s3(path, **kwargs)

Loads individual or multiple CSV files from an AWS S3 bucket.

Parameters:

Name Type Description Default
path str

The path to the CSV files on AWS S3.

required
**kwargs

Additional arguments to pass to the read_csv function.

Returns:

Type Description
DataFrame

pd.DataFrame: The DataFrame containing the data from the CSV files.

Source code in src/prediction_pipeline/modeling/source_and_feature_selection.py
def load_csv_files_from_aws_s3(path: str, **kwargs) -> pd.DataFrame:
    """ 
    Loads individual or multiple CSV files from an AWS S3 bucket.
    Args:
        path (str): The path to the CSV files on AWS S3.
        **kwargs: Additional arguments to pass to the read_csv function.
    Returns:
        pd.DataFrame: The DataFrame containing the data from the CSV files.
    """
    df = wr.s3.read_csv(path=path, **kwargs)
    return df
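
A usage sketch: the bucket and prefix below are hypothetical, valid AWS credentials are required, and keyword arguments are forwarded to pandas.read_csv via awswrangler.

# Hypothetical S3 prefix: reads every CSV file found beneath it
visitor_counts_df = load_csv_files_from_aws_s3(
    "s3://my-example-bucket/raw-data/visitor-counts/",
    parse_dates=['Time'],   # forwarded to pandas.read_csv
)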

merge_new_features(df, df_newfeatures)

Merges new features from df_newfeatures into df on 'Time' column.

Source code in src/prediction_pipeline/modeling/source_and_feature_selection.py
def merge_new_features(df: pd.DataFrame, df_newfeatures: pd.DataFrame) -> pd.DataFrame:
    """Merges new features from df_newfeatures into df on 'Time' column."""

    # Ensure 'Time' columns are datetime
    df, df_newfeatures = map(lambda x: convert_to_datetime(x, ['Time']), [df, df_newfeatures])

    # Columns to merge
    columns_to_add = [
        'ZScore_Daily_Max_Temperature (°C)',
        'ZScore_Daily_Max_Relative Humidity (%)',
        'ZScore_Daily_Max_Wind Speed (km/h)',
        'Distance_to_Nearest_Holiday_Bayern',
        'Distance_to_Nearest_Holiday_CZ'
    ]

    # Select only existing columns for merging
    selected_columns = [col for col in columns_to_add if col in df_newfeatures.columns]

    # Merge on 'Time' column
    df = df.merge(df_newfeatures[['Time'] + selected_columns], on='Time', how='left')

    # Additionally add the hour column derived from 'Time'
    df['Hour'] = df['Time'].dt.hour

    return df
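
A small sketch of the merge behaviour with toy data: only the listed feature columns that actually exist in df_newfeatures are joined on 'Time', and an 'Hour' column is derived afterwards.

import pandas as pd

times = pd.date_range('2024-07-01', periods=3, freq='h')
df = pd.DataFrame({'Time': times, 'Temperature (°C)': [21.0, 22.5, 23.0]})
df_newfeatures = pd.DataFrame({
    'Time': times,
    'Distance_to_Nearest_Holiday_Bayern': [5, 5, 5],
})

merged = merge_new_features(df, df_newfeatures)
print(merged.columns.tolist())
# ['Time', 'Temperature (°C)', 'Distance_to_Nearest_Holiday_Bayern', 'Hour']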

process_transformations(df)

Process the transformations on the DataFrame.

Source code in src/prediction_pipeline/modeling/source_and_feature_selection.py
def process_transformations(df: pd.DataFrame) -> pd.DataFrame:
    """Process the transformations on the DataFrame."""
    df = apply_cliclic_tranformations(df, cyclic_features = ['Tag','Hour', 'Monat', 'Wochentag'])
    df = standardize_numeric_features(df, standardize_features = ['Temperature (°C)', 'Relative Humidity (%)', 'Wind Speed (km/h)',
                                                                  'Distance_to_Nearest_Holiday_Bayern','Distance_to_Nearest_Holiday_CZ'])
    df = get_dummy_encodings(df, columns_to_use = ['Jahreszeit', 'coco_2'])
    df = handle_binary_values(df)

    return df

build_lstm_model(input_shape, output_size)

Build and compile the LSTM model.

Parameters:

Name Type Description Default
input_shape tuple

Shape of the input data (time_steps, features).

required
output_size int

Number of target variables.

required

Returns:

Name Type Description
model Model

Compiled LSTM model.

Source code in src/prediction_pipeline/modeling/train_lstm.py
def build_lstm_model(input_shape, output_size):
    """
    Build and compile the LSTM model.

    Parameters:
        input_shape (tuple): Shape of the input data (time_steps, features).
        output_size (int): Number of target variables.

    Returns:
        model (tensorflow.keras.Model): Compiled LSTM model.
    """
    model = Sequential()
    model.add(Input(shape=input_shape))
    model.add(Bidirectional(LSTM(128, return_sequences=True)))
    model.add(Dropout(0.3))
    model.add(Bidirectional(LSTM(64)))
    model.add(Dropout(0.3))
    model.add(Dense(output_size))  # Output layer with the number of target variables
    model.compile(optimizer='adam', loss='mse')
    return model
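
For example (the feature count is illustrative; the 168-step window and the 15 overall/regional targets match the training pipeline below):

# 168 hourly time steps, 40 input features (illustrative), 15 target variables
model = build_lstm_model(input_shape=(168, 40), output_size=15)
model.summary()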

create_sequences(X, y, window_size)

Create sequences from the data for LSTM input.

Parameters:

Name Type Description Default
X ndarray

Feature array.

required
y ndarray

Target variable array.

required
window_size int

The number of time steps to include in each sequence.

required

Returns:

Name Type Description
X_seq ndarray

Sequence features.

y_seq ndarray

Sequence targets.

Source code in src/prediction_pipeline/modeling/train_lstm.py
def create_sequences(X, y, window_size):
    """
    Create sequences from the data for LSTM input.

    Parameters:
        X (np.ndarray): Feature array.
        y (np.ndarray): Target variable array.
        window_size (int): The number of time steps to include in each sequence.

    Returns:
        X_seq (np.ndarray): Sequence features.
        y_seq (np.ndarray): Sequence targets.
    """
    X_seq, y_seq = [], []
    for i in range(len(X) - window_size):
        X_seq.append(X[i:i + window_size])
        y_seq.append(y[i + window_size])
    return np.array(X_seq), np.array(y_seq)
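
A worked shape example with toy data, assuming the function is in scope: with 200 rows and a 168-step window, 200 - 168 = 32 sequences are produced.

import numpy as np

X = np.random.rand(200, 4)   # 200 hourly rows, 4 features
y = np.random.rand(200, 2)   # 2 target variables

X_seq, y_seq = create_sequences(X, y, window_size=168)

# Each sample is a 168-step window; its target is the row that follows it
print(X_seq.shape)  # (32, 168, 4)
print(y_seq.shape)  # (32, 2)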

load_data()

Load preprocessed data from the feature_selection_and_preprocessing module.

Returns:

Name Type Description
df DataFrame

The preprocessed dataframe with features and target variables.

Source code in src/prediction_pipeline/modeling/train_lstm.py
def load_data():
    """
    Load preprocessed data from the feature_selection_and_preprocessing module.
    Returns:
        df (pd.DataFrame): The preprocessed dataframe with features and target variables.
    """
    from feature_selection_and_preprocessing import get_preprocessed_data
    return get_preprocessed_data()

save_model(model, model_path='lstm_model.h5')

Save the trained model to a file.

Parameters:

Name Type Description Default
model Model

The trained LSTM model.

required
model_path str

Path to save the model file.

'lstm_model.h5'
Source code in src/prediction_pipeline/modeling/train_lstm.py
def save_model(model, model_path='lstm_model.h5'):
    """
    Save the trained model to a file.

    Parameters:
        model (tensorflow.keras.Model): The trained LSTM model.
        model_path (str): Path to save the model file.
    """
    model.save(model_path)

split_features_targets(df, target_vars)

Split the dataframe into features and target variables.

Parameters:

Name Type Description Default
df DataFrame

The dataframe with features and target variables.

required
target_vars list

List of target variable column names.

required

Returns:

Name Type Description
X ndarray

Feature array.

y ndarray

Target variable array.

Source code in src/prediction_pipeline/modeling/train_lstm.py
def split_features_targets(df, target_vars):
    """
    Split the dataframe into features and target variables.

    Parameters:
        df (pd.DataFrame): The dataframe with features and target variables.
        target_vars (list): List of target variable column names.

    Returns:
        X (np.ndarray): Feature array.
        y (np.ndarray): Target variable array.
    """
    X = df.drop(columns=target_vars).values  # Features
    y = df[target_vars].values  # Target variables
    return X, y

split_train_test(X, y, test_size=0.1)

Split data into training and test sets.

Parameters:

Name Type Description Default
X ndarray

Feature array.

required
y ndarray

Target variable array.

required
test_size float

Proportion of the dataset to include in the test split.

0.1

Returns:

Name Type Description
X_train ndarray

Training features.

X_eval ndarray

Evaluation features.

y_train ndarray

Training targets.

y_eval ndarray

Evaluation targets.

Source code in src/prediction_pipeline/modeling/train_lstm.py
def split_train_test(X, y, test_size=0.1):
    """
    Split data into training and test sets.

    Parameters:
        X (np.ndarray): Feature array.
        y (np.ndarray): Target variable array.
        test_size (float): Proportion of the dataset to include in the test split.

    Returns:
        X_train (np.ndarray): Training features.
        X_eval (np.ndarray): Evaluation features.
        y_train (np.ndarray): Training targets.
        y_eval (np.ndarray): Evaluation targets.
    """
    return train_test_split(X, y, test_size=test_size, shuffle=False)
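
Note the design choice: shuffle=False preserves chronological order, so the evaluation set is always the most recent slice of the data. A toy example:

import numpy as np

X = np.arange(100).reshape(50, 2)   # 50 rows, 2 features
y = np.arange(50).reshape(50, 1)

X_train, X_eval, y_train, y_eval = split_train_test(X, y, test_size=0.1)

# The last 10% of rows (the most recent ones) form the evaluation set,
# so no future information leaks into training
print(X_train.shape, X_eval.shape)  # (45, 2) (5, 2)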

train_model(model, X_train_seq, y_train_seq, epochs=50, batch_size=64)

Train the LSTM model.

Parameters:

Name Type Description Default
model Model

The compiled LSTM model.

required
X_train_seq ndarray

Training sequence features.

required
y_train_seq ndarray

Training sequence targets.

required
epochs int

Number of epochs to train the model.

50
batch_size int

Batch size for training.

64

Returns:

Name Type Description
history History

Training history.

Source code in src/prediction_pipeline/modeling/train_lstm.py
def train_model(model, X_train_seq, y_train_seq, epochs=50, batch_size=64):
    """
    Train the LSTM model.

    Parameters:
        model (tensorflow.keras.Model): The compiled LSTM model.
        X_train_seq (np.ndarray): Training sequence features.
        y_train_seq (np.ndarray): Training sequence targets.
        epochs (int): Number of epochs to train the model.
        batch_size (int): Batch size for training.

    Returns:
        history (tensorflow.keras.callbacks.History): Training history.
    """
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    history = model.fit(X_train_seq, y_train_seq, epochs=epochs, batch_size=batch_size, validation_split=0.1, callbacks=[early_stopping])
    return history
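
A usage sketch continuing the objects built above (model and the training sequences): the last 10% of the training sequences is held out via validation_split=0.1, and early stopping restores the best weights once val_loss has not improved for 10 epochs.

history = train_model(model, X_train_seq, y_train_seq, epochs=50, batch_size=64)

# Inspect the best validation loss reached before early stopping kicked in
print(min(history.history['val_loss']))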

training_pipeline()

Main function to execute the training pipeline for the LSTM model.

Source code in src/prediction_pipeline/modeling/train_lstm.py
def training_pipeline():
    """
    Main function to execute the training pipeline for the LSTM model.
    """
    # Load preprocessed data
    df = load_data()

    # Define target variables
    target_vars = ['traffic_abs', 'sum_IN_abs', 'sum_OUT_abs', 
                   'Lusen-Mauth-Finsterau IN', 'Lusen-Mauth-Finsterau OUT', 
                   'Falkenstein-Schwellhäusl IN', 'Falkenstein-Schwellhäusl OUT', 
                   'Rachel-Spiegelau IN', 'Rachel-Spiegelau OUT',  
                   'Nationalparkzentrum Lusen IN', 'Nationalparkzentrum Lusen OUT', 
                   'Scheuereck-Schachten-Trinkwassertalsperre IN', 
                   'Scheuereck-Schachten-Trinkwassertalsperre OUT', 
                   'Nationalparkzentrum Falkenstein IN', 'Nationalparkzentrum Falkenstein OUT']

    # Split features and target variables
    X, y = split_features_targets(df, target_vars)

    # Split into training and evaluation sets
    X_train, X_eval, y_train, y_eval = split_train_test(X, y)

    # Create sequences
    window_size = 168  # 7 days, assuming data is hourly
    X_train_seq, y_train_seq = create_sequences(X_train, y_train, window_size)
    X_eval_seq, y_eval_seq = create_sequences(X_eval, y_eval, window_size)

    # Build the LSTM model
    input_shape = (X_train_seq.shape[1], X_train_seq.shape[2])
    output_size = y_train_seq.shape[1]
    model = build_lstm_model(input_shape, output_size)

    # Train the model
    train_model(model, X_train_seq, y_train_seq)

    # Save the trained model
    save_model(model)

create_uuid()

Creates a unique identifier string.

Returns:

Name Type Description
str str

A unique identifier string.

Source code in src/prediction_pipeline/modeling/train_regressor.py
def create_uuid() -> str:
    """Creates a unique identifier string.

    Returns:
        str: A unique identifier string.
    """
    unique_id = str(uuid.uuid4())

    return unique_id

save_models_to_aws_s3(model, save_path_models, model_name, local_path, uuid)

Save the model to AWS S3.

Parameters:

Name Type Description Default
model

The model to save.

required
save_path_models str

The S3 path prefix where the model is saved.

required
model_name str

The name of the model.

required
local_path str

The local path to the model.

required
uuid str

The unique identifier string.

required

Returns:

Type Description
None

None

Source code in src/prediction_pipeline/modeling/train_regressor.py
def save_models_to_aws_s3(model, save_path_models: str, model_name: str, local_path: str, uuid: str) -> None:
    """Save the model to AWS S3.

    Args:
        model: The model to save.
        save_path_models (str): The S3 path prefix where the model is saved.
        model_name (str): The name of the model.
        local_path (str): The local path to the model.
        uuid (str): The unique identifier string.

    Returns:
        None
    """

    # make the save path if it does not exist
    if not os.path.exists(local_path):
        os.makedirs(local_path)

    save_model_path = os.path.join(local_path, model_name)
    save_model(model, save_model_path, model_only=True)

    save_path_aws = f"s3://{aws_s3_bucket}/{save_path_models}/{uuid}/{model_name}.pkl"

    wr.s3.upload(f"{save_model_path}.pkl", save_path_aws)
    print(f"Model saved in AWS S3 under {save_path_aws}")
    return
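
A hedged usage sketch: the S3 prefix, local path, and model variable below are illustrative, and valid AWS credentials plus the configured aws_s3_bucket are required.

run_id = create_uuid()

save_models_to_aws_s3(
    model=trained_regressor,                # e.g. the regressor returned by training
    save_path_models="models/regressor",    # S3 prefix (illustrative)
    model_name="visitor_regressor",
    local_path="/tmp/models",
    uuid=run_id,
)
# Uploads /tmp/models/visitor_regressor.pkl to
# s3://<aws_s3_bucket>/models/regressor/<run_id>/visitor_regressor.pkl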

save_predictions_to_aws_s3(df, save_path_predictions, filename, uuid)

Writes an individual predictions file (Parquet) to AWS S3.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to write.

required
save_path_predictions str

The S3 path prefix where the predictions are saved.

required
filename str

The name of the output file.

required
uuid str

The unique identifier string.

required

Returns:

Type Description
None

None

Source code in src/prediction_pipeline/modeling/train_regressor.py
def save_predictions_to_aws_s3(df: pd.DataFrame, save_path_predictions: str, filename: str, uuid: str) -> None:
    """Writes an individual CSV file to AWS S3.

    Args:
        df (pd.DataFrame): The DataFrame to write.
        save_path_predictions (str): The S3 path prefix where the predictions are saved.
        filename (str): The name of the output file.
        uuid (str): The unique identifier string.

    Returns:
        None
    """

    aws_s3_path = f"s3://{aws_s3_bucket}/{save_path_predictions}/{uuid}/{filename}"

    wr.s3.to_parquet(df, path=aws_s3_path, index=True)
    print(f"Predictions for test data saved in AWS S3 under {aws_s3_path}")
    return
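
A matching usage sketch: the prefix, filename, and DataFrame variable are illustrative, and the same run_id can be reused so that the model and its predictions land under one identifier.

save_predictions_to_aws_s3(
    df=test_predictions_df,                    # predictions indexed by Time
    save_path_predictions="predictions/test",  # S3 prefix (illustrative)
    filename="test_predictions.parquet",
    uuid=run_id,
)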