Source code for pybbda.data.sources.statcast.data

import logging
import datetime

import pandas as pd

from .constants import (
    STATCAST_PBP_DAILY_URL_FORMAT,
    STATCAST_PBP_PLAYER_URL_FORMAT,
    STATCAST_PBP_DAILY_DF_DATA_TYPES,
    STATCAST_QUERY_DATA_SIZE_LIMIT,
)
from .utils import get_statcast_tables
from pybbda import PYBBDA_DATA_ROOT
from pybbda.data.sources.data_source.base import DataSource

STATCAST_DATA_PATH = PYBBDA_DATA_ROOT / "statcast"

STATCAST_TABLES = get_statcast_tables(min_year=2016, max_year=2019)

STATCAST_URLS = {"statcast_daily": STATCAST_PBP_DAILY_URL_FORMAT}

logger = logging.getLogger(__name__)


class StatcastData(DataSource):
    _SOURCE_DATA_PATH = STATCAST_DATA_PATH
    _SOURCE_TABLES = STATCAST_TABLES
    _SOURCE_URLS = STATCAST_URLS
    def get_statcast_daily(self, player_type, start_date, end_date, player_id=""):
        """
        Gets pitch-level data from the Baseball Savant query page.

        player_type can be batter or pitcher. The fetched data are the same,
        but the player that the player_name column refers to is different.

        start_date and end_date are inclusive; for example, to fetch data for
        2019-05-01 only, set start_date = end_date = "2019-05-01".

        If player_id is specified, then only data for pitches involving that
        player are returned. The id here is the MLBAM player id, which is an
        integer.

        :param player_type: str. can be batter or pitcher
        :param start_date: str in %Y-%m-%d format
        :param end_date: str in %Y-%m-%d format
        :param player_id: str or int. mlbam player id
        :return: pandas data frame
        """
        self._validate_dates(start_date, end_date)
        if player_type == "pitcher":
            player_id_var = "pitchers_lookup%5B%5D"
        elif player_type == "batter":
            player_id_var = "batters_lookup%5B%5D"
        else:
            raise ValueError(
                f"player_type must be (pitcher, batter) not {player_type}"
            )
        url_formatter = (
            STATCAST_PBP_PLAYER_URL_FORMAT
            if player_id
            else STATCAST_PBP_DAILY_URL_FORMAT
        )
        url = url_formatter.format(
            **{
                "player_id_var": player_id_var,
                "player_id": player_id,
                "player_type": player_type,
                "season": start_date[0:4],
                "start_date": start_date,
                "end_date": end_date,
            }
        )
        daily_df = pd.read_csv(url)
        if len(daily_df) == STATCAST_QUERY_DATA_SIZE_LIMIT:
            logger.warning(
                "Statcast query returned %d rows which probably "
                "means you've exceeded the data limit. "
                "You should try to break up the query.",
                STATCAST_QUERY_DATA_SIZE_LIMIT,
            )
        if daily_df.shape == (1, 1):
            logger.warning(
                "only one column was returned "
                "which probably means there was a query error."
            )
            return None
        return self._format_daily_df(daily_df)
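    # Illustrative call (a sketch, not part of the library source). It assumes
    # StatcastData() can be constructed with the DataSource defaults; the
    # player_id below is only a placeholder MLBAM id.
    #
    #     sc = StatcastData()
    #     one_day = sc.get_statcast_daily(
    #         player_type="pitcher",
    #         start_date="2019-05-01",
    #         end_date="2019-05-01",
    #         player_id=592450,
    #     )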
    # TODO: refactor to make more general or use pychadwick version
    @staticmethod
    def convert_data_frame_types(df, data_type_mapping):
        # Cast each mapped column to its target dtype; columns missing from
        # the data frame are skipped.
        for column_name, data_type_conversion in data_type_mapping.items():
            if column_name in df:
                try:
                    df.loc[:, column_name] = df.loc[:, column_name].astype(
                        data_type_conversion
                    )
                except TypeError:
                    raise TypeError(f"Cannot convert column {column_name}")
        return df
    def _format_daily_df(self, daily_df, data_type_mapping=None):
        data_type_mapping = data_type_mapping or STATCAST_PBP_DAILY_DF_DATA_TYPES
        return self.convert_data_frame_types(daily_df, data_type_mapping)

    def _validate_dates(self, start_date, end_date):
        if start_date > end_date:
            raise ValueError(
                f"start_date ({start_date}) cannot be "
                f"greater than end_date ({end_date})"
            )
        start_dt = datetime.datetime.strptime(start_date, "%Y-%m-%d")
        end_dt = datetime.datetime.strptime(end_date, "%Y-%m-%d")
        dt_days = (end_dt - start_dt).days
        if dt_days >= 7:
            logger.warning(
                "end_date is greater than 6 days ahead of start date (%d days). "
                "This is likely to cause the query to exceed data limits of the API.",
                dt_days,
            )
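Since _validate_dates warns once a query spans 7 or more days, longer pulls are best split into short windows. Below is a minimal sketch of one way to do that; it is not part of the module source, and it assumes StatcastData is exported from pybbda.data like the other data sources and can be constructed with no arguments.

import datetime

import pandas as pd

from pybbda.data import StatcastData  # assumed export path


def statcast_daily_chunked(player_type, start_date, end_date, chunk_days=6):
    # Fetch a long date range as consecutive windows of at most chunk_days
    # days and concatenate the results.
    sc = StatcastData()
    start = datetime.date.fromisoformat(start_date)
    end = datetime.date.fromisoformat(end_date)
    frames = []
    while start <= end:
        stop = min(start + datetime.timedelta(days=chunk_days - 1), end)
        frame = sc.get_statcast_daily(
            player_type, start.isoformat(), stop.isoformat()
        )
        if frame is not None:  # None signals a query error (see above)
            frames.append(frame)
        start = stop + datetime.timedelta(days=1)
    return pd.concat(frames, ignore_index=True) if frames else None


# e.g. statcast_daily_chunked("batter", "2019-05-01", "2019-05-31")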