"""Base module for fyda."""
import json
import os
import pickle
import warnings
from configparser import ConfigParser
import yaml
from io import BytesIO
import numpy as np
import pandas as pd
from . import options
from .errorhandling import NoShortcutError
# TODO
# Option values for behavior with duplicates. (Overwrite/keep/rename)
# Sanity checks for file assignment in .fydarc. Possibly get flexible there.
# Future idea: some way to search through files/shortcuts; like fuzzy search
# Integrate cloud-based file loading directly into load(). i.e. point ``root``
# to a bucket, and have fyda work its magic from there.
# Better path handling in .fydarc. e.g. when quotation marks appear in the path
# Update displayed shortcuts using values from .fydarc
# Automatic recursive directory shortcuts, much like how files work.
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
def _get_conf():
    """Resolve the configuration path, honoring runtime changes by the user."""
    # An explicitly set CONFIG_LOCATION wins; otherwise search for the file.
    return options.CONFIG_LOCATION or options.locate_config()
# TODO mention these in docs about writing .fydarc
LOCATION = 0  # Index of the filepath entry in a list under "data" in config
KWARGS = 1  # Index of the reader-kwargs entry in a list under "data" in config
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class ProjectConfig(ConfigParser):
    """
    Configuration manager.

    Notes
    -----
    A thin wrapper around :class:`configparser.ConfigParser` that reads the
    fyda configuration file as soon as it is instantiated. When the file does
    not exist yet and ``make_config`` is true, a skeleton configuration with
    empty ``directories`` and ``data`` sections is written out first.

    See also
    --------
    :py:class:`configparser.ConfigParser` : configuration parsing class
    """

    def __init__(self, make_config=True):
        super().__init__()
        config_missing = not os.path.exists(_get_conf())
        if config_missing and make_config:
            # Seed the two sections fyda expects, then persist to disk.
            for section in ('directories', 'data'):
                self.add_section(section)
            _write_config(self)
        self.read(_get_conf())
