zacetni test

This commit is contained in:
2025-05-14 18:22:18 +02:00
commit af4656db68
6 changed files with 330 additions and 0 deletions

149
main.py Normal file
View File

@@ -0,0 +1,149 @@
from fastapi import FastAPI
import psutil
import time
import datetime
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import sessionmaker, declarative_base
from fastapi import HTTPException
import os
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
if os.environ.get("DATABASE_URL") == None:
os.environ["DATABASE_URL"] = "sqlite:///./wall_messages.db"
app = FastAPI()
start_time = time.time()
@app.get("/uptime")
def get_uptime():
uptime_seconds = int(time.time() - start_time)
uptime_str = str(datetime.timedelta(seconds=uptime_seconds))
return {"uptime_seconds": uptime_seconds, "uptime": uptime_str}
@app.get("/disk")
def get_disk_usage():
usage = psutil.disk_usage('/')
return {
"total": usage.total,
"used": usage.used,
"free": usage.free,
"percent": usage.percent
}
@app.get("/ram")
def get_ram_usage():
mem = psutil.virtual_memory()
return {
"total": mem.total,
"available": mem.available,
"used": mem.used,
"percent": mem.percent
}
@app.get("/cpu")
def get_cpu_usage():
cpu_percent = psutil.cpu_percent(interval=1)
return {"cpu_percent": cpu_percent}
DATABASE_URL = "sqlite:///./wall_messages.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()
class WallMessageDB(Base):
__tablename__ = "wall_messages"
id = Column(Integer, primary_key=True, index=True)
text = Column(String, nullable=False)
timestamp = Column(DateTime, default=datetime.datetime.utcnow(), nullable=False)
Base.metadata.create_all(bind=engine)
class WallMessageCreate(BaseModel):
text: str
class WallMessageRead(BaseModel):
id: int
text: str
timestamp: datetime.datetime
class Config:
orm_mode = True
@app.post("/wall", response_model=WallMessageRead)
def post_wall_message(msg: WallMessageCreate):
db = SessionLocal()
db_msg = WallMessageDB(text=msg.text)
db.add(db_msg)
db.commit()
db.refresh(db_msg)
db.close()
return db_msg
@app.get("/wall", response_model=list[WallMessageRead])
def get_wall_messages():
db = SessionLocal()
messages = db.query(WallMessageDB).order_by(WallMessageDB.timestamp.desc()).all()
db.close()
return messages
@app.get("/wall/{msg_id}", response_model=WallMessageRead)
def get_wall_message(msg_id: int):
db = SessionLocal()
msg = db.query(WallMessageDB).filter(WallMessageDB.id == msg_id).first()
db.close()
if not msg:
raise HTTPException(status_code=404, detail="Message not found")
return msg
@app.put("/wall/{msg_id}", response_model=WallMessageRead)
def update_wall_message(msg_id: int, msg: WallMessageCreate):
db = SessionLocal()
db_msg = db.query(WallMessageDB).filter(WallMessageDB.id == msg_id).first()
if not db_msg:
db.close()
raise HTTPException(status_code=404, detail="Message not found")
db_msg.text = msg.text
db.commit()
db.refresh(db_msg)
db.close()
return db_msg
@app.delete("/wall/{msg_id}")
def delete_wall_message(msg_id: int):
db = SessionLocal()
db_msg = db.query(WallMessageDB).filter(WallMessageDB.id == msg_id).first()
if not db_msg:
db.close()
raise HTTPException(status_code=404, detail="Message not found")
db.delete(db_msg)
db.commit()
db.close()
return {"status": "success", "message": f"Deleted message {msg_id}"}
class WallMessage(BaseModel):
message: str
wall_messages = []
@app.post("/wall")
def post_wall_message(msg: WallMessage):
wall_messages.append(msg.message)
return {"status": "success", "message": msg.message}
@app.get("/wall")
def get_wall_messages():
return wall_messages
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.mount("/", StaticFiles(directory=".", html=True), name="static")