repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | from typing import Tuple |
| #2 | |
| #3 | from app.models import App, User |
| #4 | from sqlalchemy.orm import Session |
| #5 | |
| #6 | |
| #7 | def get_or_create_user(db: Session, user_id: str) -> User: |
| #8 | """Get or create a user with the given user_id""" |
| #9 | user = db.query(User).filter(User.user_id == user_id).first() |
| #10 | if not user: |
| #11 | user = User(user_id=user_id) |
| #12 | db.add(user) |
| #13 | db.commit() |
| #14 | db.refresh(user) |
| #15 | return user |
| #16 | |
| #17 | |
| #18 | def get_or_create_app(db: Session, user: User, app_id: str) -> App: |
| #19 | """Get or create an app for the given user""" |
| #20 | app = db.query(App).filter(App.owner_id == user.id, App.name == app_id).first() |
| #21 | if not app: |
| #22 | app = App(owner_id=user.id, name=app_id) |
| #23 | db.add(app) |
| #24 | db.commit() |
| #25 | db.refresh(app) |
| #26 | return app |
| #27 | |
| #28 | |
| #29 | def get_user_and_app(db: Session, user_id: str, app_id: str) -> Tuple[User, App]: |
| #30 | """Get or create both user and their app""" |
| #31 | user = get_or_create_user(db, user_id) |
| #32 | app = get_or_create_app(db, user, app_id) |
| #33 | return user, app |
| #34 |