95 lines
3.1 KiB
Python
95 lines
3.1 KiB
Python
import os
|
|
import pandas as pd
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import create_engine
|
|
from app.database import SessionLocal, Base, engine
|
|
from app.models import ADAUSDT, SHIBUSDT, BTCUSDT # Dodaj inne modele, jeśli istnieją
|
|
|
|
# Mapowanie nazwy pary na model
|
|
COIN_MODELS = {
|
|
"ADAUSDT": ADAUSDT,
|
|
"SHIBUSDT": SHIBUSDT,
|
|
"BTCUSDT": BTCUSDT,
|
|
}
|
|
|
|
# Ścieżka do katalogu z danymi
|
|
DATA_DIR = "./data"
|
|
|
|
def load_csv_to_db(session: Session, coin: str, csv_file: str):
|
|
"""Wczytuje dane z pliku CSV do bazy danych."""
|
|
print(f"Przetwarzanie pliku: {csv_file} dla {coin}")
|
|
# Wczytaj dane z pliku CSV
|
|
data = pd.read_csv(csv_file)
|
|
|
|
# Oczekiwane kolumny w CSV
|
|
expected_columns = [
|
|
"timestamp", "open", "high", "low", "close", "volume",
|
|
"close_time", "quote_asset_volume", "number_of_trades",
|
|
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
|
|
]
|
|
|
|
# Sprawdź, czy wszystkie kolumny istnieją
|
|
if not all(col in data.columns for col in expected_columns):
|
|
print(f"Błąd: Plik {csv_file} nie zawiera wszystkich wymaganych kolumn.")
|
|
return
|
|
|
|
# Przekształć dane do listy obiektów SQLAlchemy
|
|
rows = [
|
|
COIN_MODELS[coin](
|
|
timestamp=row["timestamp"],
|
|
open=row["open"],
|
|
high=row["high"],
|
|
low=row["low"],
|
|
close=row["close"],
|
|
volume=row["volume"],
|
|
close_time=row["close_time"],
|
|
quote_asset_volume=row["quote_asset_volume"],
|
|
number_of_trades=row["number_of_trades"],
|
|
taker_buy_base_asset_volume=row["taker_buy_base_asset_volume"],
|
|
taker_buy_quote_asset_volume=row["taker_buy_quote_asset_volume"],
|
|
ignore=row["ignore"],
|
|
)
|
|
for _, row in data.iterrows()
|
|
]
|
|
|
|
# Wstaw dane do bazy
|
|
session.bulk_save_objects(rows)
|
|
session.commit()
|
|
print(f"Zapisano dane z {csv_file} do tabeli {coin.lower()}.")
|
|
|
|
def process_coin_data():
|
|
"""Główna funkcja przetwarzająca dane dla wszystkich kryptowalut."""
|
|
# Stwórz sesję bazy danych
|
|
session = SessionLocal()
|
|
|
|
try:
|
|
for coin, model in COIN_MODELS.items():
|
|
coin_dir = os.path.join(DATA_DIR, coin)
|
|
if not os.path.exists(coin_dir):
|
|
print(f"Katalog danych dla {coin} nie istnieje: {coin_dir}")
|
|
continue
|
|
|
|
# Pobierz listę plików CSV i posortuj je
|
|
csv_files = sorted(
|
|
[
|
|
os.path.join(coin_dir, f) for f in os.listdir(coin_dir)
|
|
if f.endswith(".csv")
|
|
]
|
|
)
|
|
|
|
# Przetwarzaj pliki CSV w kolejności
|
|
for csv_file in csv_files:
|
|
load_csv_to_db(session, coin, csv_file)
|
|
|
|
print("Przetwarzanie danych zakończone.")
|
|
finally:
|
|
session.close()
|
|
|
|
if __name__ == "__main__":
|
|
# Upewnij się, że wszystkie tabele istnieją w bazie
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
# Rozpocznij przetwarzanie danych
|
|
process_coin_data()
|
|
|