class DataBank:
    """
    Interact with the system's data automatically.

    Parameters
    ----------
    root : str
        Path to root data folder. If none is provided, uses the default from
        ``.fydarc`` given by the ``conf_path`` parameter.
    error : str, {'raise', 'ignore'}
        Error policy used during the initial scan of ``root``: whether
        failing to pick a reader for an unrecognized file type raises or is
        silently ignored.
    """

    def __init__(self, root=None, error='ignore'):
        if root is None:
            pc = load_config()
            try:
                # A relative 'root' entry in .fydarc is resolved against the
                # directory that holds the rc file itself.
                self.root = os.path.abspath(
                    os.path.join(os.path.dirname(_get_conf()),
                                 pc['directories']['root']))
            except KeyError:
                # No 'directories'/'root' entry: default to ./data in cwd.
                self.root = os.path.join(os.getcwd(), 'data')
        else:
            self.root = root
        self._root = self.root  # For legacy API support
        self._data = {}        # shortcut -> filepath
        self._reader_map = {}  # shortcut -> reader callable
        # Per default shortcut: current encoding level plus the shortcut
        # strings already claimed for that base name.
        self._forbid = {}
        self._tree = self.root_to_dict(self.root, error=error)

    # TODO rcusers information to avoid overwriting values set in config
    # We access attributes this way because dict is mutable
    # TODO: any way to warn people when they try to change these?
    @property
    def tree(self):
        """Full tree of data root directory in python dictionary form."""
        return self.root_to_dict(self.root)

    @property
    def shortcuts(self):
        """Mapping of shortcuts to absolute paths."""
        # TODO .fydarc data shortcuts should be in here as well.
        return self._data.copy()

    @property
    def readers(self):
        """Mapping of shortcuts to their respective readers."""
        return self._reader_map.copy()

    def _determine_path(self, input_string):
        """Determine the actual file location, based on input string."""
        pc = load_config()
        # .fydarc takes priority
        if input_string in pc['data'].keys():
            return os.path.abspath(
                os.path.join(self.root, _get_data_location(
                    input_string, pc)))
        try:  # Second check shortcuts
            filename = self.shortcuts[input_string]
        except KeyError:
            # An extensionless string can only be a shortcut, so give up.
            if os.path.splitext(input_string)[1] == '':
                raise NoShortcutError(input_string)
            filename = os.path.join(self.root, input_string)
        try:  # Then see if it is a path relative to data root
            with open(filename):
                pass
        except (FileNotFoundError, PermissionError):
            # Otherwise, just take original string
            filename = input_string
        return filename

    def _kill_check(self, filepath):
        """Use to stop a process if filepath is already in data dict."""
        return os.path.abspath(filepath) in self._data.values()

    def deposit(self, filepath, shortcut=None, reader=None, error='raise'):
        """
        Store a shortcut and reader reference for the given file name.

        Parameters
        ----------
        filepath : str
            Name of the file to store.
        shortcut : str
            Shortcut to deposit.
        reader : callable, (optional)
            If provided, maps shortcut to this callable whenever the data is
            called to open. If none is provided, a reader will be automatically
            assigned.
        error : str
            If set to 'ignore', ignores any errors when picking a file reader.

        Raises
        ------
        ValueError
            If an explicit ``shortcut`` is already in use.
        """
        # If we don't check, rebase recursion will ruin everything
        if self._kill_check(filepath):
            warnings.warn('Attempted to add already existing file "{}" to '
                          'DataBank. Killing process.'.format(filepath))
            return
        # Shortcut determination
        if shortcut is None:
            shortcut, rebase = self.determine_shortcut(filepath)
            while rebase:
                self.rebase_shortcuts(filepath)
                shortcut, rebase = self.determine_shortcut(filepath)
        elif shortcut in self.shortcuts.keys():
            # Bug fix: previously interpolated ``filepath`` here, so the
            # message never showed the conflicting shortcut.
            raise ValueError('Shortcut `{}` already in use.'.format(shortcut))
        # Reader determination
        if reader is None:
            reader = _pick_reader(filepath, error=error)
        # Update user list
        default = _default_shortcut(filepath)
        # TODO make this better?
        if default in self._forbid:
            new_userlist = self._forbid[default]['in_use'] + [shortcut]
        else:
            new_userlist = [shortcut]
        # Deposit new information
        self._forbid.update({
            default: {
                'encode_level': self.encoding_level(default),
                'in_use': new_userlist}})
        self._reader_map.update({shortcut: reader})
        self._data.update({shortcut: filepath})

    def determine_shortcut(self, filepath):
        """
        Get the shortcut for a filepath based on already deposited values.

        Parameters
        ----------
        filepath : str
            Absolute path to the file in question.

        Returns
        -------
        shortcut : str
            String value to use for shortcut
        rebase : bool
            Whether or not we need to rebase the users of the default shortcut
            code. This happens only if the current encoding level would create
            a new duplicate value.
        """
        # Base name without extension
        default = _default_shortcut(filepath)
        if default not in self._forbid:
            return default, False
        encode_level = self._forbid[default]['encode_level']
        users = self._forbid[default]['in_use']
        shortcut = _encode_shortcut(filepath, encode_level)
        if shortcut not in users:
            return shortcut, False
        # Encoded shortcut collides with an existing user: caller must rebase.
        return default, True

    def encoding_level(self, fileref):
        """Get the encoding level for given file reference."""
        default = _default_shortcut(fileref)
        if default in self._forbid:
            return self._forbid[default]['encode_level']
        return 0

    def rebase_shortcuts(self, filepath):
        """
        Detect if the filepath will cause a duplication conflict, and rebase if
        necessary.

        Parameters
        ----------
        filepath : str
            Absolute path to the file in question.
        """
        if self._kill_check(filepath):
            warnings.warn('Attempted to add already existing file "{}" to '
                          'DataBank. Killing process.'.format(filepath))
            return
        default = _default_shortcut(filepath)
        encode_level = self._forbid[default]['encode_level']
        users = self._forbid[default]['in_use']
        shortcut = _encode_shortcut(filepath, encode_level)
        conflict_exists = shortcut in users
        # TODO preserve .fydarc users. i.e. don't rebase anything set by the
        # rc file.
        while conflict_exists:
            # Bump the encoding level and re-key every existing user of this
            # default shortcut at the deeper level.
            encode_level += 1
            self._forbid[default]['encode_level'] = encode_level
            for user in users:
                file_string = self._data.pop(user)
                new_shortcut = _encode_shortcut(file_string, encode_level)
                self._data[new_shortcut] = file_string
                users = list(set(users) - {user}) + [new_shortcut]
            conflict_exists = shortcut in users
        self._forbid[default]['in_use'] = users

    def root_to_dict(self, root, auto_deposit=True, error='raise'):
        """
        Recursively convert root folder to native Python dictionary.

        Parameters
        ----------
        root : str or path-like
            Path to root folder.
        auto_deposit : bool
            If True, automatically call :meth:`DataBank.deposit` on any files
            found through the recursion to the ``shortcuts`` dict.
        error : str, {'raise', 'ignore'}
            Whether to ignore filetype errors or raise a
            ``NotImplementedError``.

        Returns
        -------
        dict
            Nested mapping: each directory basename maps to a dict of its
            files (default shortcut -> file name) and subdirectories.

        Notes
        -----
        Modified from `this example
        <https://btmiller.com/2015/03/17/represent-file-structure-as-yaml-with-python.html>`_
        by Blake Miller.
        """
        directory = {}
        for root_dir, dirnames, filenames in os.walk(root):
            # Iterate through objects in this directory...
            dn = os.path.basename(root_dir)
            directory[dn] = {}
            # If it's a file, set "basename": "abspath to file"
            for f in filenames:
                filepath = os.path.join(root, f)
                directory[dn].update({_default_shortcut(f): f})
                if auto_deposit:
                    self.deposit(filepath, error=error)
            # If it's a directory, go down a level and start over
            if dirnames:
                for d in dirnames:
                    directory[dn].update(
                        self.root_to_dict(os.path.join(root, d), error=error))
            break  # We break here to stop the os.walk from doubling back
        return directory

    def withdraw(self, data_name, reader=None, kwarg_update_method='update',
                 **kwargs):
        """
        Automatically load data, given shortcut to file.

        Parameters
        ----------
        data_name : str
            Shortcut or filename.
        reader : callable, (optional)
            A function that takes either a string or object with a "read"
            method.
        kwarg_update_method : str, optional {'update', 'overwrite', 'rc'}
            How explicit ``kwargs`` combine with kwargs from ``.fydarc``:
            'update' overlays the rc kwargs onto the passed ones (rc values
            take precedence), 'overwrite' keeps only the passed kwargs, and
            'rc' keeps only the rc kwargs.

        Returns
        -------
        data
            Data as read by ``reader``.
        """
        filename = self._determine_path(data_name)
        if kwarg_update_method != 'overwrite':
            try:
                rckwargs = _get_data_kwargs(data_name, load_config())
            except (IndexError, KeyError):
                rckwargs = {}
            if kwarg_update_method == 'update':
                kwargs.update(rckwargs)
            elif kwarg_update_method == 'rc':
                kwargs = rckwargs
        if reader is None:
            try:
                reader = self.readers[data_name]
            except KeyError:
                reader = _pick_reader(filename)
        return _decode(reader, filename, **kwargs)
