import os import pandas as pd from sqlalchemy.orm import Session from sqlalchemy import create_engine from app.database import SessionLocal, Base, engine from app.schema import ADAUSDT, SHIBUSDT, BTCUSDT # Dodaj inne modele, jeśli istnieją # Mapowanie nazwy pary na model COIN_MODELS = { "ADAUSDT": ADAUSDT, "SHIBUSDT": SHIBUSDT, "BTCUSDT": BTCUSDT, } # Ścieżka do katalogu z danymi DATA_DIR = "./data" def load_csv_to_db(session: Session, coin: str, csv_file: str): """Wczytuje dane z pliku CSV do bazy danych.""" print(f"Przetwarzanie pliku: {csv_file} dla {coin}") # Wczytaj dane z pliku CSV data = pd.read_csv(csv_file) # Oczekiwane kolumny w CSV expected_columns = [ "timestamp", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" ] # Sprawdź, czy wszystkie kolumny istnieją if not all(col in data.columns for col in expected_columns): print(f"Błąd: Plik {csv_file} nie zawiera wszystkich wymaganych kolumn.") return # Przekształć dane do listy obiektów SQLAlchemy rows = [ COIN_MODELS[coin]( timestamp=row["timestamp"], open=row["open"], high=row["high"], low=row["low"], close=row["close"], volume=row["volume"], close_time=row["close_time"], quote_asset_volume=row["quote_asset_volume"], number_of_trades=row["number_of_trades"], taker_buy_base_asset_volume=row["taker_buy_base_asset_volume"], taker_buy_quote_asset_volume=row["taker_buy_quote_asset_volume"], ignore=row["ignore"], ) for _, row in data.iterrows() ] # Wstaw dane do bazy session.bulk_save_objects(rows) session.commit() print(f"Zapisano dane z {csv_file} do tabeli {coin.lower()}.") def process_coin_data(): """Główna funkcja przetwarzająca dane dla wszystkich kryptowalut.""" # Stwórz sesję bazy danych session = SessionLocal() try: for coin, model in COIN_MODELS.items(): coin_dir = os.path.join(DATA_DIR, coin) if not os.path.exists(coin_dir): print(f"Katalog danych dla {coin} nie istnieje: {coin_dir}") continue # Pobierz listę plików CSV i posortuj je csv_files = sorted( [ os.path.join(coin_dir, f) for f in os.listdir(coin_dir) if f.endswith(".csv") ] ) # Przetwarzaj pliki CSV w kolejności for csv_file in csv_files: load_csv_to_db(session, coin, csv_file) print("Przetwarzanie danych zakończone.") finally: session.close() if __name__ == "__main__": # Upewnij się, że wszystkie tabele istnieją w bazie Base.metadata.create_all(bind=engine) # Rozpocznij przetwarzanie danych process_coin_data()