123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926 |
- """
- Experimental SCIM 2.0 Implementation for Open WebUI
- Provides System for Cross-domain Identity Management endpoints for users and groups
- NOTE: This is an experimental implementation and may not fully comply with SCIM 2.0 standards, and is subject to change.
- """
- import logging
- import uuid
- import time
- from typing import Optional, List, Dict, Any
- from datetime import datetime, timezone
- from fastapi import APIRouter, Depends, HTTPException, Request, Query, Header, status
- from fastapi.responses import JSONResponse
- from pydantic import BaseModel, Field, ConfigDict
- from open_webui.models.users import Users, UserModel
- from open_webui.models.groups import Groups, GroupModel
- from open_webui.utils.auth import (
- get_admin_user,
- get_current_user,
- decode_token,
- get_verified_user,
- )
- from open_webui.constants import ERROR_MESSAGES
- from open_webui.env import SRC_LOG_LEVELS
- log = logging.getLogger(__name__)
- log.setLevel(SRC_LOG_LEVELS["MAIN"])
- router = APIRouter()
- # SCIM 2.0 Schema URIs
- SCIM_USER_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:User"
- SCIM_GROUP_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:Group"
- SCIM_LIST_RESPONSE_SCHEMA = "urn:ietf:params:scim:api:messages:2.0:ListResponse"
- SCIM_ERROR_SCHEMA = "urn:ietf:params:scim:api:messages:2.0:Error"
- # SCIM Resource Types
- SCIM_RESOURCE_TYPE_USER = "User"
- SCIM_RESOURCE_TYPE_GROUP = "Group"
- def scim_error(status_code: int, detail: str, scim_type: Optional[str] = None):
- """Create a SCIM-compliant error response"""
- error_body = {
- "schemas": [SCIM_ERROR_SCHEMA],
- "status": str(status_code),
- "detail": detail,
- }
- if scim_type:
- error_body["scimType"] = scim_type
- elif status_code == 404:
- error_body["scimType"] = "invalidValue"
- elif status_code == 409:
- error_body["scimType"] = "uniqueness"
- elif status_code == 400:
- error_body["scimType"] = "invalidSyntax"
- return JSONResponse(status_code=status_code, content=error_body)
- class SCIMError(BaseModel):
- """SCIM Error Response"""
- schemas: List[str] = [SCIM_ERROR_SCHEMA]
- status: str
- scimType: Optional[str] = None
- detail: Optional[str] = None
- class SCIMMeta(BaseModel):
- """SCIM Resource Metadata"""
- resourceType: str
- created: str
- lastModified: str
- location: Optional[str] = None
- version: Optional[str] = None
- class SCIMName(BaseModel):
- """SCIM User Name"""
- formatted: Optional[str] = None
- familyName: Optional[str] = None
- givenName: Optional[str] = None
- middleName: Optional[str] = None
- honorificPrefix: Optional[str] = None
- honorificSuffix: Optional[str] = None
- class SCIMEmail(BaseModel):
- """SCIM Email"""
- value: str
- type: Optional[str] = "work"
- primary: bool = True
- display: Optional[str] = None
- class SCIMPhoto(BaseModel):
- """SCIM Photo"""
- value: str
- type: Optional[str] = "photo"
- primary: bool = True
- display: Optional[str] = None
- class SCIMGroupMember(BaseModel):
- """SCIM Group Member"""
- value: str # User ID
- ref: Optional[str] = Field(None, alias="$ref")
- type: Optional[str] = "User"
- display: Optional[str] = None
- class SCIMUser(BaseModel):
- """SCIM User Resource"""
- model_config = ConfigDict(populate_by_name=True)
- schemas: List[str] = [SCIM_USER_SCHEMA]
- id: str
- externalId: Optional[str] = None
- userName: str
- name: Optional[SCIMName] = None
- displayName: str
- emails: List[SCIMEmail]
- active: bool = True
- photos: Optional[List[SCIMPhoto]] = None
- groups: Optional[List[Dict[str, str]]] = None
- meta: SCIMMeta
- class SCIMUserCreateRequest(BaseModel):
- """SCIM User Create Request"""
- model_config = ConfigDict(populate_by_name=True)
- schemas: List[str] = [SCIM_USER_SCHEMA]
- externalId: Optional[str] = None
- userName: str
- name: Optional[SCIMName] = None
- displayName: str
- emails: List[SCIMEmail]
- active: bool = True
- password: Optional[str] = None
- photos: Optional[List[SCIMPhoto]] = None
- class SCIMUserUpdateRequest(BaseModel):
- """SCIM User Update Request"""
- model_config = ConfigDict(populate_by_name=True)
- schemas: List[str] = [SCIM_USER_SCHEMA]
- id: Optional[str] = None
- externalId: Optional[str] = None
- userName: Optional[str] = None
- name: Optional[SCIMName] = None
- displayName: Optional[str] = None
- emails: Optional[List[SCIMEmail]] = None
- active: Optional[bool] = None
- photos: Optional[List[SCIMPhoto]] = None
- class SCIMGroup(BaseModel):
- """SCIM Group Resource"""
- model_config = ConfigDict(populate_by_name=True)
- schemas: List[str] = [SCIM_GROUP_SCHEMA]
- id: str
- displayName: str
- members: Optional[List[SCIMGroupMember]] = []
- meta: SCIMMeta
- class SCIMGroupCreateRequest(BaseModel):
- """SCIM Group Create Request"""
- model_config = ConfigDict(populate_by_name=True)
- schemas: List[str] = [SCIM_GROUP_SCHEMA]
- displayName: str
- members: Optional[List[SCIMGroupMember]] = []
- class SCIMGroupUpdateRequest(BaseModel):
- """SCIM Group Update Request"""
- model_config = ConfigDict(populate_by_name=True)
- schemas: List[str] = [SCIM_GROUP_SCHEMA]
- displayName: Optional[str] = None
- members: Optional[List[SCIMGroupMember]] = None
- class SCIMListResponse(BaseModel):
- """SCIM List Response"""
- schemas: List[str] = [SCIM_LIST_RESPONSE_SCHEMA]
- totalResults: int
- itemsPerPage: int
- startIndex: int
- Resources: List[Any]
- class SCIMPatchOperation(BaseModel):
- """SCIM Patch Operation"""
- op: str # "add", "replace", "remove"
- path: Optional[str] = None
- value: Optional[Any] = None
- class SCIMPatchRequest(BaseModel):
- """SCIM Patch Request"""
- schemas: List[str] = ["urn:ietf:params:scim:api:messages:2.0:PatchOp"]
- Operations: List[SCIMPatchOperation]
- def get_scim_auth(
- request: Request, authorization: Optional[str] = Header(None)
- ) -> bool:
- """
- Verify SCIM authentication
- Checks for SCIM-specific bearer token configured in the system
- """
- if not authorization:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authorization header required",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- parts = authorization.split()
- if len(parts) != 2:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid authorization format. Expected: Bearer <token>",
- )
- scheme, token = parts
- if scheme.lower() != "bearer":
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid authentication scheme",
- )
- # Check if SCIM is enabled
- scim_enabled = getattr(request.app.state, "SCIM_ENABLED", False)
- log.info(
- f"SCIM auth check - raw SCIM_ENABLED: {scim_enabled}, type: {type(scim_enabled)}"
- )
- # Handle both PersistentConfig and direct value
- if hasattr(scim_enabled, "value"):
- scim_enabled = scim_enabled.value
- log.info(f"SCIM enabled status after conversion: {scim_enabled}")
- if not scim_enabled:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="SCIM is not enabled",
- )
- # Verify the SCIM token
- scim_token = getattr(request.app.state, "SCIM_TOKEN", None)
- # Handle both PersistentConfig and direct value
- if hasattr(scim_token, "value"):
- scim_token = scim_token.value
- log.debug(f"SCIM token configured: {bool(scim_token)}")
- if not scim_token or token != scim_token:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid SCIM token",
- )
- return True
- except HTTPException:
- # Re-raise HTTP exceptions as-is
- raise
- except Exception as e:
- log.error(f"SCIM authentication error: {e}")
- import traceback
- log.error(f"Traceback: {traceback.format_exc()}")
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Authentication failed",
- )
- def user_to_scim(user: UserModel, request: Request) -> SCIMUser:
- """Convert internal User model to SCIM User"""
- # Parse display name into name components
- name_parts = user.name.split(" ", 1) if user.name else ["", ""]
- given_name = name_parts[0] if name_parts else ""
- family_name = name_parts[1] if len(name_parts) > 1 else ""
- # Get user's groups
- user_groups = Groups.get_groups_by_member_id(user.id)
- groups = [
- {
- "value": group.id,
- "display": group.name,
- "$ref": f"{request.base_url}api/v1/scim/v2/Groups/{group.id}",
- "type": "direct",
- }
- for group in user_groups
- ]
- return SCIMUser(
- id=user.id,
- userName=user.email,
- name=SCIMName(
- formatted=user.name,
- givenName=given_name,
- familyName=family_name,
- ),
- displayName=user.name,
- emails=[SCIMEmail(value=user.email)],
- active=user.role != "pending",
- photos=(
- [SCIMPhoto(value=user.profile_image_url)]
- if user.profile_image_url
- else None
- ),
- groups=groups if groups else None,
- meta=SCIMMeta(
- resourceType=SCIM_RESOURCE_TYPE_USER,
- created=datetime.fromtimestamp(
- user.created_at, tz=timezone.utc
- ).isoformat(),
- lastModified=datetime.fromtimestamp(
- user.updated_at, tz=timezone.utc
- ).isoformat(),
- location=f"{request.base_url}api/v1/scim/v2/Users/{user.id}",
- ),
- )
- def group_to_scim(group: GroupModel, request: Request) -> SCIMGroup:
- """Convert internal Group model to SCIM Group"""
- members = []
- for user_id in group.user_ids:
- user = Users.get_user_by_id(user_id)
- if user:
- members.append(
- SCIMGroupMember(
- value=user.id,
- ref=f"{request.base_url}api/v1/scim/v2/Users/{user.id}",
- display=user.name,
- )
- )
- return SCIMGroup(
- id=group.id,
- displayName=group.name,
- members=members,
- meta=SCIMMeta(
- resourceType=SCIM_RESOURCE_TYPE_GROUP,
- created=datetime.fromtimestamp(
- group.created_at, tz=timezone.utc
- ).isoformat(),
- lastModified=datetime.fromtimestamp(
- group.updated_at, tz=timezone.utc
- ).isoformat(),
- location=f"{request.base_url}api/v1/scim/v2/Groups/{group.id}",
- ),
- )
- # SCIM Service Provider Config
- @router.get("/ServiceProviderConfig")
- async def get_service_provider_config():
- """Get SCIM Service Provider Configuration"""
- return {
- "schemas": ["urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig"],
- "patch": {"supported": True},
- "bulk": {"supported": False, "maxOperations": 1000, "maxPayloadSize": 1048576},
- "filter": {"supported": True, "maxResults": 200},
- "changePassword": {"supported": False},
- "sort": {"supported": False},
- "etag": {"supported": False},
- "authenticationSchemes": [
- {
- "type": "oauthbearertoken",
- "name": "OAuth Bearer Token",
- "description": "Authentication using OAuth 2.0 Bearer Token",
- }
- ],
- }
- # SCIM Resource Types
- @router.get("/ResourceTypes")
- async def get_resource_types(request: Request):
- """Get SCIM Resource Types"""
- return [
- {
- "schemas": ["urn:ietf:params:scim:schemas:core:2.0:ResourceType"],
- "id": "User",
- "name": "User",
- "endpoint": "/Users",
- "schema": SCIM_USER_SCHEMA,
- "meta": {
- "location": f"{request.base_url}api/v1/scim/v2/ResourceTypes/User",
- "resourceType": "ResourceType",
- },
- },
- {
- "schemas": ["urn:ietf:params:scim:schemas:core:2.0:ResourceType"],
- "id": "Group",
- "name": "Group",
- "endpoint": "/Groups",
- "schema": SCIM_GROUP_SCHEMA,
- "meta": {
- "location": f"{request.base_url}api/v1/scim/v2/ResourceTypes/Group",
- "resourceType": "ResourceType",
- },
- },
- ]
- # SCIM Schemas
- @router.get("/Schemas")
- async def get_schemas():
- """Get SCIM Schemas"""
- return [
- {
- "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Schema"],
- "id": SCIM_USER_SCHEMA,
- "name": "User",
- "description": "User Account",
- "attributes": [
- {
- "name": "userName",
- "type": "string",
- "required": True,
- "uniqueness": "server",
- },
- {"name": "displayName", "type": "string", "required": True},
- {
- "name": "emails",
- "type": "complex",
- "multiValued": True,
- "required": True,
- },
- {"name": "active", "type": "boolean", "required": False},
- ],
- },
- {
- "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Schema"],
- "id": SCIM_GROUP_SCHEMA,
- "name": "Group",
- "description": "Group",
- "attributes": [
- {"name": "displayName", "type": "string", "required": True},
- {
- "name": "members",
- "type": "complex",
- "multiValued": True,
- "required": False,
- },
- ],
- },
- ]
- # Users endpoints
- @router.get("/Users", response_model=SCIMListResponse)
- async def get_users(
- request: Request,
- startIndex: int = Query(1, ge=1),
- count: int = Query(20, ge=1, le=100),
- filter: Optional[str] = None,
- _: bool = Depends(get_scim_auth),
- ):
- """List SCIM Users"""
- skip = startIndex - 1
- limit = count
- # Get users from database
- if filter:
- # Simple filter parsing - supports userName eq "email"
- # In production, you'd want a more robust filter parser
- if "userName eq" in filter:
- email = filter.split('"')[1]
- user = Users.get_user_by_email(email)
- users_list = [user] if user else []
- total = 1 if user else 0
- else:
- response = Users.get_users(skip=skip, limit=limit)
- users_list = response["users"]
- total = response["total"]
- else:
- response = Users.get_users(skip=skip, limit=limit)
- users_list = response["users"]
- total = response["total"]
- # Convert to SCIM format
- scim_users = [user_to_scim(user, request) for user in users_list]
- return SCIMListResponse(
- totalResults=total,
- itemsPerPage=len(scim_users),
- startIndex=startIndex,
- Resources=scim_users,
- )
- @router.get("/Users/{user_id}", response_model=SCIMUser)
- async def get_user(
- user_id: str,
- request: Request,
- _: bool = Depends(get_scim_auth),
- ):
- """Get SCIM User by ID"""
- user = Users.get_user_by_id(user_id)
- if not user:
- return scim_error(
- status_code=status.HTTP_404_NOT_FOUND, detail=f"User {user_id} not found"
- )
- return user_to_scim(user, request)
- @router.post("/Users", response_model=SCIMUser, status_code=status.HTTP_201_CREATED)
- async def create_user(
- request: Request,
- user_data: SCIMUserCreateRequest,
- _: bool = Depends(get_scim_auth),
- ):
- """Create SCIM User"""
- # Check if user already exists
- existing_user = Users.get_user_by_email(user_data.userName)
- if existing_user:
- raise HTTPException(
- status_code=status.HTTP_409_CONFLICT,
- detail=f"User with email {user_data.userName} already exists",
- )
- # Create user
- user_id = str(uuid.uuid4())
- email = user_data.emails[0].value if user_data.emails else user_data.userName
- # Parse name if provided
- name = user_data.displayName
- if user_data.name:
- if user_data.name.formatted:
- name = user_data.name.formatted
- elif user_data.name.givenName or user_data.name.familyName:
- name = f"{user_data.name.givenName or ''} {user_data.name.familyName or ''}".strip()
- # Get profile image if provided
- profile_image = "/user.png"
- if user_data.photos and len(user_data.photos) > 0:
- profile_image = user_data.photos[0].value
- # Create user
- new_user = Users.insert_new_user(
- id=user_id,
- name=name,
- email=email,
- profile_image_url=profile_image,
- role="user" if user_data.active else "pending",
- )
- if not new_user:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to create user",
- )
- return user_to_scim(new_user, request)
- @router.put("/Users/{user_id}", response_model=SCIMUser)
- async def update_user(
- user_id: str,
- request: Request,
- user_data: SCIMUserUpdateRequest,
- _: bool = Depends(get_scim_auth),
- ):
- """Update SCIM User (full update)"""
- user = Users.get_user_by_id(user_id)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"User {user_id} not found",
- )
- # Build update dict
- update_data = {}
- if user_data.userName:
- update_data["email"] = user_data.userName
- if user_data.displayName:
- update_data["name"] = user_data.displayName
- elif user_data.name:
- if user_data.name.formatted:
- update_data["name"] = user_data.name.formatted
- elif user_data.name.givenName or user_data.name.familyName:
- update_data["name"] = (
- f"{user_data.name.givenName or ''} {user_data.name.familyName or ''}".strip()
- )
- if user_data.emails and len(user_data.emails) > 0:
- update_data["email"] = user_data.emails[0].value
- if user_data.active is not None:
- update_data["role"] = "user" if user_data.active else "pending"
- if user_data.photos and len(user_data.photos) > 0:
- update_data["profile_image_url"] = user_data.photos[0].value
- # Update user
- updated_user = Users.update_user_by_id(user_id, update_data)
- if not updated_user:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to update user",
- )
- return user_to_scim(updated_user, request)
- @router.patch("/Users/{user_id}", response_model=SCIMUser)
- async def patch_user(
- user_id: str,
- request: Request,
- patch_data: SCIMPatchRequest,
- _: bool = Depends(get_scim_auth),
- ):
- """Update SCIM User (partial update)"""
- user = Users.get_user_by_id(user_id)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"User {user_id} not found",
- )
- update_data = {}
- for operation in patch_data.Operations:
- op = operation.op.lower()
- path = operation.path
- value = operation.value
- if op == "replace":
- if path == "active":
- update_data["role"] = "user" if value else "pending"
- elif path == "userName":
- update_data["email"] = value
- elif path == "displayName":
- update_data["name"] = value
- elif path == "emails[primary eq true].value":
- update_data["email"] = value
- elif path == "name.formatted":
- update_data["name"] = value
- # Update user
- if update_data:
- updated_user = Users.update_user_by_id(user_id, update_data)
- if not updated_user:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to update user",
- )
- else:
- updated_user = user
- return user_to_scim(updated_user, request)
- @router.delete("/Users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
- async def delete_user(
- user_id: str,
- request: Request,
- _: bool = Depends(get_scim_auth),
- ):
- """Delete SCIM User"""
- user = Users.get_user_by_id(user_id)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"User {user_id} not found",
- )
- success = Users.delete_user_by_id(user_id)
- if not success:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to delete user",
- )
- return None
- # Groups endpoints
- @router.get("/Groups", response_model=SCIMListResponse)
- async def get_groups(
- request: Request,
- startIndex: int = Query(1, ge=1),
- count: int = Query(20, ge=1, le=100),
- filter: Optional[str] = None,
- _: bool = Depends(get_scim_auth),
- ):
- """List SCIM Groups"""
- # Get all groups
- groups_list = Groups.get_groups()
- # Apply pagination
- total = len(groups_list)
- start = startIndex - 1
- end = start + count
- paginated_groups = groups_list[start:end]
- # Convert to SCIM format
- scim_groups = [group_to_scim(group, request) for group in paginated_groups]
- return SCIMListResponse(
- totalResults=total,
- itemsPerPage=len(scim_groups),
- startIndex=startIndex,
- Resources=scim_groups,
- )
- @router.get("/Groups/{group_id}", response_model=SCIMGroup)
- async def get_group(
- group_id: str,
- request: Request,
- _: bool = Depends(get_scim_auth),
- ):
- """Get SCIM Group by ID"""
- group = Groups.get_group_by_id(group_id)
- if not group:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Group {group_id} not found",
- )
- return group_to_scim(group, request)
- @router.post("/Groups", response_model=SCIMGroup, status_code=status.HTTP_201_CREATED)
- async def create_group(
- request: Request,
- group_data: SCIMGroupCreateRequest,
- _: bool = Depends(get_scim_auth),
- ):
- """Create SCIM Group"""
- # Extract member IDs
- member_ids = []
- if group_data.members:
- for member in group_data.members:
- member_ids.append(member.value)
- # Create group
- from open_webui.models.groups import GroupForm
- form = GroupForm(
- name=group_data.displayName,
- description="",
- )
- # Need to get the creating user's ID - we'll use the first admin
- admin_user = Users.get_super_admin_user()
- if not admin_user:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="No admin user found",
- )
- new_group = Groups.insert_new_group(admin_user.id, form)
- if not new_group:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to create group",
- )
- # Add members if provided
- if member_ids:
- from open_webui.models.groups import GroupUpdateForm
- update_form = GroupUpdateForm(
- name=new_group.name,
- description=new_group.description,
- user_ids=member_ids,
- )
- Groups.update_group_by_id(new_group.id, update_form)
- new_group = Groups.get_group_by_id(new_group.id)
- return group_to_scim(new_group, request)
- @router.put("/Groups/{group_id}", response_model=SCIMGroup)
- async def update_group(
- group_id: str,
- request: Request,
- group_data: SCIMGroupUpdateRequest,
- _: bool = Depends(get_scim_auth),
- ):
- """Update SCIM Group (full update)"""
- group = Groups.get_group_by_id(group_id)
- if not group:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Group {group_id} not found",
- )
- # Build update form
- from open_webui.models.groups import GroupUpdateForm
- update_form = GroupUpdateForm(
- name=group_data.displayName if group_data.displayName else group.name,
- description=group.description,
- )
- # Handle members if provided
- if group_data.members is not None:
- member_ids = [member.value for member in group_data.members]
- update_form.user_ids = member_ids
- # Update group
- updated_group = Groups.update_group_by_id(group_id, update_form)
- if not updated_group:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to update group",
- )
- return group_to_scim(updated_group, request)
- @router.patch("/Groups/{group_id}", response_model=SCIMGroup)
- async def patch_group(
- group_id: str,
- request: Request,
- patch_data: SCIMPatchRequest,
- _: bool = Depends(get_scim_auth),
- ):
- """Update SCIM Group (partial update)"""
- group = Groups.get_group_by_id(group_id)
- if not group:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Group {group_id} not found",
- )
- from open_webui.models.groups import GroupUpdateForm
- update_form = GroupUpdateForm(
- name=group.name,
- description=group.description,
- user_ids=group.user_ids.copy() if group.user_ids else [],
- )
- for operation in patch_data.Operations:
- op = operation.op.lower()
- path = operation.path
- value = operation.value
- if op == "replace":
- if path == "displayName":
- update_form.name = value
- elif path == "members":
- # Replace all members
- update_form.user_ids = [member["value"] for member in value]
- elif op == "add":
- if path == "members":
- # Add members
- if isinstance(value, list):
- for member in value:
- if isinstance(member, dict) and "value" in member:
- if member["value"] not in update_form.user_ids:
- update_form.user_ids.append(member["value"])
- elif op == "remove":
- if path and path.startswith("members[value eq"):
- # Remove specific member
- member_id = path.split('"')[1]
- if member_id in update_form.user_ids:
- update_form.user_ids.remove(member_id)
- # Update group
- updated_group = Groups.update_group_by_id(group_id, update_form)
- if not updated_group:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to update group",
- )
- return group_to_scim(updated_group, request)
- @router.delete("/Groups/{group_id}", status_code=status.HTTP_204_NO_CONTENT)
- async def delete_group(
- group_id: str,
- request: Request,
- _: bool = Depends(get_scim_auth),
- ):
- """Delete SCIM Group"""
- group = Groups.get_group_by_id(group_id)
- if not group:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Group {group_id} not found",
- )
- success = Groups.delete_group_by_id(group_id)
- if not success:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to delete group",
- )
- return None
|