data/init.py

95 lines
3.1 KiB
Python
Raw Permalink Normal View History

2024-12-30 22:42:53 +00:00
import os
import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from app.database import SessionLocal, Base, engine
from app.schema import ADAUSDT, SHIBUSDT, BTCUSDT # Dodaj inne modele, jeśli istnieją
# Mapowanie nazwy pary na model
COIN_MODELS = {
"ADAUSDT": ADAUSDT,
"SHIBUSDT": SHIBUSDT,
"BTCUSDT": BTCUSDT,
}
# Ścieżka do katalogu z danymi
DATA_DIR = "./data"
def load_csv_to_db(session: Session, coin: str, csv_file: str):
"""Wczytuje dane z pliku CSV do bazy danych."""
print(f"Przetwarzanie pliku: {csv_file} dla {coin}")
# Wczytaj dane z pliku CSV
data = pd.read_csv(csv_file)
# Oczekiwane kolumny w CSV
expected_columns = [
"timestamp", "open", "high", "low", "close", "volume",
"close_time", "quote_asset_volume", "number_of_trades",
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
]
# Sprawdź, czy wszystkie kolumny istnieją
if not all(col in data.columns for col in expected_columns):
print(f"Błąd: Plik {csv_file} nie zawiera wszystkich wymaganych kolumn.")
return
# Przekształć dane do listy obiektów SQLAlchemy
rows = [
COIN_MODELS[coin](
timestamp=row["timestamp"],
open=row["open"],
high=row["high"],
low=row["low"],
close=row["close"],
volume=row["volume"],
close_time=row["close_time"],
quote_asset_volume=row["quote_asset_volume"],
number_of_trades=row["number_of_trades"],
taker_buy_base_asset_volume=row["taker_buy_base_asset_volume"],
taker_buy_quote_asset_volume=row["taker_buy_quote_asset_volume"],
ignore=row["ignore"],
)
for _, row in data.iterrows()
]
# Wstaw dane do bazy
session.bulk_save_objects(rows)
session.commit()
print(f"Zapisano dane z {csv_file} do tabeli {coin.lower()}.")
def process_coin_data():
"""Główna funkcja przetwarzająca dane dla wszystkich kryptowalut."""
# Stwórz sesję bazy danych
session = SessionLocal()
try:
for coin, model in COIN_MODELS.items():
coin_dir = os.path.join(DATA_DIR, coin)
if not os.path.exists(coin_dir):
print(f"Katalog danych dla {coin} nie istnieje: {coin_dir}")
continue
# Pobierz listę plików CSV i posortuj je
csv_files = sorted(
[
os.path.join(coin_dir, f) for f in os.listdir(coin_dir)
if f.endswith(".csv")
]
)
# Przetwarzaj pliki CSV w kolejności
for csv_file in csv_files:
load_csv_to_db(session, coin, csv_file)
print("Przetwarzanie danych zakończone.")
finally:
session.close()
if __name__ == "__main__":
# Upewnij się, że wszystkie tabele istnieją w bazie
Base.metadata.create_all(bind=engine)
# Rozpocznij przetwarzanie danych
process_coin_data()