# -----------------------------------------------------------------------------
# Module-level library
# -----------------------------------------------------------------------------
def _check_bucket(bucket_name):
"""Sanity check on S3 bucket configuration."""
if bucket_name is None:
try:
pc = load_config()
bucket_name = pc['directories']['s3_bucket'][LOCATION]
except KeyError:
msg = ("Can't determine s3 bucket name. Either pass the "
"bucket_name explicitly, or add an s3_bucket "
"configuration value under ['directories'] in your .fydarc")
raise TypeError(msg)
return bucket_name
def _decode(reader, filename, **kwargs):
"""Successively try different methods to open ``filename`` with
``reader``."""
try: # First check if the reader is an open ``read`` method.
return reader(**kwargs)
except TypeError:
pass
try: # Next possibility is that the reader just needs a string reference
return reader(filename, **kwargs)
except TypeError:
pass
try: # Finally, check if we can open the string reference to read.
with open(filename, 'r') as fileobj:
return reader(fileobj, **kwargs)
except UnicodeDecodeError:
with open(filename, 'rb') as fileobj:
return reader(fileobj, **kwargs)
def _default_shortcut(filepath):
"""Get the default shortcut name for a file."""
return os.path.splitext(os.path.basename(filepath))[0]
def _encode_shortcut(filepath, encoding_level=0):
"""Get shortcut from filepath at given encoding level. 0 = base,
1 = base.ext, 2 = folder/base.ext, 3 = folder_up/folder/base.ext,
... etc."""
if not isinstance(encoding_level, int):
raise ValueError("Encoding level for shortcut not understood.")
elif encoding_level < 0:
raise ValueError("Encoding level for shortcut must be a positive "
"integer.")
elif encoding_level == 0:
return _default_shortcut(filepath)
shortcut = os.path.basename(filepath)
upstream = os.path.dirname(filepath)
for i in range(encoding_level - 1):
# Move shortcut up one folder
upfolder = os.path.basename(upstream)
shortcut = os.path.join(upfolder, shortcut)
# Move upstream up a folder
upstream = os.path.dirname(upstream)
return shortcut
def _check_location(directory_container):
    """Extract a path from a config entry, which may be a str or a list."""
    if isinstance(directory_container, str):
        # Plain string entry: it *is* the location.
        return directory_container
    if isinstance(directory_container, list):
        # List entry: the location lives at the LOCATION index.
        return directory_container[LOCATION]
    raise ValueError('Directory container type not understood.')
def _get_directory(shortcut, config):
    """Resolve a directory shortcut from the ``directories`` config section."""
    entry = config['directories'][shortcut]
    return _check_location(entry)
def _get_data_location(shortcut, config):
    """Resolve a data shortcut's location from the ``data`` config section."""
    entry = config['data'][shortcut]
    return _check_location(entry)
