import os
from concurrent.futures import ThreadPoolExecutor

import numpy
import pandas

from main import (get_steam_API_response, get_steamspy_API_response, STEAM_SPY_GAME_INFO, STEAM_API_LANGUAGE,
                  STEAM_GAME_INFO_URL, replace_owner_number_with_symbol)


def get_dlc_data_steam(app_id):
    """Fetch metadata for a single DLC from the Steam store API and return it as a pandas Series."""
    url = STEAM_GAME_INFO_URL + str(app_id) + STEAM_API_LANGUAGE
    response = get_steam_API_response(url, str(app_id))
    if not response:
        # No usable response: return an all-NaN row so the caller can still concatenate it.
        return pandas.Series({"appid": numpy.nan, "name": numpy.nan, "data_type": numpy.nan,
                              "price_eur": numpy.nan, "category": numpy.nan, "main_publisher": numpy.nan,
                              "main_developer": numpy.nan, "release_date": numpy.nan})
    data = response[str(app_id)]["data"]
    data_type = data["type"]
    name = data["name"]
    appid = data["steam_appid"]
    # "final" is the current price as reported by the store API.
    price_eur = data["price_overview"]["final"] if "price_overview" in data else ""
    category = [entry["description"] for entry in data["categories"]] if "categories" in data else []
    main_developer = data["developers"][0] if "developers" in data else ""
    main_publisher = data["publishers"][0] if "publishers" in data else ""
    release_date = (data["release_date"]["date"]
                    if "release_date" in data and "date" in data["release_date"] else "")
    return pandas.Series({"appid": appid, "name": name, "data_type": data_type, "price_eur": price_eur,
                          "category": category, "main_publisher": main_publisher, "main_developer": main_developer,
                          "release_date": release_date})


def get_dlc_data_steamspy(app_id):
    """Fetch the estimated owner range for a single DLC from the SteamSpy API."""
    url = STEAM_SPY_GAME_INFO + str(app_id)
    response = get_steamspy_API_response(url)
    if not response:
        return pandas.Series({"owners": numpy.nan})
    return pandas.Series({"owners": response["owners"]})
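
# Sketch (not part of the pipeline): how the two sources are merged for a single DLC id.
# The id below is a placeholder, not one taken from the real data set.
#
#   steam_part = get_dlc_data_steam(12345)         # store metadata from the Steam API
#   spy_part = get_dlc_data_steamspy(12345)        # estimated owner range from SteamSpy
#   combined = steam_part.combine_first(spy_part)  # one Series holding fields from both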


def get_dlc_data_for_list(dlc_list, num_threads=4):
    """Gather Steam and SteamSpy data for every DLC id in dlc_list using a small thread pool."""
    def get_combined_dlc_data(app_id):
        # Steam values take precedence; SteamSpy only contributes the "owners" field.
        return get_dlc_data_steam(app_id).combine_first(get_dlc_data_steamspy(app_id))

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        combined_results = list(executor.map(get_combined_dlc_data, dlc_list))
    # Each result is a Series (one row); concatenate along columns and transpose into a DataFrame.
    df = pandas.concat(combined_results, axis=1, ignore_index=True).T
    return df

def get_dlc_for_df(df, **kwargs):
    """Collect all DLC ids referenced in the "dlc" column of df and fetch their data."""
    dlc_list = []
    for val in df["dlc"].to_list():
        # The column stores stringified id lists such as "[1234, 5678]"; skip empty and non-string cells.
        if isinstance(val, str) and len(val) > 2:
            dlc_list += [int(entry) for entry in val[1:-1].split(", ")]

    return get_dlc_data_for_list(dlc_list, **kwargs)
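
# Usage sketch (the "dlc" strings below are made-up placeholders, not rows from the data set):
#
#   frame = pandas.DataFrame({"dlc": ["[1234, 5678]", "", "[9012]"]})
#   dlc_frame = get_dlc_for_df(frame, num_threads=2)  # fetches data for ids 1234, 5678 and 9012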


# def get_dlc_for_files_in_dir(path):
#     files = os.listdir(path)
#     df = None
#     for file_name in files:
#         # new_df = pandas.read_csv(os.path.join(os.getcwd(), path, os))
#         df = pandas.concat([df, pandas.read_csv(os.path.join(os.getcwd(), path, file_name), index_col=0)], axis=0,
#                            ignore_index=True) if df is not None \
#             else pandas.read_csv(os.path.join(os.getcwd(), path, file_name), index_col=0)
#
#     print(files)
#     return df
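

# A working sketch of the commented-out helper above; the name is hypothetical and this is
# not the project's tested code. It reads every CSV segment in a directory and stacks the rows.
def concat_csv_segments(dir_path):
    frames = [pandas.read_csv(os.path.join(os.getcwd(), dir_path, name), index_col=0)
              for name in os.listdir(dir_path)]
    return pandas.concat(frames, axis=0, ignore_index=True) if frames else None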


if __name__ == "__main__":
    dlc_path = "dlc_segments"
    path = "file_segments"
    # Sort the segment files by the number embedded in their file names.
    files = os.listdir(path)
    files.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))
    file_indices = [int(''.join(filter(str.isdigit, f))) for f in files]
    for file_index, file_name in zip(file_indices, files):
        # Only process segment indices that are not in the exclusion list.
        if file_index not in [0, 1, 10, 11, 12]:
            data = pandas.read_csv(os.path.join(os.getcwd(), path, file_name), index_col=0)
            # data = data.iloc[0:10]  # for testing
            full_df = get_dlc_for_df(data)
            full_df = replace_owner_number_with_symbol(full_df)
            out_name = f"dlc_data_{file_index}.csv"
            file_path = os.path.join(os.getcwd(), dlc_path, out_name)

            full_df.to_csv(file_path)
            print(f"Saved file {out_name}!")