sso_client.py 1.0 KB

1234567891011121314151617181920212223242526272829303132
  1. import httpx
  2. from app.config import get_settings
  3. settings = get_settings()
  4. async def exchange_code_for_token(code: str) -> dict:
  5. async with httpx.AsyncClient() as client:
  6. resp = await client.post(
  7. f"{settings.sso_base_url}/oauth/token",
  8. data={
  9. "grant_type": "authorization_code",
  10. "code": code,
  11. "redirect_uri": settings.sso_redirect_uri,
  12. "client_id": settings.sso_client_id,
  13. "client_secret": settings.sso_client_secret,
  14. },
  15. headers={"Content-Type": "application/x-www-form-urlencoded"},
  16. )
  17. resp.raise_for_status()
  18. return resp.json()
  19. async def fetch_sso_userinfo(access_token: str) -> dict:
  20. async with httpx.AsyncClient() as client:
  21. resp = await client.get(
  22. f"{settings.sso_base_url}/oauth/userinfo",
  23. headers={"Authorization": f"Bearer {access_token}"},
  24. )
  25. resp.raise_for_status()
  26. return resp.json()