def _get_data_kwargs(shortcut, config):
    """Return the reader kwargs configured for a data shortcut, or {}."""
    entry = config['data'][shortcut]
    # Kwargs exist only when the entry is a list long enough to carry them.
    if isinstance(entry, list) and len(entry) > KWARGS:
        return entry[KWARGS]
    return {}
def _load_config(filepath=None):
    """Deprecated alias kept for legacy support; use :meth:`load_config`."""
    return load_config(filepath=filepath)
def _pick_reader(filename, error='raise'):
"""Reader selection based on ``filename`` extension."""
extension = os.path.splitext(filename)[-1]
if extension in ['.xlsx']:
return pd.read_excel
if extension == '.csv':
return pd.read_csv
if extension in ['.pickle', '.pkl']:
return pickle.load
if extension in ['.npy', '.npz']:
return np.load
if extension == '.json':
return json.load
if extension in ['.sas7bdat', '.xport']:
return pd.read_sas
if extension in ['.yml', '.yaml']:
def open_reader(x):
with open(x, 'r') as fileobj:
return yaml.safe_load(fileobj)
return open_reader
if extension == '.txt':
def open_reader(x):
with open(x, 'r') as fileobj:
return fileobj.read()
return open_reader
if error == 'ignore':
return
# TODO sometimes incorrect shortcut settings get found here, saying
# "extension '' not implemented yet". This kind of error should be found
# earlier than here.
raise NotImplementedError("Extension '%s' not implemented yet."
% extension)
def _write_config(config):
    """Persist ``config`` to the .ini configuration file on disk."""
    with open(_get_conf(), 'w') as fileobj:
        config.write(fileobj)
# -----------------------------------------------------------------------------
# Public library
# -----------------------------------------------------------------------------
def data_path(shortcut, root=None):
    """
    Return the absolute path to the file referenced by ``shortcut``.

    Parameters
    ----------
    shortcut : str
        Shortcut reference for the file.
    root : str
        Root directory to use with :class:`DataBank`.

    Returns
    -------
    path : str
        Absolute path to file.
    """
    bank = DataBank(root)
    # TODO all this logic should be inside the DataBank
    try:
        # Deposited shortcuts win.
        return os.path.abspath(bank.shortcuts[shortcut])
    except KeyError:
        pass
    # Fall back to a 'data' entry in .fydarc, relative to the data root.
    try:
        location = _get_data_location(shortcut, load_config())
    except KeyError:
        raise NoShortcutError(shortcut)
    return os.path.abspath(os.path.join(bank.root, location))
def dir_path(shortcut, root=None):
    """
    Return absolute path to the directory referenced by ``shortcut``.

    Parameters
    ----------
    shortcut : str
        Shortcut reference for the folder/directory.
    root : str
        Root directory to use with :class:`DataBank`.

    Returns
    -------
    path : str
        Absolute path to directory.
    """
    db = DataBank(root)
    pc = load_config()
    path = _get_directory(shortcut, pc)
    # Bug fix: ``path[0]`` raised IndexError when the configured path was an
    # empty string; startswith handles that case gracefully.
    if path.startswith('~'):
        return os.path.abspath(os.path.expanduser(path))
    return os.path.abspath(os.path.join(db.root, path))
def load(file_name, **kwargs):
    """
    Load data intelligently.

    Parameters
    ----------
    file_name : str or path-like
        Files to load. These can be shortcuts or file paths.
    """
    # Delegate the shortcut/path resolution and reading to the DataBank.
    return DataBank().withdraw(file_name, **kwargs)
def load_config(filepath=None):
    """Return fyda configuration file ('.fydarc') parsed with YAML."""
    config_path = _get_conf() if filepath is None else filepath
    with open(config_path, 'r') as stream:
        return yaml.safe_load(stream)
def load_s3(file_name, bucket_name=None, reader=None, **kwargs):
    """
    Read a file from S3.

    Parameters
    ----------
    file_name : str
        Absolute name of the file object as represented in S3 bucket.
    bucket_name : str, (optional)
        Bucket to load object from. If none used, uses bucket specification in
        ``.fydarc``.
    reader : callable, (optional)
        Function capable of reading the file. If none is passed, one will be
        automatically assigned based on the file extension.
    kwargs
        Additional keyword arguments to pass to file reader.

    Returns
    -------
    data
        As read by reader object
    """
    import boto3  # Imported lazily so boto3 is only required for S3 use.
    bucket_name = _check_bucket(bucket_name)
    if reader is None:
        reader = _pick_reader(file_name)
    bucket = boto3.resource('s3').Bucket(bucket_name)
    with BytesIO() as buffer:
        # Download into memory, rewind, then hand the stream to the reader.
        bucket.download_fileobj(file_name, buffer)
        buffer.seek(0)
        return reader(buffer, **kwargs)