Module Atlassian
Expand source code
#!/usr/bin/env python3
import requests, os, json, time, re, gc, getpass
import urllib.parse
from datetime import datetime
from bs4 import BeautifulSoup
from urllib import parse
from urllib.parse import parse_qs
from urllib.parse import urlparse
from macwinnie_pyhelpers.Browser import Browser
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
class Atlassian ( Browser ):
"""Shared browser helpers for Atlassian applications.

Provides the WebSudo flow and the CSRF-token lookup that the `Jira` and
`Confluence` subclasses build on. As the original note says, the target
systems need to have `nosso` active at least.
"""
def webSudo( self, authUrl, sysInfoUrl, adjustCookies = True ):
"""Run the application's WebSudo flow.

Logs in first when `checkLogin()` reports that no login is active, then opens
`sysInfoUrl`. If the browser ended up on a different URL or the page shows a
password input, `authUrl` is opened and `self.app[ 'passwd' ]` is submitted
through the password field.

authUrl -- URL of the WebSudo authentication form
sysInfoUrl -- administrative page used to probe whether WebSudo is required
adjustCookies -- when true, `self.refresh()` is called afterwards
"""
if not self.checkLogin():
self.login()
self.browser.get( sysInfoUrl )
pwFieldXPath = '//input[@type="password"]'
# WebSudo is required if we were redirected away or a password prompt is shown
doWebSudo = (
self.browser.current_url != sysInfoUrl
) or (
self.checkElementByXpath( pwFieldXPath )
)
if doWebSudo:
self.browser.get( authUrl )
# wait up to 10 seconds for the password field to appear
pwfield = WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, pwFieldXPath ) ) )
# Ctrl+A first, so the typed password replaces whatever the field contains
pwfield.send_keys( Keys.CONTROL, "a" )
pwfield.send_keys( self.app[ 'passwd' ] )
pwfield.send_keys( Keys.ENTER )
# fixed pause after submitting the form
time.sleep( 5 )
# NOTE(review): the flattened source does not show whether this check is nested
# inside `if doWebSudo:` – confirm against the original file
if adjustCookies:
self.refresh()
def getToken(self):
    """Return the CSRF token published on the application's start page.

    Opens `self.app['url']` and reads the `content` attribute of the element
    whose id is `atlassian-token`.
    """
    self.get(self.app['url'])
    tokenElement = self.browser.find_element(By.ID, 'atlassian-token')
    return tokenElement.get_attribute('content')
class Jira ( Atlassian ):
"""Jira browser.

Using an administrative user is recommended; otherwise most functions won't
work properly.
"""
# Settings behind `self.app[ 'url' ]`, `self.app[ 'user' ]` and `self.app[ 'passwd' ]`:
# environment variable name and prompt text per entry.
# NOTE(review): how `env`, `description`, `url` and `password` are evaluated is
# decided by the `Browser` base class, which is not visible here – confirm there.
appInfo = {
"url": {
"env": "JIRA_URL",
"url": True,
"description": "Enter Jira URL",
},
"user": {
"env": "JIRA_USER",
"description": "Enter Jira username",
},
"passwd": {
"password": True,
"env": "JIRA_PASS",
"description": "Enter Jira password",
},
}
# Caches filled by getProjectKeys, gatherAllProjectPermissions and getProjectLeads.
# NOTE: these are mutable class attributes; methods that mutate them in place
# (`+=`, item assignment) change the object stored on the class.
projectKeys = []
permissions = {}
projectLeads = {}
def __init__(self):
    """Initialise the Jira browser with caches owned by this instance.

    `projectKeys`, `permissions` and `projectLeads` are declared as mutable
    class attributes and are mutated in place by other methods of this class
    (`+=`, item assignment). Without re-binding them here, every `Jira`
    instance would read and write the very same list / dicts.
    """
    # copy rather than reset, so values pre-seeded on the class are kept
    self.projectKeys = list(self.projectKeys)
    self.permissions = dict(self.permissions)
    self.projectLeads = dict(self.projectLeads)
    super(Jira, self).__init__()
def setSessionBaseHeaders(self, api=False):
    """Choose the base headers used for session requests.

    api -- when true, JSON content type and `X-Atlassian-Token: no-check`
           are set; otherwise the base headers are cleared (`None`)
    """
    apiHeaders = {
        "Content-Type": "application/json",
        "X-Atlassian-Token": "no-check",
    }
    self.sessionBaseHeaders = apiHeaders if api else None
def login(self, skipSSO=True):
    """Log in to Jira through the `login.jsp` form.

    skipSSO -- append `?nosso` to the login URL

    Loads cookies for the login URL, clears the session base headers, posts
    the credentials from `self.app` and finally opens the Jira base URL.
    When the environment variable `PRINT_DEBUG_INFO` is set to a truthy value
    the login payload is printed – with the password masked.
    """
    loginUrl = self.app['url'] + "login.jsp"
    if skipSSO:
        loginUrl += '?nosso'
    self.loadCookies(url=loginUrl)
    self.setSessionBaseHeaders()
    userObject = {
        "os_username": self.app['user'],
        "os_password": self.app['passwd'],
    }
    if os.getenv("PRINT_DEBUG_INFO", "False").lower() in ["1", "true", "t", "y", "yes"]:
        print('User is being logged in with those credentials:')
        # never write the clear-text password to stdout / captured logs
        print({**userObject, "os_password": "********"})
        print()
    self.sessionPostRequest(loginUrl, userObject, True, urlOverride=True)
    self.get(self.app['url'], False)
def checkLogin(self):
    """Report whether the browser currently holds a logged-in Jira session.

    Opens the Jira base URL if the browser is not already below it and then
    looks for the element with id `login`.

    Returns `False` when that element exists (login needed), `True` otherwise.
    """
    baseUrl = self.app['url']
    if not self.browser.current_url.startswith(baseUrl):
        self.get(baseUrl, False)
    try:
        self.browser.find_element(By.ID, 'login')
    except NoSuchElementException:
        # Only a missing login element means "logged in"; the former bare
        # `except:` also reported a session on driver errors or Ctrl+C.
        return True
    return False
def webSudo(self, adjustCookies=True):
    """Run WebSudo with Jira's authentication and administration URLs.

    adjustCookies -- passed through to `Atlassian.webSudo`
    """
    jiraUrl = self.app['url']
    sysInfoUrl = '{url}secure/admin/ViewApplicationProperties.jspa'.format(url=jiraUrl)
    authUrl = '{url}secure/admin/WebSudoAuthenticate!default.jspa'.format(url=jiraUrl)
    super(Jira, self).webSudo(authUrl, sysInfoUrl, adjustCookies)
def getProjectKeys(self):
    """Collect the keys of all Jira projects visible to the logged-in user.

    Runs WebSudo, opens the project browser and walks through its result
    pages. The collected list replaces `self.projectKeys` – so repeated calls
    no longer pile up duplicates – and is returned.
    """
    nextLinkXPath = '//*[contains(concat(" ", normalize-space(@class), " "), " aui-nav-next ")]//a'
    self.webSudo()
    self.get(self.app['url'] + 'secure/project/BrowseProjects.jspa')
    keys = []
    while True:
        soup = self.toSoup()
        keys += [item.text for item in soup.select('tbody tr .cell-type-key')]
        try:
            # Probe the first `.aui-nav-next a` match for `data-page`: a missing
            # link (IndexError) or attribute (KeyError) ends the paging.
            soup.select('.aui-nav-next a')[0]['data-page']
            self.browser.find_element(By.XPATH, nextLinkXPath).click()
        except Exception:
            # was a bare `except:`, which also swallowed KeyboardInterrupt
            break
        # give the next page time to load
        time.sleep(5)
    self.projectKeys = keys
    return self.projectKeys
def getUserGroups(self, username, session=None):
    """Return the names of the groups the given Jira user belongs to.

    username -- Jira username (URL-quoted before it is put into the request)
    session -- optional session handed through to `apiGetInfo`
    """
    apiUrl = '{url}rest/api/2/user?username={user}&expand=groups'.format(
        url=self.app['url'],
        user=urllib.parse.quote(username),
    )
    userInfo = self.apiGetInfo(apiUrl, session=session)
    groupNames = []
    for entry in userInfo['groups']['items']:
        groupNames.append(entry['name'])
    return groupNames
def getUserApiInfo(self, username, returnResponse=False, session=None):
    """Fetch the REST API record of a Jira user.

    username -- Jira username (URL-quoted for the request)
    returnResponse -- handed to `apiGetInfo`; per the original description the
                      whole response is returned when true, otherwise only the
                      decoded answer object
    session -- optional session handed to `apiGetInfo`
    """
    quotedUser = urllib.parse.quote(username)
    apiUrl = '{url}rest/api/2/user?username={user}'.format(url=self.app['url'], user=quotedUser)
    return self.apiGetInfo(apiUrl, returnResponse, session)
def getGroupApiInfo(self, groupname, returnResponse=False, session=None):
    """Fetch the REST API record of a Jira group.

    groupname -- group name (URL-quoted for the request)
    returnResponse -- handed to `apiGetInfo`; per the original description the
                      whole response is returned when true, otherwise only the
                      decoded answer object
    session -- optional session handed to `apiGetInfo`
    """
    quotedGroup = urllib.parse.quote(groupname)
    apiUrl = '{url}rest/api/2/group?groupname={group}'.format(url=self.app['url'], group=quotedGroup)
    return self.apiGetInfo(apiUrl, returnResponse, session)
def gatherProjectPermissions( self, projectKey, session=None ):
"""Gather the role assignments of one Jira project from its roles page.

Opens `plugins/servlet/project-config/<projectKey>/roles`, reads the available
roles from the roles dropdown and then, for every name shown on the page,
asks the REST API whether it is a user or a group and records the roles that
are marked as selected for it.

projectKey -- key of the Jira project
session -- optional session handed to `getUserApiInfo` / `getGroupApiInfo`

Returns a dict `{ role: { 'USERS': [...], 'GROUPS': [...] } }`, or `None` when
the page did not show any name field within 10 seconds (project skipped).

NOTE(review): the short class names used in the XPaths (`iLKsB`, `EKGYc`,
`kteeYD`, ...) look like generated CSS classes and are presumably tied to a
specific Jira version – verify before relying on them.
"""
rolesDropdownXpath = '//*[contains(concat(" ", @class, " "), " iLKsB ")]'
projectPermissions = None
self.get( '{url}plugins/servlet/project-config/{pjkey}/roles'.format( url=self.app['url'], pjkey=projectKey ) )
skip = True
try:
# the project is only processed if at least one name field shows up
WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, '//*[contains(concat(" ", @class, " "), " EKGYc ")]' ) ) )
skip = False
except:
print( '{p} skipped'.format( p = projectKey ) )
if not skip:
projectPermissions = {}
continueLookup = True
# retrieve possible roles
roleBtn = WebDriverWait( self.browser, 10 ).until(EC.element_to_be_clickable((By.XPATH, '//*[contains(concat(" ", @class, " "), " css-18u3ks8 ")]')))
roleBtn.click()
# wait until dropdown is visible
WebDriverWait( self.browser, 10 ).until(EC.presence_of_element_located((By.XPATH, rolesDropdownXpath)))
# kteeYD (selected, single one), goEvqh (selected, multiple ones), eJTYOK (not selected)
# NOTE(review): the XPath below spells the third class `eJTYOk` while the
# comment above says `eJTYOK` – confirm which one the page really uses
roleElements = self.browser.find_elements( By.XPATH, '//*[contains(concat(" ", @class, " "), " eJTYOk ") or contains(concat(" ", @class, " "), " goEvqh ") or contains(concat(" ", @class, " "), " kteeYD ")]' )
for rl in roleElements:
# strip an optional trailing " (<number>)" from the entry text
role = re.match( r'^(.+?)(\s\([0-9]+\))?$' ,rl.text ).group( 1 )
projectPermissions[ role ] = {'USERS':[], 'GROUPS':[]}
# close the dropdown again: click the button, fall back to ESC
try:
roleBtn.click()
WebDriverWait( self.browser, 10 ).until_not(EC.presence_of_element_located((By.XPATH, rolesDropdownXpath)))
except:
actions = ActionChains( self.browser )
actions.send_keys( Keys.ESCAPE )
actions.perform()
WebDriverWait( self.browser, 10 ).until_not(EC.presence_of_element_located((By.XPATH, rolesDropdownXpath)))
# one iteration per result page; ends when no enabled "Next" control is found
while continueLookup:
# find all username and groupname fields on page
nameFields = self.browser.find_elements( By.XPATH, '//*[contains(concat(" ", @class, " "), " EKGYc ")]' )
# fetch users for roles
for x in nameFields:
n = x.text
# dropdown trigger located four ancestor levels above the name field
trigger = x.find_element( By.XPATH, './parent::*/parent::*/parent::*/parent::*//*[contains(concat(" ", @class, " "), " css-8xpfx5 ")]' )
# classify the name: user lookup first, group lookup as fallback
resp = self.getUserApiInfo( n, True, session )
index = None
if ( resp.status_code == 200 ):
index = 'USERS'
else:
resp = self.getGroupApiInfo( n, True, session )
if ( resp.status_code == 200 ):
index = 'GROUPS'
# names that are neither user nor group are ignored
if index != None:
# open the per-entry roles dropdown; on failure send ESC and wait once more,
# saving a screenshot before re-raising if it still does not appear
try:
trigger.click()
WebDriverWait( self.browser, 10 ).until(EC.presence_of_element_located((By.XPATH, rolesDropdownXpath)))
clicked = True
except:
actions = ActionChains( self.browser )
actions.send_keys( Keys.ESCAPE )
actions.perform()
try:
WebDriverWait( self.browser, 10 ).until(EC.presence_of_element_located((By.XPATH, rolesDropdownXpath)))
except Exception as e:
self.browser.get_screenshot_as_file('./data/debug/error_{index}_open_{date}.png'.format( date = datetime.now().strftime("%Y%m%d_%H%M%S"), index=index ))
raise e
# only entries carrying one of the two "selected" classes are recorded
roleElements = self.browser.find_elements( By.XPATH, '//*[contains(concat(" ", @class, " "), " kteeYD ") or contains(concat(" ", @class, " "), " goEvqh ")]' )
for rl in roleElements:
# NOTE(review): uses the raw entry text as key, whereas the keys above had a
# trailing " (<number>)" removed – a KeyError is possible if the text differs
projectPermissions[ rl.text ][ index ].append( n )
# close the per-entry dropdown again, same fallback as for opening
try:
trigger.click()
WebDriverWait( self.browser, 10 ).until_not(EC.presence_of_element_located((By.XPATH, rolesDropdownXpath)))
clicked = True
except:
actions = ActionChains( self.browser )
actions.send_keys( Keys.ESCAPE )
actions.perform()
try:
WebDriverWait( self.browser, 10 ).until_not(EC.presence_of_element_located((By.XPATH, rolesDropdownXpath)))
except Exception as e:
self.browser.get_screenshot_as_file('./data/debug/error_{index}_close_{date}.png'.format( date = datetime.now().strftime("%Y%m%d_%H%M%S"), index=index ))
raise e
# advance to the next page, or stop if there is no enabled "Next" control
try:
self.browser.find_element( By.XPATH, '//*[@aria-label="Next" and not(@disabled)]' ).click()
except:
continueLookup = False
return projectPermissions
def getGroups(self):
    """Return the group names delivered by Jira's group picker API.

    Up to 5000 groups are requested in a single call. Any answer other than
    HTTP 200 results in an empty list.
    """
    session = self.toSession()
    pickerUrl = '{url}rest/api/2/groups/picker?maxResults={limit}'.format(url=self.app['url'], limit=5000)
    response = session.get(pickerUrl)
    if response.status_code != 200:
        return []
    return [entry['name'] for entry in json.loads(response.text)['groups']]
def removeUserFromGroup(self, group, user, session=None):
    """Remove `user` from `group` through the Jira REST API.

    session -- session to use; a new one (`toSession(curl=True)`) is created
               when omitted

    Returns the response of the DELETE request.
    """
    if session is None:
        session = self.toSession(curl=True)
    query = urllib.parse.urlencode({'groupname': group, 'username': user})
    return session.delete('{url}rest/api/2/group/user?{params}'.format(url=self.app['url'], params=query))
def addUserToGroup(self, group, user, session=None):
    """Add `user` to `group` through the Jira REST API.

    The group name is URL-encoded (as `removeUserFromGroup` already does), so
    names containing spaces, `&`, `#` and the like no longer break the query.

    session -- session to use; a new one (`toSession(curl=True)`) is created
               when omitted

    Returns the response of the POST request.
    """
    if session is None:
        session = self.toSession(curl=True)
    query = urllib.parse.urlencode({'groupname': group})
    return session.post(
        '{url}rest/api/2/group/user?{params}'.format(url=self.app['url'], params=query),
        data=json.dumps({'name': user}),
        headers={'content-type': 'application/json'},
    )
def getGroupMembers(self, group, activeFilter='true'):
    """Read the members of `group` from the user browser of the admin GUI.

    group -- group name used as filter
    activeFilter -- value of the `activeFilter` form field

    Returns the `data-user` attribute of every result row.
    Raises Exception when the number of rows differs from the total shown
    on the page.
    """
    self.webSudo()
    self.setSessionBaseHeaders()
    searchForm = {
        'userSearchFilter': '',
        'group': group,
        'applicationFilter': '',
        'activeFilter': activeFilter,
        'max': 1000000,
    }
    self.sessionPostRequest('{url}secure/admin/user/UserBrowser.jspa'.format(url=self.app['url']), searchForm, True)
    totalLocator = (
        By.XPATH,
        '//*[contains(concat(" ", @class, " "), " results-count ")]//*[contains(concat(" ", @class, " "), " results-count-total ")]',
    )
    should = WebDriverWait(self.browser, 10).until(EC.presence_of_element_located(totalLocator)).text
    rows = self.browser.find_elements(By.XPATH, '//*[@data-cell-type="username"]/ancestor::tr[position() = 1]')
    found = len(rows)
    if found != int(should):
        raise Exception('Count-Missmatch with group members of "{g}": {f} found, but {s} should have!'.format(g=group, f=found, s=should))
    return [row.get_attribute("data-user") for row in rows]
def getGroupMembersAPI(self, group):
    """Return the usernames of the active members of `group` via the REST API.

    Pages through `rest/api/2/group/member` 50 entries at a time until the
    API flags the last page or delivers an empty page. (The original
    docstring warns that this API is "sometimes a little bit buggy".)

    If a request is not answered with HTTP 200, the status code and the
    request parameters are printed and the members collected so far are
    returned. The original raised a NameError when the very first request
    failed.
    """
    session = self.toSession()
    limit = 50
    users = []
    page = 0
    while True:
        parameters = {
            'groupname': group,
            'startAt': page * limit,
            'maxResults': limit,
            'includeInactiveUsers': 'false',
        }
        memberUrl = '{url}rest/api/2/group/member?{params}'.format(
            url=self.app['url'],
            params=urllib.parse.urlencode(parameters),
        )
        response = session.get(memberUrl)
        if response.status_code != 200:
            print('rc: ' + str(response.status_code))
            print(parameters)
            break
        payload = json.loads(response.text)
        users.extend(member['name'] for member in payload['values'])
        if payload['isLast'] == True or len(payload['values']) == 0:
            break
        page += 1
    return users
def getActiveProjectLead(self, projectKey, session=None):
    """Return the username of the project lead if that user is still active.

    The lead is read from the profile link on the project's roles page (the
    `name` query parameter of the link target) and then checked against the
    user REST API.

    projectKey -- key of the Jira project
    session -- session for the API call; created via `toSession()` if omitted

    Returns the username, or `None` when the API does not answer with 200 or
    does not report the user as active.
    """
    if session is None:
        session = self.toSession()
    self.browser.get('{url}plugins/servlet/project-config/{pjkey}/roles'.format(url=self.app['url'], pjkey=projectKey))
    leadLink = WebDriverWait(self.browser, 10).until(
        EC.presence_of_element_located((By.XPATH, '//*[contains(concat(" ", @class, " "), " jKQrhX ")]//a'))
    )
    ploUser = parse_qs(urlparse(leadLink.get_attribute('href')).query)['name'][0]
    resp = session.get('{url}rest/api/2/user?username={user}'.format(url=self.app['url'], user=urllib.parse.quote(ploUser)))
    try:
        active = json.loads(resp.text)['active']
    except (ValueError, KeyError, TypeError):
        # body is not JSON or carries no `active` flag -> treat as inactive
        # (was a bare `except:`, which also swallowed KeyboardInterrupt)
        active = False
    if resp.status_code == 200 and active:
        return ploUser
    return None
def getProjectLeads(self):
    """Return the active project lead for every known Jira project.

    Project keys are gathered first if none are cached. The result is cached
    in `self.projectLeads` as `{ projectKey: username or None }`.
    """
    if len(self.projectKeys) == 0:
        self.getProjectKeys()
    if len(self.projectLeads) == 0:
        session = self.toSession()
        for pk in self.projectKeys:
            # the original indexed with the undefined name `projectKey`
            self.projectLeads[pk] = self.getActiveProjectLead(pk, session)
    return self.projectLeads
def gatherAllProjectPermissions(self, deletePermissionHelperFile=True):
    """Gather the role permissions of every known Jira project.

    Nothing is gathered when `self.permissions` is already filled. Otherwise
    the state stored in `./data/projectPermissionsHelper.json` is loaded,
    every project missing from it is gathered, and the file is rewritten
    after each gathered project.

    deletePermissionHelperFile -- remove the helper file after the loop

    Returns `self.permissions`.
    """
    if len(self.projectKeys) == 0:
        self.getProjectKeys()
    if len(self.permissions) == 0:
        permissionHelperFile = './data/projectPermissionsHelper.json'
        session = self.toSession()
        os.makedirs(os.path.dirname(permissionHelperFile), exist_ok=True)
        try:
            with open(permissionHelperFile) as json_file:
                self.permissions = json.load(json_file)
        except (FileNotFoundError, ValueError):
            # No helper file yet, or an empty / unreadable one. The original
            # created an empty file here and then failed while parsing it.
            self.permissions = {}
        for pk in self.projectKeys:
            if pk not in self.permissions:
                print(pk)
                self.permissions[pk] = self.gatherProjectPermissions(pk, session)
                # persist after every project
                with open(permissionHelperFile, 'w') as json_file:
                    json.dump(self.permissions, json_file)
        print()
        if deletePermissionHelperFile and os.path.exists(permissionHelperFile):
            os.remove(permissionHelperFile)
    return self.permissions
def createGroup(self, groupName):
    """Create a local Jira group through the group browser form.

    Submits `groupName` in the `addName` field and then polls the group REST
    API once per second until it answers with HTTP 200.

    NOTE(review): the polling loop has no timeout – it never returns if the
    group is not created.
    """
    # every other URL in this class is built as base URL + path without an
    # extra slash; the original produced `...//secure/...` here
    groupUrl = '{url}secure/admin/user/GroupBrowser.jspa'.format(url=self.app['url'])
    self.get(groupUrl)
    field = self.browser.find_element(By.XPATH, '//input[@name="addName"]')
    # Ctrl+A first, so the typed name replaces whatever the field contains
    field.send_keys(Keys.CONTROL, "a")
    field.send_keys(groupName)
    field.send_keys(Keys.ENTER)
    session = self.toSession()
    checkUrl = '{url}rest/api/latest/group?groupname={group}'.format(
        url=self.app['url'],
        group=urllib.parse.quote(groupName),
    )
    groupCreated = False
    while not groupCreated:
        time.sleep(1)
        groupCreated = (session.get(checkUrl).status_code == 200)
class Confluence ( Atlassian ):
"""Confluence browser.

Using an administrative user is recommended; otherwise most functions won't
work properly.
"""
# Caches filled by getSpaceKeys / getSpaceKeysGUI, getSpaceKeyFromName,
# getAllSpacePermissions and getSpacePermissions.
# NOTE: these are mutable class attributes; methods that mutate them in place
# (`+=`, item assignment) change the object stored on the class.
spaceKeys = []
spaceNames = {}
permissions = {}
permissionsNames = []
# Settings behind `self.app[ 'url' ]`, `self.app[ 'user' ]` and `self.app[ 'passwd' ]`:
# environment variable name and prompt text per entry.
# NOTE(review): how `env`, `description`, `url` and `password` are evaluated is
# decided by the `Browser` base class, which is not visible here – confirm there.
appInfo = {
"url": {
"env": "CONFLUENCE_URL",
"url": True,
"description": "Enter Confluence URL",
},
"user": {
"env": "CONFLUENCE_USER",
"description": "Enter Confluence username",
},
"passwd": {
"password": True,
"env": "CONFLUENCE_PASS",
"description": "Enter Confluence password",
},
}
def __init__(self):
    """Initialise the Confluence browser with caches owned by this instance.

    `spaceKeys`, `spaceNames`, `permissions` and `permissionsNames` are
    declared as mutable class attributes and are partly mutated in place by
    other methods of this class (`+=`, item assignment). Without re-binding
    them here, every `Confluence` instance would share the same objects.
    """
    # copy rather than reset, so values pre-seeded on the class are kept
    self.spaceKeys = list(self.spaceKeys)
    self.spaceNames = dict(self.spaceNames)
    self.permissions = dict(self.permissions)
    self.permissionsNames = list(self.permissionsNames)
    super(Confluence, self).__init__()
def login(self, skipSSO=True):
    """Log in to Confluence through `dologin.action`.

    skipSSO -- append `?nosso` to the login URL

    Loads cookies for the login URL, clears the session base headers, posts
    the login form with the credentials from `self.app` and finally opens the
    Confluence base URL. When the environment variable `PRINT_DEBUG_INFO` is
    set to a truthy value the login payload is printed – with the password
    masked.
    """
    loginUrl = self.app['url'] + "dologin.action"
    if skipSSO:
        loginUrl += '?nosso'
    self.loadCookies(url=loginUrl)
    self.setSessionBaseHeaders()
    userObject = {
        "os_username": self.app['user'],
        "os_password": self.app['passwd'],
        "os_cookie": "true",
        "login": "Log in",
        "os_destination": "",
    }
    if os.getenv("PRINT_DEBUG_INFO", "False").lower() in ["1", "true", "t", "y", "yes"]:
        print('User is being logged in with those credentials:')
        # never write the clear-text password to stdout / captured logs
        print({**userObject, "os_password": "********"})
        print()
    self.sessionPostRequest(loginUrl, userObject, True, urlOverride=True)
    self.get(self.app['url'], False)
def checkLogin(self):
    """Report whether the browser currently holds a logged-in Confluence session.

    Opens the Confluence base URL if the browser is not already below it and
    then looks for the element with id `loginButton`.

    Returns `False` when that element exists (login needed), `True` otherwise.
    """
    baseUrl = self.app['url']
    if not self.browser.current_url.startswith(baseUrl):
        self.get(baseUrl, False)
    try:
        self.browser.find_element(By.ID, 'loginButton')
    except NoSuchElementException:
        # Only a missing login button means "logged in"; the former bare
        # `except:` also reported a session on driver errors or Ctrl+C.
        return True
    return False
def setSessionBaseHeaders(self, api=False):
    """Choose the base headers used for session requests.

    api -- when true, JSON content type and `X-Atlassian-Token: no-check`
           are set; otherwise the base headers are cleared (`None`)
    """
    if api:
        chosen = {
            "Content-Type": "application/json",
            "X-Atlassian-Token": "no-check",
        }
    else:
        chosen = None
    self.sessionBaseHeaders = chosen
def getPageInfo(self, spaceKey, title, session=None):
    """Look up a page by space key and title through the content API.

    spaceKey -- key of the Confluence space
    title -- exact page title
    session -- optional session handed to `apiGetInfo`

    Returns the page object (expanded with `version` and `space`), or `None`
    when no page matches.
    Raises Exception when more than one page matches.
    """
    self.setSessionBaseHeaders(api=True)
    parameters = {
        'spaceKey': spaceKey,
        'title': title,
        'expand': 'version,space',
    }
    apiUrl = '{url}rest/api/content?{params}'.format(url=self.app['url'], params=urllib.parse.urlencode(parameters))
    results = self.apiGetInfo(apiUrl, session=session)
    if not results or not results['results']:
        # nothing found – the original failed with an IndexError here
        # (presumably `apiGetInfo` can also deliver `None`; other methods of
        # this class guard against that)
        return None
    if results['size'] > 1:
        raise Exception('More than one page found – cannot proceed properly ...')
    return results['results'][0]
def getPageInfoByID(self, pageID, session=None):
    """Fetch a page by its ID, expanded with `version` and `space`.

    pageID -- Confluence content ID
    session -- optional session handed to `apiGetInfo`

    Returns whatever `apiGetInfo` delivers for the content URL.
    """
    self.setSessionBaseHeaders(api=True)
    query = urllib.parse.urlencode({'expand': 'version,space'})
    apiUrl = '{url}rest/api/content/{id}?{params}'.format(url=self.app['url'], id=pageID, params=query)
    return self.apiGetInfo(apiUrl, session=session)
def getSpaceName(self, spaceKey, session=None):
    """Return the name of the space with the given key.

    Returns `None` when `apiGetInfo` delivers `None`.
    """
    self.setSessionBaseHeaders(api=True)
    spaceUrl = '{url}rest/api/space/{spaceKey}'.format(url=self.app['url'], spaceKey=spaceKey)
    spaceInfo = self.apiGetInfo(spaceUrl, session=session)
    if spaceInfo is None:
        return None
    return spaceInfo['name']
def getSpaceKeyFromName(self, spaceName, force=False, session=None):
    """Return the key of the space called `spaceName`.

    The name -> key mapping is built on first use (or again when `force` is
    true) by asking `getSpaceName` for every key from `getSpaceKeys`.

    spaceName -- display name of the space
    force -- rebuild the mapping even if it is already filled
    session -- optional session, now handed on to `getSpaceName` (it was
               accepted but ignored before)

    Returns the space key, or `None` when no space has that name.
    """
    if len(self.spaceNames) == 0 or force == True:
        for sk in self.getSpaceKeys():
            self.spaceNames[self.getSpaceName(sk, session=session)] = sk
    return self.spaceNames.get(spaceName)
def getSpaceHomepageID(self, spaceKey, session=None):
    """Return the page ID of the homepage of the given space.

    spaceKey -- key of the Confluence space
    session -- optional session handed to `apiGetInfo`

    Returns the homepage ID, or `None` when `apiGetInfo` delivers `None` or
    an empty result list.
    """
    self.setSessionBaseHeaders(api=True)
    parameters = {
        'expand': 'homepage',
        'spaceKey': spaceKey,
    }
    # the original also passed `id=pageID` to format() – an undefined name
    # that made every call fail with a NameError
    apiUrl = '{url}rest/api/space?{params}'.format(url=self.app['url'], params=urllib.parse.urlencode(parameters))
    result = self.apiGetInfo(apiUrl, session=session)
    if result is None or not result['results']:
        return None
    return result['results'][0]['homepage']['id']
def createPage(self, title, contentHtml, spaceKey, parent=None, session=None):
    """Create a new page through the content API.

    title -- desired page title; if the API reports that it already exists,
             ` (1)`, ` (2)`, ... is appended until the creation succeeds
    contentHtml -- page body in storage format
    spaceKey -- key of the target space
    parent -- ID of the parent page; defaults to the homepage of the space
    session -- optional session

    Returns `[ page id, title actually used ]`.
    Raises Exception when the API answers with anything but 200 for a reason
    other than an already existing title.
    """
    if parent is None:
        # the original called `getSpaceHomepageID` without `self.` (NameError)
        parent = self.getSpaceHomepageID(spaceKey, session=session)
    createUrl = '{url}rest/api/content'.format(url=self.app['url'])
    createObject = {
        "type": "page",
        "title": title,
        "space": {
            "key": spaceKey
        },
        "body": {
            "storage": {
                "value": contentHtml,
                "representation": "storage"
            }
        },
        "ancestors": [{
            "id": parent
        }]
    }
    # NOTE(review): `session` goes into the third positional slot, where
    # `login` passes `True` – confirm against `Browser.sessionPostRequest`.
    response = self.sessionPostRequest(createUrl, createObject, session)
    rc = response.status_code
    attempt = 1
    while rc != 200:
        message = str(response.json().get('message') or '')
        if 'already exists' not in message.lower():
            break
        print('        ' + message)
        createObject['title'] = '{} ({})'.format(title, attempt)
        response = self.sessionPostRequest(createUrl, createObject, session)
        rc = response.status_code
        attempt += 1
    if rc == 200:
        return [response.json()['id'], createObject['title']]
    raise Exception("API Response ({}) was:\n\n{}".format(rc, response.content))
def updatePage(self, newTitle, contentHtml, pageID, spaceKey, version=None, session=None):
    """Replace title and content of an existing page.

    newTitle -- title to store
    contentHtml -- page body in storage format
    pageID -- ID of the page to update
    spaceKey -- key of the space the page lives in
    version -- version number to write; defaults to the current number + 1
    session -- optional session handed to `sessionPutRequest`

    Returns the page ID from the API answer.
    Raises Exception when the API does not answer with 200.
    """
    if version is None:
        version = self.getPageInfoByID(pageID)['version']['number'] + 1
    storageBody = {"value": contentHtml, "representation": "storage"}
    payload = {
        "id": pageID,
        "type": "page",
        "title": newTitle,
        "space": {"key": spaceKey},
        "body": {"storage": storageBody},
        "version": {"number": version},
    }
    targetUrl = '{url}rest/api/content/{pid}'.format(url=self.app['url'], pid=pageID)
    response = self.sessionPutRequest(targetUrl, payload, session=session)
    status = response.status_code
    if status != 200:
        raise Exception("API Response ({}) was:\n\n{}".format(status, response.content))
    return response.json()['id']
def getPageContent(self, pageID, session=None):
    """Return the storage-format body of the page with the given ID.

    pageID -- Confluence content ID
    session -- session to use; created via `toSession(curl=True)` if omitted

    Returns the body string, or `None` when the API does not answer with 200.
    """
    # the original formatted the URL with the undefined name `pid` (NameError)
    apiUrl = '{url}rest/api/content/{pid}?expand=body.storage'.format(url=self.app['url'], pid=pageID)
    if session is None:
        session = self.toSession(curl=True)
    response = session.get(apiUrl)
    if response.status_code != 200:
        return None
    return json.loads(response.text)['body']['storage']['value']
def deletePage(self, pageID, session=None):
    """Delete the page with the given ID and return the API response.

    session -- session to use; created via `toSession(curl=True)` if omitted
    """
    if session is None:
        session = self.toSession(curl=True)
    targetUrl = '{url}rest/api/content/{id}'.format(url=self.app['url'], id=pageID)
    return session.delete(targetUrl)
def getGroups(self):
    """Return the names of the Confluence groups listed by the group API.

    Requests pages of 1000 groups until a page comes back empty or a request
    is not answered with HTTP 200.
    """
    groupNames = []
    session = self.toSession()
    limit = 1000
    page = 0
    while True:
        response = session.get('{url}rest/api/group?start={start}&limit={limit}'.format(url=self.app['url'], limit=limit, start=(page * limit)))
        if response.status_code != 200:
            break
        results = json.loads(response.text)['results']
        groupNames.extend(entry['name'] for entry in results)
        if len(results) == 0:
            break
        page += 1
    return groupNames
def webSudo(self, adjustCookies=True):
    """Run WebSudo with Confluence's authentication and administration URLs.

    adjustCookies -- passed through to `Atlassian.webSudo`
    """
    wikiUrl = self.app['url']
    sysInfoUrl = '{url}admin/viewgeneralconfig.action'.format(url=wikiUrl)
    authUrl = '{url}authenticate.action'.format(url=wikiUrl)
    super(Confluence, self).webSudo(authUrl, sysInfoUrl, adjustCookies)
def getSpaceKeys(self, force=False, personalSpaces=False):
    """Return the keys of all Confluence spaces, read through the REST API.

    force -- reload even if `self.spaceKeys` is already filled
    personalSpaces -- also include spaces of type `personal`

    Pages through `rest/api/space` 50 entries at a time. Paging stops after a
    page smaller than the limit, an empty page, or a non-200 answer (which is
    printed). The result is cached in `self.spaceKeys`.
    """
    session = self.toSession(curl=True)
    if not (force or self.spaceKeys == []):
        return self.spaceKeys
    self.spaceKeys = []
    limit = 50
    page = 0
    while True:
        start = page * limit
        response = session.get('{url}rest/api/space?start={start}&limit={limit}'.format(url=self.app['url'], start=start, limit=limit))
        if response.status_code != 200:
            print('start: {s}, limit: {l}, rc: {r}'.format(s=start, l=limit, r=str(response.status_code)))
            break
        payload = json.loads(response.text)
        for space in payload['results']:
            if personalSpaces or space['type'] not in ['personal']:
                self.spaceKeys.append(space['key'])
        if payload['size'] < limit or len(payload['results']) == 0:
            break
        page += 1
    return self.spaceKeys
def getSpaceKeysGUI(self, force=False):
    """Return the keys of all Confluence spaces, read from the space directory GUI.

    force -- reload even if `self.spaceKeys` is already filled

    Collects the `data-spacekey` attribute of every table row and follows the
    "next" link until there is none. The result is cached in `self.spaceKeys`.
    """
    nextLinkXPath = '//*[contains(concat(" ", normalize-space(@class), " "), " aui-nav-next ")]//a'
    if force or self.spaceKeys == []:
        self.spaceKeys = []
        self.get(self.app['url'] + 'spacedirectory/view.action')
        while True:
            soup = self.toSoup()
            self.spaceKeys += [item['data-spacekey'] for item in soup.select('tbody tr')]
            try:
                # Probe the first `.aui-nav-next a` match for `href`: a missing
                # link (IndexError) or attribute (KeyError) ends the paging.
                soup.select('.aui-nav-next a')[0]['href']
                self.browser.find_element(By.XPATH, nextLinkXPath).click()
            except Exception:
                # was a bare `except:`, which also swallowed KeyboardInterrupt
                break
            # give the next page time to load
            time.sleep(5)
    return self.spaceKeys
def APIFileRequest(self, url, mime, filename, filecontent, transferCookies=False):
    """Send a file to `url` with a POST request.

    url -- target URL of the upload
    mime -- MIME type stored with the file entry
    filename -- name stored with the file entry
    filecontent -- content stored with the file entry
    transferCookies -- copy the session's cookies into the Selenium browser
                       afterwards and reload the request URL there

    Returns the response of the POST request.

    NOTE(review): the file tuple travels inside `data` and the multipart
    `Content-Type` header is set by hand (without boundary). With a plain
    `requests` session a file upload is normally passed via `files=` – confirm
    what `toSession( curl=True )` returns and whether this request works.
    """
    session = self.toSession(curl=True)
    fileEntry = (filename, filecontent, mime)
    attachData = {
        "bodyType": "storage",
        "supportedContainerTypes": ["space", "page"],
        "supportedChildTypes": ["attachment", "comment"],
        "supportedSpacePermissions": [],
        "preventDuplicateTitle": False,
        "indexing": {"enabled": True},
        "files": {'file': fileEntry},
    }
    uploadHeaders = {"Content-Type": "multipart/form-data", "X-Atlassian-Token": "no-check"}
    request = session.post(url, data=attachData, headers=uploadHeaders)
    if not transferCookies:
        return request
    # write Cookies from POST request to Selenium Browser
    self.browser.get(request.url)
    for key, value in session.cookies.get_dict().items():
        self.browser.add_cookie({"name": key, "value": value})
    self.get(request.url)
    return request
def cleanupPermissions( self ):
"""Clean up the individual user permissions of all spaces.

For every space key the "edit space permissions" page is opened and each row
of the user permission table is checked against the user REST API. For users
the API reports as missing (404), the button in the first cell of their row is
clicked and the form is saved; the page is then reloaded and checked again.
If removed users are still listed, the group named by the environment variable
`CONFLUENCE_ADMIN_GROUP` (default `confluence-administrators`) is added to the
space before the rows are processed once more.

Raises `APIError` when the user API answers with anything but 200 or 404.
NOTE(review): `APIError` is not defined or imported in the visible part of the
file – confirm it exists.
NOTE(review): the nesting of the statements below cannot be recovered from the
flattened source; the comments describe the sequence only.
"""
editPermissionUrl = self.app[ 'url' ] + 'spaces/editspacepermissions.action?edit=Edit+Permissions&key='
userCheckApiUrl = self.app[ 'url' ] + 'rest/api/user?username='
admingroupname = os.getenv( 'CONFLUENCE_ADMIN_GROUP', 'confluence-administrators' )
self.getSpaceKeys()
session = self.toSession()
# probe the API with the own user; a 403 triggers a fresh login plus WebSudo
resp = session.get( userCheckApiUrl + self.app[ 'user' ] )
if resp.status_code == 403:
self.login()
self.webSudo()
session = self.toSession()
for space in self.spaceKeys:
self.get( editPermissionUrl + space )
# the first two table rows are skipped
rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:]
removedUsers = []
for row in rows:
# the span text minus its first and last character is used as username
username = row.find_element( By.XPATH,'.//span').text[1:-1]
userApiUrl = userCheckApiUrl + username
resp = session.get( userApiUrl )
if resp.status_code == 200:
# User exists, all fine
pass
elif resp.status_code == 404:
removedUsers += [ username ]
row.find_element( By.XPATH,'.//td[1]//button').click()
# User does not exist, remove all permissions
else:
raise APIError( resp.status_code, userApiUrl )
time.sleep(4)
self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click()
time.sleep(2)
# second pass: reload the page and click again for removed users still listed
self.get( editPermissionUrl + space )
rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:]
for row in rows:
username = row.find_element( By.XPATH,'.//span').text[1:-1]
if username in removedUsers:
row.find_element( By.XPATH,'.//td[1]//button').click()
self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click()
time.sleep(2)
# third pass: if a removed user is still listed, add the admin group once
self.get( editPermissionUrl + space )
rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:]
admins_added = False
for row in rows:
username = row.find_element( By.XPATH,'.//span').text[1:-1]
if username in removedUsers:
if not admins_added:
groupaddfield = self.browser.find_element( By.ID,'groups-to-add-autocomplete')
# Ctrl+A first, so the typed group name replaces the field content
groupaddfield.send_keys( Keys.CONTROL, "a" )
groupaddfield.send_keys( admingroupname )
self.browser.find_element( By.XPATH, '//*[@name="groupsToAddButton" and @type="submit"]' ).click()
time.sleep(2)
self.browser.find_element( By.XPATH, '//*[@id="gPermissionsTable"]//*[contains(text(), "' + admingroupname + '")]//button' ).click()
admins_added = True
break
# with the admin group in place, click once more for the removed users and save
if admins_added:
rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:]
for row in rows:
username = row.find_element( By.XPATH,'.//span').text[1:-1]
if username in removedUsers:
row.find_element( By.XPATH,'.//td[1]//button').click()
self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click()
time.sleep(2)
def getPermissionRepresentation(self, boolList, trueVal='X', falseVal='-'):
    """Render a permission row as a compact string.

    boolList -- iterable of truthy / falsy permission flags
    trueVal -- text emitted for a granted permission
    falseVal -- text emitted for a missing permission

    Returns the concatenation of one marker per flag.
    """
    return ''.join(trueVal if granted else falseVal for granted in boolList)
def getBoolListFromRepresentation(self, representation, trueVal='X', falseVal='-'):
    """Turn a permission representation back into a list of booleans.

    representation -- string (or other iterable) of markers
    trueVal -- marker that stands for `True`
    falseVal -- marker that stands for `False`

    Raises Exception for any marker that is neither `trueVal` nor `falseVal`.
    """
    def decode(symbol):
        # trueVal is checked first, exactly like the original if / elif chain
        if symbol == trueVal:
            return True
        if symbol == falseVal:
            return False
        raise Exception( 'Check your representation – trueVal (\'{trueVal}\') and falseVal (\'{falseVal}\') are not matched by \'{val}\'!'.format( trueVal=trueVal, falseVal=falseVal, val=symbol ) )

    return [decode(symbol) for symbol in representation]
def getAllSpacePermissions(self, force=False, debug=True):
    """Return the permissions of all Confluence spaces.

    force -- gather again even if `self.permissions` is already filled
    debug -- forwarded to `getSpacePermissions`, which prints each space key
             when it is set (the flag was accepted but never used before)

    The result is cached in `self.permissions`, keyed by space key.
    """
    if force or self.permissions == {}:
        self.permissions = {}
        self.getSpaceKeys()
        for space in self.spaceKeys:
            self.permissions[space] = self.getSpacePermissions(space, debug=debug)
    return self.permissions
def getSpacePermissions( self, spaceKey, force=False, debug=False ):
"""Read the permissions of one Confluence space from its permission page.

spaceKey -- key of the space
force -- parse the page even if `self.permissions` already has the space
debug -- print the space key before loading the page

Returns a dict with the categories `ANONYMOUS`, `USERS` and `GROUPS`; each maps
a name (`anonymous`, a username or a group name) to the list of booleans
produced by `fetchPermissionsFromRow`. If the space is already cached and
`force` is false, the cached entry is returned instead.

Note that this method does not store its result in `self.permissions` itself.
"""
if force or spaceKey not in self.permissions:
permissionUrl = '{url}spaces/spacepermissions.action?key={space}'.format( url=self.app[ 'url' ], space=spaceKey )
if debug:
print( spaceKey )
self.get( permissionUrl )
soup = self.toSoup()
# column captions are collected once, from the first page that gets parsed
if self.permissionsNames == []:
# first header row (minus its first cell) holds the main captions
mainPerms = [ item.text for item in soup.select('table.permissions tr:first-of-type')[0].find_all('th')[1:] ]
i = 0
# second header row holds the sub-captions; a cell with the class
# `permissions-group-start` advances the index into the main captions
for item in soup.select('table.permissions tr:nth-of-type(2)')[0].find_all('th')[1:]:
try:
if 'permissions-group-start' in item[ 'class' ]:
i += 1
except:
pass
self.permissionsNames += [ mainPerms[ i ] + ' (' + item.text + ')' ]
tempPermissions = {}
# in each table the first two rows are excluded by the selector
anonymous = soup.select('#aPermissionsTable tr:not(:first-of-type):not(:nth-of-type(2))')[0].select( 'td' )
tempPermissions[ 'ANONYMOUS' ] = { 'anonymous': self.fetchPermissionsFromRow( anonymous[1:] ) }
users = soup.select('#uPermissionsTable tr:not(:first-of-type):not(:nth-of-type(2))')
tempPermissions[ 'USERS' ] = {}
for row in users:
tds = row.select( 'td' )
# key: stripped text of the first span minus its first and last character
tempPermissions[ 'USERS' ][ tds[0].select( 'span' )[0].text.strip()[1:-1] ] = self.fetchPermissionsFromRow( tds[1:] )
groups = soup.select('#gPermissionsTable tr:not(:first-of-type):not(:nth-of-type(2))')
tempPermissions[ 'GROUPS' ] = {}
for row in groups:
tds = row.select( 'td' )
# key: stripped text of the first cell
tempPermissions[ 'GROUPS' ][ tds[0].text.strip() ] = self.fetchPermissionsFromRow( tds[1:] )
return tempPermissions
else:
return self.permissions[ spaceKey ]
def fetchPermissionsFromRow(self, row):
    """Translate the cells of one permission table row into booleans.

    row -- iterable of cells supporting `cell['data-permission-set']`

    A cell counts as granted unless its `data-permission-set` attribute is
    `false` (compared case-insensitively).
    """
    flags = []
    for cell in row:
        flags.append(cell['data-permission-set'].lower() != 'false')
    return flags
def getGroupMembers(self, group):
    """Return the usernames of the members of a Confluence group.

    Pages through `rest/api/group/<group>/member`, starting with a page size
    of 50. If the first answer is smaller than its reported limit, or reports
    a different limit than requested, the delivered size becomes the page
    size for the following requests. Paging stops after a page smaller than
    the page size, an empty page, or a non-200 answer.

    If a request is not answered with HTTP 200, the status code and the
    request parameters are printed and the members collected so far are
    returned. The original raised a NameError when the very first request
    failed. The group name is now URL-quoted for the path.
    """
    session = self.toSession()
    quotedGroup = urllib.parse.quote(group)
    users = []
    limit = 50
    page = 0
    firstRun = True
    while True:
        parameters = {
            'start': page * limit,
            'limit': limit,
        }
        memberUrl = '{url}rest/api/group/{group}/member?{params}'.format(
            url=self.app['url'],
            group=quotedGroup,
            params=urllib.parse.urlencode(parameters),
        )
        response = session.get(memberUrl)
        if response.status_code != 200:
            print('rc: ' + str(response.status_code))
            print(parameters)
            break
        payload = json.loads(response.text)
        lastPage = False
        if firstRun and (payload['size'] < payload['limit'] or limit != payload['limit']):
            # adopt the page size that was actually delivered
            limit = payload['size']
        elif payload['size'] < limit:
            lastPage = True
        users.extend(member['username'] for member in payload['results'])
        if lastPage or len(payload['results']) == 0:
            break
        page += 1
        firstRun = False
    return users
def getUserApiInfo(self, username, returnResponse=False, session=None):
    """Fetch the profile of a Confluence user from the mobile profile API.

    username -- Confluence username (URL-quoted for the request)
    returnResponse -- handed to `apiGetInfo`; per the original description the
                      whole response is returned when true, otherwise only the
                      decoded answer object
    session -- optional session handed to `apiGetInfo`
    """
    quotedUser = urllib.parse.quote(username)
    profileUrl = '{url}rest/mobile/latest/profile/{user}'.format(url=self.app['url'], user=quotedUser)
    return self.apiGetInfo(profileUrl, returnResponse, session)
def recoverConfluencePermissions(self):
    """Recover space permissions when a system administrator has no space access.

    ENSURE YOUR SYSTEM LANGUAGE TO BE ENGLISH – the links and buttons are
    located by their English captions.

    Opens the default space permissions page and, as long as a
    "Recover Permissions" link can be found, clicks it and confirms the
    dialog with "OK".

    Returns the `data-space-key` attribute of every link that was clicked.
    """
    # built like every other URL in this class: base URL + path without an
    # extra slash (the original produced `...//admin/...`)
    permsurl = '{url}admin/permissions/viewdefaultspacepermissions.action'.format(url=self.app['url'])
    self.browser.get(permsurl)
    xpath = '//a[text()="Recover Permissions"]'
    xpathOk = '//button[text()="OK"]'
    okJS = 'xPathRes = document.evaluate (\'{xpath}\', document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null); xPathRes.singleNodeValue.click();'.format(xpath=xpathOk)

    def nextRecoverLink():
        # Return the next "Recover Permissions" link scrolled into view, or None.
        try:
            link = self.browser.find_element(By.XPATH, xpath)
            link.location_once_scrolled_into_view
        except Exception:
            # was a bare `except:`, which also swallowed KeyboardInterrupt
            return None
        return link

    listOfSpaces = []
    element = nextRecoverLink()
    while element is not None:
        listOfSpaces += [element.get_attribute('data-space-key')]
        element.click()
        WebDriverWait(self.browser, 10).until(EC.presence_of_element_located((By.XPATH, xpathOk)))
        # the OK button is clicked through JavaScript
        self.browser.execute_script(okJS)
        WebDriverWait(self.browser, 10).until_not(EC.presence_of_element_located((By.ID, 'recover-permissions-dialog')))
        time.sleep(0.5)
        element = nextRecoverLink()
    return listOfSpaces
def getAllSpacePermissionsAsRepresentation(self, force=False, debug=True, trueVal='X', falseVal='-'):
    """
    Build a dictionary of permission representations for all spaces.

    The result mirrors the nesting delivered by `getAllSpacePermissions`:
    space key -> category (`GROUPS`, `USERS`, `ANONYMOUS`) -> granted object
    identifier (group name, user name or `anonymous`). Each innermost value
    is the result of `getPermissionRepresentation` for that permission list,
    rendered with `trueVal` / `falseVal`.

    `force` and `debug` are passed through to `getAllSpacePermissions`.
    """
    def render(permarray):
        return self.getPermissionRepresentation(permarray, trueVal, falseVal)

    vperms = {}
    for spacekey, permissions in self.getAllSpacePermissions(force, debug).items():
        vperms[spacekey] = {
            category: {key: render(permarray) for key, permarray in specificperms.items()}
            for category, specificperms in permissions.items()
        }
    return vperms
def addNewSpacePermissions(self, spaceKey, category, objectName, representation, isRepresentation=True, submitChanges=True):
    """Add a new group / user row to a space's permissions and set its permissions.

    For `GROUPS` and `USERS` the object name is typed into the matching
    "add" input of the permission change page and the matching add button
    is clicked. `ANONYMOUS` needs no new row. Afterwards the permissions
    themselves are set through `adjustSpacePermissions`.

    Args:
        spaceKey: key of the Confluence space.
        category: one of `GROUPS`, `USERS` or `ANONYMOUS`.
        objectName: group name or user name to add.
        representation: permission representation – if `isRepresentation`
            is `False`, a boolean list for permissions is expected.
        isRepresentation: forwarded to `adjustSpacePermissions`.
        submitChanges: forwarded to `adjustSpacePermissions`.

    Raises:
        Exception: if `category` is none of the three known categories.
    """
    # `name` attributes of ( text input, add button ) per category that needs a new row
    addControls = {
        'GROUPS': ('groupsToAdd', 'groupsToAddButton'),
        'USERS': ('usersToAdd', 'usersToAddButton'),
    }
    self.callPermissionChangePage(spaceKey)
    if category in addControls:
        fieldName, buttonName = addControls[category]
        # find_element( By.XPATH, ... ) instead of find_element_by_xpath:
        # the latter was removed in Selenium 4.3 and the rest of the file
        # already uses the By-based API
        addField = self.browser.find_element(By.XPATH, '//input[@name="{name}"]'.format(name=fieldName))
        # select existing content first so the typed name replaces it
        addField.send_keys(Keys.CONTROL, "a")
        addField.send_keys(objectName)
        time.sleep(1)
        self.browser.find_element(By.XPATH, '//input[@name="{name}"]'.format(name=buttonName)).click()
    elif category != 'ANONYMOUS':
        # ANONYMOUS has only one row which is always present
        raise Exception('{val} is an invalid category!'.format(val=category))
    time.sleep(1)
    self.adjustSpacePermissions(spaceKey, category, objectName, representation, isRepresentation, submitChanges)
def adjustSpacePermissions(self, spaceKey, category, objectName, representation, isRepresentation=True, submitChanges=True):
    """Change existing space permissions of one group / user / anonymous row.

    The permission values are mapped positionally onto the permission cells
    of the object's row; each checkbox is clicked until its state matches
    the wanted value. If the lengths differ, values are mapped as far as
    possible.

    Args:
        spaceKey: key of the Confluence space.
        category: one of `GROUPS`, `USERS` or `ANONYMOUS`.
        objectName: group name or user name whose row is changed.
        representation: permission representation – if `isRepresentation`
            is `False`, a boolean list for permissions is expected.
        isRepresentation: whether `representation` has to be converted
            through `getBoolListFromRepresentation` first.
        submitChanges: if True, the page is submitted afterwards.

    Failures while locating the row or submitting are not raised; they are
    only reported when the environment variable `PRINT_DEBUG_INFO` is set
    to a truthy value.
    """
    debugOutput = os.getenv("PRINT_DEBUG_INFO", "False").lower() in ["1", "true", "t", "y", "yes"]
    self.callPermissionChangePage(spaceKey)
    if isRepresentation:
        representation = self.getBoolListFromRepresentation(representation)
    try:
        permissionCheckboxes = self.selectObjectRowForSpacePermissionChange(category, objectName, spaceKey)
        if debugOutput:
            rc = len(representation)
            pcc = len(permissionCheckboxes)
            if rc != pcc:
                print( 'Representation length ({rc}) and permission checkboxes lenght ({pcc}) do not match! Values are mapped as far as possible.'.format( rc=rc, pcc=pcc ) )
                print()
        # zip stops at the shorter sequence, which is the "as far as
        # possible" mapping without relying on a swallowed IndexError
        for granted, cell in zip(representation, permissionCheckboxes):
            try:
                checkbox = cell.find_element(By.XPATH, './/input[@type="checkbox"]')
                while checkbox.is_selected() != granted:
                    checkbox.click()
            except Exception:
                # best effort: a cell without usable checkbox is skipped;
                # `Exception` (not a bare except) keeps Ctrl-C working
                # inside the click loop
                continue
        if submitChanges:
            self.submitSpacePermissionChange()
    except Exception:
        if debugOutput:
            print( 'Object {obj} in category {cat} could not be found for space with key {sk} ...'.format( obj=objectName, cat=category, sk=spaceKey ) )
def removeSpacePermissions(self, spaceKey, category, objectName, submitChanges=True):
    """Remove all permissions of one group / user / anonymous row from a space.

    Every checkbox in the object's permission row is clicked until it is
    no longer selected.

    Args:
        spaceKey: key of the Confluence space.
        category: one of `GROUPS`, `USERS` or `ANONYMOUS`.
        objectName: group name or user name whose permissions are removed.
        submitChanges: if True, the page is submitted afterwards.

    Failures are not raised; they are only reported when the environment
    variable `PRINT_DEBUG_INFO` is set to a truthy value.
    """
    self.callPermissionChangePage(spaceKey)
    try:
        permissionCheckboxes = self.selectObjectRowForSpacePermissionChange(category, objectName, spaceKey)
        for cell in permissionCheckboxes:
            checkbox = cell.find_element(By.XPATH, './/input[@type="checkbox"]')
            while checkbox.is_selected():
                checkbox.click()
        if submitChanges:
            self.submitSpacePermissionChange()
    except Exception:
        # best effort as before, but `Exception` instead of a bare except
        # so KeyboardInterrupt / SystemExit can still stop the click loop
        if os.getenv("PRINT_DEBUG_INFO", "False").lower() in ["1", "true", "t", "y", "yes"]:
            print( 'Object {obj} in category {cat} could not be found for space with key {sk} ...'.format( obj=objectName, cat=category, sk=spaceKey ) )
def selectObjectRowForSpacePermissionChange(self, category, objectName, spaceKey):
    """Helper function to select the correct row for changing permissions.

    Args:
        category: one of `GROUPS`, `USERS` or `ANONYMOUS`.
        objectName: group name or user name identifying the row (not used
            for the lookup in category `ANONYMOUS`).
        spaceKey: key of the space; only used in the error message.

    Returns:
        list: the 14 permission cell elements of the selected row.

    Raises:
        Exception: if `category` is invalid or the row does not contain
            exactly 14 permission cells. Lookup errors raised by the
            browser are not caught here.
    """
    def xpathLiteral(value):
        """Quote `value` as XPath 1.0 string literal (XPath has no escape character)."""
        value = str(value)
        if "'" not in value:
            return "'{}'".format(value)
        if '"' not in value:
            return '"{}"'.format(value)
        # both quote kinds present: glue single-quoted pieces with "'" through concat()
        pieces = ("'{}'".format(piece) for piece in value.split("'"))
        return 'concat({})'.format(', "\'", '.join(pieces))

    xpathPermissionCell = './/*[contains(concat(" ", normalize-space(@class), " "), " permissionCell ")]'
    # data attribute that carries the object name per category
    rowAttributes = {
        'GROUPS': 'data-permission-group',
        'USERS': 'data-permission-user',
    }
    if category in rowAttributes:
        # the name is quoted as XPath literal so that names containing
        # quotes cannot break (or alter) the expression
        rowXPath = '//td[@{attr}={obj}][1]/parent::tr'.format(
            attr=rowAttributes[category], obj=xpathLiteral(objectName)
        )
        permissionRow = self.browser.find_element(By.XPATH, rowXPath)
    elif category == 'ANONYMOUS':
        anonymousTable = self.browser.find_element(By.ID, 'aPermissionsTable')
        permissionRow = anonymousTable.find_element(By.XPATH, '{xpath}[1]/parent::tr'.format(xpath=xpathPermissionCell))
    else:
        raise Exception('{val} is an invalid category!'.format(val=category))
    permissionCheckboxes = permissionRow.find_elements(By.XPATH, xpathPermissionCell)
    pcc = len(permissionCheckboxes)
    if pcc != 14:
        raise Exception( 'There are {c} permission checkboxes found for object {obj} in space permissions for space with key {sk}! (14 expected ...)'.format( c=pcc, obj=objectName, sk=spaceKey ) )
    return permissionCheckboxes
def callPermissionChangePage(self, spaceKey):
    """Make sure the browser shows the permission change page of a Confluence space.

    Nothing happens if the browser's current URL already equals the edit
    page URL of the space; otherwise webSudo is run and the page is loaded.
    """
    editPage = '{url}spaces/editspacepermissions.action?key={space}'.format(
        space=spaceKey, url=self.app['url']
    )
    if self.browser.current_url == editPage:
        return
    self.webSudo()
    # NOTE(review): the source listing lost its indentation; the page load is
    # assumed to belong to the "URL differs" branch, since callers collect
    # several changes on this page before one submit – confirm against the
    # original file
    self.get(editPage)
def submitSpacePermissionChange(self):
    """Save the pending changes on the space permission change page of Confluence.

    Waits one second, then clicks the input element named ``save`` on the
    page currently shown by the browser.
    """
    saveButtonXPath = '//input[@name="save"]'
    time.sleep(1)
    saveButton = self.browser.find_element(By.XPATH, saveButtonXPath)
    saveButton.click()
def setSpacePermissions(self, spaceKey, representationDictionary, forceRefresh=False):
    """Adjust the permissions of a space to match a representation dictionary.

    `representationDictionary` maps categories to `{objectName: representation}`
    dictionaries, like one space entry returned by
    `getAllSpacePermissionsAsRepresentation`. Objects not yet present in the
    current permissions are added, present ones are adjusted, and objects
    present in a listed category but missing from the dictionary are removed.
    All changes are collected on the page and submitted once at the end.
    """
    existing = self.getSpacePermissions(spaceKey, forceRefresh)

    # first pass: add or adjust everything that is wanted
    for category in representationDictionary:
        for objectName, newPermissions in representationDictionary[category].items():
            if objectName in existing[category]:
                applyChange = self.adjustSpacePermissions
            else:
                applyChange = self.addNewSpacePermissions
            applyChange(spaceKey, category, objectName, newPermissions, submitChanges=False)

    # removing permissions has to be the last step since it could be that space permissions then lock out the current user
    for category in representationDictionary:
        wanted = representationDictionary[category]
        obsolete = [name for name in existing[category] if name not in wanted]
        for objectName in obsolete:
            self.removeSpacePermissions(spaceKey, category, objectName, submitChanges=False)

    self.submitSpacePermissionChange()
Classes
class Atlassian-
Special functions for Atlassian systems – they need to have `nosso` active at least …
function to initiate the systems as needed
Expand source code
class Atlassian ( Browser ): """Special functions for Atlassian systems – they need to have `nosso` active at least ...""" def webSudo( self, authUrl, sysInfoUrl, adjustCookies = True ): """run the application webSudo""" if not self.checkLogin(): self.login() self.browser.get( sysInfoUrl ) pwFieldXPath = '//input[@type="password"]' doWebSudo = ( self.browser.current_url != sysInfoUrl ) or ( self.checkElementByXpath( pwFieldXPath ) ) if doWebSudo: self.browser.get( authUrl ) pwfield = WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, pwFieldXPath ) ) ) pwfield.send_keys( Keys.CONTROL, "a" ) pwfield.send_keys( self.app[ 'passwd' ] ) pwfield.send_keys( Keys.ENTER ) time.sleep( 5 ) if adjustCookies: self.refresh() def getToken( self ): """get CSRF token of Atlassian App""" self.get( self.app['url'] ) return self.browser.find_element( By.ID, 'atlassian-token' ).get_attribute( 'content' )Ancestors
- macwinnie_pyhelpers.Browser.Browser
Subclasses
Methods
def getToken(self)-
get CSRF token of Atlassian App
Expand source code
def getToken( self ): """get CSRF token of Atlassian App""" self.get( self.app['url'] ) return self.browser.find_element( By.ID, 'atlassian-token' ).get_attribute( 'content' ) def webSudo(self, authUrl, sysInfoUrl, adjustCookies=True)-
run the application webSudo
Expand source code
def webSudo( self, authUrl, sysInfoUrl, adjustCookies = True ): """run the application webSudo""" if not self.checkLogin(): self.login() self.browser.get( sysInfoUrl ) pwFieldXPath = '//input[@type="password"]' doWebSudo = ( self.browser.current_url != sysInfoUrl ) or ( self.checkElementByXpath( pwFieldXPath ) ) if doWebSudo: self.browser.get( authUrl ) pwfield = WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, pwFieldXPath ) ) ) pwfield.send_keys( Keys.CONTROL, "a" ) pwfield.send_keys( self.app[ 'passwd' ] ) pwfield.send_keys( Keys.ENTER ) time.sleep( 5 ) if adjustCookies: self.refresh()
class Confluence-
Confluence Browser – recommended to use administrative user; otherwise most functions won't work properly
function to initiate the systems as needed
Expand source code
class Confluence ( Atlassian ): """Confluence Browser – recommended to use administrative user; otherwise most functions won't work properly""" spaceKeys = [] spaceNames = {} permissions = {} permissionsNames = [] appInfo = { "url": { "env": "CONFLUENCE_URL", "url": True, "description": "Enter Confluence URL", }, "user": { "env": "CONFLUENCE_USER", "description": "Enter Confluence username", }, "passwd": { "password": True, "env": "CONFLUENCE_PASS", "description": "Enter Confluence password", }, } def __init__( self ): super( Confluence, self ).__init__() def login( self, skipSSO=True ): """Run Confluence login""" loginUrl = self.app[ 'url' ] + "dologin.action" if skipSSO: loginUrl += '?nosso' self.loadCookies( url=loginUrl ) self.setSessionBaseHeaders() userObject = { "os_username": self.app[ 'user' ], "os_password": self.app[ 'passwd' ], "os_cookie": "true", "login": "Log in", "os_destination": "", } if os.getenv( "PRINT_DEBUG_INFO", "False" ).lower() in [ "1", "true", "t", "y", "yes" ]: print( 'User is being logged in with those credentials:' ) print( userObject ) print() self.sessionPostRequest( loginUrl, userObject, True, urlOverride=True ) self.get( self.app[ 'url' ] , False ) def checkLogin( self ): """Check if login is needed""" currentUrl = self.browser.current_url urlLen = len( self.app[ 'url' ] ) if currentUrl[ 0:urlLen ] != self.app[ 'url' ]: self.get( self.app[ 'url' ], False ) try: self.browser.find_element( By.ID, 'loginButton' ) return False except: return True def setSessionBaseHeaders( self, api=False ): """Api requests need special headers in Confluence ... 
sometimes ...""" if not api: self.sessionBaseHeaders = None else: self.sessionBaseHeaders = { "Content-Type": "application/json", "X-Atlassian-Token": "no-check" } def getPageInfo( self, spaceKey, title, session=None ): """Function to get page information by Confluence space key and page title""" self.setSessionBaseHeaders( api=True ) parameters = { 'spaceKey': spaceKey, 'title': title, 'expand': 'version,space', } apiUrl = '{url}rest/api/content?{params}'.format( url=self.app[ 'url' ], params=urllib.parse.urlencode( parameters ) ) results = self.apiGetInfo( apiUrl, session=session ) if results[ 'size' ] > 1: raise Exception( 'More than one page found – cannot proceed properly ...' ) return results[ 'results' ][ 0 ] def getPageInfoByID( self, pageID, session=None ): """Function to get page information by page ID""" self.setSessionBaseHeaders( api=True ) parameters = { 'expand': 'version,space', } apiUrl = '{url}rest/api/content/{id}?{params}'.format( url=self.app[ 'url' ], id=pageID, params=urllib.parse.urlencode( parameters ) ) result = self.apiGetInfo( apiUrl, session=session ) return result def getSpaceName( self, spaceKey, session=None ): """Helper function to retrieve the space name of a Confluence Space""" self.setSessionBaseHeaders( api=True ) apiUrl = '{url}rest/api/space/{spaceKey}'.format( url=self.app[ 'url' ], spaceKey=spaceKey ) result = self.apiGetInfo( apiUrl, session=session ) if result != None: return result[ 'name' ] def getSpaceKeyFromName( self, spaceName, force=False, session=None ): """Helper function to retrieve space key by space name""" if len( self.spaceNames ) == 0 or force == True: sks = self.getSpaceKeys() for sk in sks: self.spaceNames[ self.getSpaceName( sk ) ] = sk if spaceName in self.spaceNames: return self.spaceNames[ spaceName ] def getSpaceHomepageID( self, spaceKey, session=None ): """Helper function to get page ID of homepage of Space""" self.setSessionBaseHeaders( api=True ) parameters = { 'expand': 'homepage', 'spaceKey': 
spaceKey, } apiUrl = '{url}rest/api/space?{params}'.format( url=self.app[ 'url' ], id=pageID, params=urllib.parse.urlencode( parameters ) ) result = self.apiGetInfo( apiUrl, session=session ) if result != None: return result[ 'results' ][ 0 ][ 'homepage' ][ 'id' ] def createPage( self, title, contentHtml, spaceKey, parent=None, session=None ): """helper function to create new pages within Confluence""" if parent == None: parent = getSpaceHomepageID( spaceKey ) createUrl = '{url}rest/api/content'.format( url=self.app[ 'url' ] ) createObject = { "type": "page", "title": title, "space": { "key": spaceKey }, "body": { "storage": { "value": contentHtml, "representation": "storage" } }, "ancestors": [{ "id": parent }] } response = self.sessionPostRequest( createUrl, createObject, session ) rc = response.status_code i = 1 title = createObject[ 'title' ] while rc != 200 and 'already exists' in response.json()['message'].lower(): print( ' ' + response.json()['message'] ) createObject[ 'title' ] = '{} ({})'.format( title, i ) response = self.sessionPostRequest( createUrl, createObject, session ) rc = response.status_code i += 1 if rc == 200: return [ response.json()['id'], createObject[ 'title' ] ] else: raise Exception ( "API Response ({}) was:\n\n{}".format( rc, response.content ) ) def updatePage( self, newTitle, contentHtml, pageID, spaceKey, version=None, session=None ): """helper function to update a pages content""" if version == None: version = self.getPageInfoByID( pageID )[ 'version' ][ 'number' ] + 1 createObject = { "id": pageID, "type": "page", "title": newTitle, "space": { "key": spaceKey }, "body": { "storage": { "value": contentHtml, "representation": "storage" } }, "version": { "number": version } } createUrl = '{url}rest/api/content/{pid}'.format( url=self.app[ 'url' ], pid=pageID ) response = self.sessionPutRequest( createUrl, createObject, session=session ) rc = response.status_code if rc == 200: return response.json()['id'] else: raise Exception ( "API 
Response ({}) was:\n\n{}".format( rc, response.content ) ) def getPageContent( self, pageID, session=None ): """helper function to retrieve content of page by pageID""" apiUrl = '{url}rest/api/content/{pid}?expand=body.storage'.format( url=self.app[ 'url' ], pid=pid ) if session == None: session = self.toSession( curl = True ) rspC = session.get( apiUrl ) if rspC.status_code == 200: rsp = json.loads( rspC.text ) return rsp[ 'body' ][ 'storage' ][ 'value' ] else: return None def deletePage( self, pageID, session=None ): """helper function to delete a given page""" if session == None: session = self.toSession( curl = True ) rsp = session.delete( '{url}rest/api/content/{id}'.format( url=self.app[ 'url' ], id=pageID ) ) return rsp def getGroups( self ): """Retrieve Confluence local groups""" cgroups = [] session = self.toSession() stop = False i = 0 while True: limit = 1000 rspC = session.get('{url}rest/api/group?start={start}&limit={limit}'.format( url=self.app[ 'url' ], limit=limit, start=( i * limit ) ) ) if rspC.status_code == 200: rsp = json.loads( rspC.text ) for g in rsp['results']: cgroups.append( g['name'] ) else: stop = True if stop or len( rsp['results'] ) == 0: break else: i += 1 return cgroups def webSudo( self, adjustCookies = True ): """run Confluence websudo""" authUrl = '{url}authenticate.action'.format( url=self.app[ 'url' ] ) sysInfoUrl = '{url}admin/viewgeneralconfig.action'.format( url=self.app[ 'url' ] ) super( Confluence, self ).webSudo( authUrl, sysInfoUrl, adjustCookies ) def getSpaceKeys( self, force = False, personalSpaces = False ): """get all space keys of Confluence spaces through API""" # self.webSudo() session = self.toSession( curl=True ) if force or self.spaceKeys == []: self.spaceKeys = [] stop = False i = 0 users = [] while True: limit = 50 start = i * limit apiUrl = '{url}rest/api/space?start={start}&limit={limit}'.format( url=self.app[ 'url' ], start=start, limit=limit ) rspJ = session.get( apiUrl ) if rspJ.status_code == 200: rsp 
= json.loads( rspJ.text ) if rsp[ 'size' ] < limit: stop = True for k in rsp[ 'results' ]: if personalSpaces or k[ 'type' ] not in [ 'personal' ]: self.spaceKeys.append( k[ 'key' ] ) else: print('start: {s}, limit: {l}, rc: {r}'.format( s=start, l=limit, r=str(rspJ.status_code) ) ) stop = True if stop or len( rsp['results'] ) == 0: break else: i += 1 return self.spaceKeys def getSpaceKeysGUI( self, force=False ): """get all space keys of Confluence spaces through GUI""" if force or self.spaceKeys == []: self.spaceKeys = [] self.get( self.app[ 'url' ] + 'spacedirectory/view.action' ) while True: soup = self.toSoup() self.spaceKeys += [ item[ 'data-spacekey' ] for item in soup.select( 'tbody tr' ) ] try: nextBtn = soup.select( '.aui-nav-next a' )[0] check = nextBtn[ 'href' ] self.browser.find_element( By.XPATH, '//*[contains(concat(" ", normalize-space(@class), " "), " aui-nav-next ")]//a' ).click() time.sleep( 5 ) except: break return self.spaceKeys def APIFileRequest( self, url, mime, filename, filecontent, transferCookies = False ): """send a file through POST request into space""" session = self.toSession( curl=True ) attachData = { "bodyType": "storage", "supportedContainerTypes": [ "space", "page" ], "supportedChildTypes": [ "attachment", "comment" ], "supportedSpacePermissions": [], "preventDuplicateTitle": False, "indexing": { "enabled": True }, "files": { 'file': ( filename, filecontent, mime ) } } request = session.post( url, data = attachData, headers = { "Content-Type": "multipart/form-data", "X-Atlassian-Token": "no-check" } ) if transferCookies: # write Cookies from POST request to Selenium Browser new_cookies = session.cookies.get_dict() self.browser.get( request.url ) for key, value in new_cookies.items(): self.browser.add_cookie( { "name": key, "value": value } ) self.get( request.url ) return request def cleanupPermissions( self ): """function to clean up permissions in spaces""" editPermissionUrl = self.app[ 'url' ] + 
'spaces/editspacepermissions.action?edit=Edit+Permissions&key=' userCheckApiUrl = self.app[ 'url' ] + 'rest/api/user?username=' admingroupname = os.getenv( 'CONFLUENCE_ADMIN_GROUP', 'confluence-administrators' ) self.getSpaceKeys() session = self.toSession() resp = session.get( userCheckApiUrl + self.app[ 'user' ] ) if resp.status_code == 403: self.login() self.webSudo() session = self.toSession() for space in self.spaceKeys: self.get( editPermissionUrl + space ) rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:] removedUsers = [] for row in rows: username = row.find_element( By.XPATH,'.//span').text[1:-1] userApiUrl = userCheckApiUrl + username resp = session.get( userApiUrl ) if resp.status_code == 200: # User exists, all fine pass elif resp.status_code == 404: removedUsers += [ username ] row.find_element( By.XPATH,'.//td[1]//button').click() # User does not exist, remove all permissions else: raise APIError( resp.status_code, userApiUrl ) time.sleep(4) self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click() time.sleep(2) self.get( editPermissionUrl + space ) rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:] for row in rows: username = row.find_element( By.XPATH,'.//span').text[1:-1] if username in removedUsers: row.find_element( By.XPATH,'.//td[1]//button').click() self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click() time.sleep(2) self.get( editPermissionUrl + space ) rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:] admins_added = False for row in rows: username = row.find_element( By.XPATH,'.//span').text[1:-1] if username in removedUsers: if not admins_added: groupaddfield = self.browser.find_element( By.ID,'groups-to-add-autocomplete') groupaddfield.send_keys( Keys.CONTROL, "a" ) groupaddfield.send_keys( admingroupname ) self.browser.find_element( By.XPATH, '//*[@name="groupsToAddButton" 
and @type="submit"]' ).click() time.sleep(2) self.browser.find_element( By.XPATH, '//*[@id="gPermissionsTable"]//*[contains(text(), "' + admingroupname + '")]//button' ).click() admins_added = True break if admins_added: rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:] for row in rows: username = row.find_element( By.XPATH,'.//span').text[1:-1] if username in removedUsers: row.find_element( By.XPATH,'.//td[1]//button').click() self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click() time.sleep(2) def getPermissionRepresentation( self, boolList, trueVal='X', falseVal='-' ): """generate a simple representation of a permission row for Confluence space""" rep = '' for x in boolList: if x: rep += trueVal else: rep += falseVal return rep def getBoolListFromRepresentation( self, representation, trueVal='X', falseVal='-' ): """revert representation of a permission row for Confluence space to boolean list""" boolList = [] for c in representation: if c == trueVal: boolList.append( True ) elif c == falseVal: boolList.append( False ) else: raise Exception( 'Check your representation – trueVal (\'{trueVal}\') and falseVal (\'{falseVal}\') are not matched by \'{val}\'!'.format( trueVal=trueVal, falseVal=falseVal, val=c ) ) return boolList def getAllSpacePermissions( self, force=False, debug=True ): """function to retrieve all space permissions in Confluence""" if force or self.permissions == {}: self.permissions = {} self.getSpaceKeys() for space in self.spaceKeys: self.permissions[ space ] = self.getSpacePermissions( space ) return self.permissions def getSpacePermissions( self, spaceKey, force=False, debug=False ): """function to retrieve space permissions for one Confluence space specified through Space Key""" if force or spaceKey not in self.permissions: permissionUrl = '{url}spaces/spacepermissions.action?key={space}'.format( url=self.app[ 'url' ], space=spaceKey ) if debug: print( spaceKey ) self.get( 
permissionUrl ) soup = self.toSoup() if self.permissionsNames == []: mainPerms = [ item.text for item in soup.select('table.permissions tr:first-of-type')[0].find_all('th')[1:] ] i = 0 for item in soup.select('table.permissions tr:nth-of-type(2)')[0].find_all('th')[1:]: try: if 'permissions-group-start' in item[ 'class' ]: i += 1 except: pass self.permissionsNames += [ mainPerms[ i ] + ' (' + item.text + ')' ] tempPermissions = {} anonymous = soup.select('#aPermissionsTable tr:not(:first-of-type):not(:nth-of-type(2))')[0].select( 'td' ) tempPermissions[ 'ANONYMOUS' ] = { 'anonymous': self.fetchPermissionsFromRow( anonymous[1:] ) } users = soup.select('#uPermissionsTable tr:not(:first-of-type):not(:nth-of-type(2))') tempPermissions[ 'USERS' ] = {} for row in users: tds = row.select( 'td' ) tempPermissions[ 'USERS' ][ tds[0].select( 'span' )[0].text.strip()[1:-1] ] = self.fetchPermissionsFromRow( tds[1:] ) groups = soup.select('#gPermissionsTable tr:not(:first-of-type):not(:nth-of-type(2))') tempPermissions[ 'GROUPS' ] = {} for row in groups: tds = row.select( 'td' ) tempPermissions[ 'GROUPS' ][ tds[0].text.strip() ] = self.fetchPermissionsFromRow( tds[1:] ) return tempPermissions else: return self.permissions[ spaceKey ] def fetchPermissionsFromRow( self, row ): """helper function to fetch permissions from one table row in permission view of Confluence space""" return [ item[ 'data-permission-set' ].lower() != 'false' for item in row ] def getGroupMembers( self, group ): """Retrieve group member usernames from Confluence""" sessionC = self.toSession() stop = False i = 0 users = [] limit = 50 firstRun = True while True: parameters = { 'start': ( i * limit ), 'limit': limit, } memberUrl = '{url}rest/api/group/{group}/member?{params}'.format( url=self.app[ 'url' ], group=group, params=urllib.parse.urlencode( parameters ) ) rspC = sessionC.get( memberUrl ) if rspC.status_code == 200: rsp = json.loads( rspC.text ) if firstRun and ( rsp[ 'size' ] < rsp[ 'limit' ] or limit 
!= rsp[ 'limit' ] ): limit = rsp[ 'size' ] elif rsp[ 'size' ] < limit: stop = True for u in rsp['results']: users.append( u[ 'username' ] ) else: print('rc: ' + str(rspC.status_code)) print(parameters) stop = True if stop or len( rsp['results'] ) == 0: len( rsp['results'] ) break else: i += 1 firstRun = False return users def getUserApiInfo( self, username, returnResponse=False, session=None ): """Function to gather information about a user from API – if `returnResponse` is True, the whole request response is returned, otherwise only the decoded answer object""" apiUrl = '{url}rest/mobile/latest/profile/{user}'.format( url=self.app['url'], user=urllib.parse.quote( username ) ) return self.apiGetInfo( apiUrl, returnResponse, session ) def recoverConfluencePermissions( self ): """Function to recover confluence permissions if a system administrator has no space access. ENSURE YOUR SYSTEM LANGUAGE TO BE ENGLISH!""" permsurl = '{url}/admin/permissions/viewdefaultspacepermissions.action'.format( url=self.app['url'] ) self.browser.get( permsurl ) xpath = '//a[text()="Recover Permissions"]' xpathOk = '//button[text()="OK"]' okJS = 'xPathRes = document.evaluate (\'{xpath}\', document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null); xPathRes.singleNodeValue.click();'.format( xpath=xpathOk ) listOfSpaces = [] try: element = self.browser.find_element( By.XPATH, xpath ) element.location_once_scrolled_into_view except: element = False while element: listOfSpaces += [ element.get_attribute( 'data-space-key' ) ] element.click() WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, xpathOk ) ) ) self.browser.execute_script( okJS ) WebDriverWait( self.browser, 10 ).until_not( EC.presence_of_element_located( ( By.ID, 'recover-permissions-dialog' ) ) ) time.sleep( 0.5 ) try: element = self.browser.find_element( By.XPATH, xpath ) element.location_once_scrolled_into_view except: element = False return listOfSpaces def 
getAllSpacePermissionsAsRepresentation( self, force=False, debug=True, trueVal='X', falseVal='-' ): """ Function to retrieve dictionary of permission representations. First level keys are space keys followed by second key as (like in getAllSpacePermissions) categories `GROUPS`, `USERS` and `ANONYMOUS`. The last key-level of the result dictionary is the granted object identifier (group name, user name or `anonymous`) followed by the representation. """ perms = self.getAllSpacePermissions( force, debug ) vperms = {} for spacekey, permissions in perms.items(): vperms[ spacekey ] = {} for category, specificperms in permissions.items(): vperms[ spacekey ][ category ] = {} for key, permarray in specificperms.items(): vperms[ spacekey ][ category ][ key ] = self.getPermissionRepresentation( permarray, trueVal, falseVal ) return vperms def addNewSpacePermissions( self, spaceKey, category, objectName, representation, isRepresentation=True, submitChanges=True ): """Function to add new space permissions – if `isRepresentation` is `False`, a boolean list for permissions is expected.""" self.callPermissionChangePage( spaceKey ) if category == 'GROUPS': gfield = self.browser.find_element_by_xpath( '//input[@name="groupsToAdd"]' ) gfield.send_keys( Keys.CONTROL, "a" ) gfield.send_keys( objectName ) time.sleep(1) self.browser.find_element( By.XPATH, '//input[@name="groupsToAddButton"]' ).click() elif category == 'USERS': gfield = self.browser.find_element_by_xpath( '//input[@name="usersToAdd"]' ) gfield.send_keys( Keys.CONTROL, "a" ) gfield.send_keys( objectName ) time.sleep(1) self.browser.find_element( By.XPATH, '//input[@name="usersToAddButton"]' ).click() elif category == 'ANONYMOUS': # ANONYMOUS has only one row which is always present pass else: raise Exception( '{val} is an invalid category!'.format( val=category ) ) time.sleep(1) self.adjustSpacePermissions( spaceKey, category, objectName, representation, isRepresentation, submitChanges ) def adjustSpacePermissions( self, 
spaceKey, category, objectName, representation, isRepresentation=True, submitChanges=True ): """Function to change existing space permissions – if `isRepresentation` is `False`, a boolean list for permissions is expected.""" self.callPermissionChangePage( spaceKey ) if isRepresentation: representation = self.getBoolListFromRepresentation( representation ) try: permissionCheckboxes = self.selectObjectRowForSpacePermissionChange( category, objectName, spaceKey ) if os.getenv( "PRINT_DEBUG_INFO", "False" ).lower() in [ "1", "true", "t", "y", "yes" ]: rc = len( representation ) pcc = len( permissionCheckboxes ) if rc != pcc: print( 'Representation length ({rc}) and permission checkboxes lenght ({pcc}) do not match! Values are mapped as far as possible.'.format( rc=rc, pcc=pcc ) ) print() for i, granted in enumerate( representation ): try: element = permissionCheckboxes[ i ].find_element( By.XPATH, './/input[@type="checkbox"]' ) while element.is_selected() != granted: element.click() except: pass if submitChanges: self.submitSpacePermissionChange() except: if os.getenv( "PRINT_DEBUG_INFO", "False" ).lower() in [ "1", "true", "t", "y", "yes" ]: print( 'Object {obj} in category {cat} could not be found for space with key {sk} ...'.format( obj=objectName, cat=category, sk=spaceKey ) ) def removeSpacePermissions( self, spaceKey, category, objectName, submitChanges=True ): """Function to remove permissions from space""" self.callPermissionChangePage( spaceKey ) try: permissionCheckboxes = self.selectObjectRowForSpacePermissionChange( category, objectName, spaceKey ) for element in permissionCheckboxes: checkbox = element.find_element( By.XPATH, './/input[@type="checkbox"]' ) while checkbox.is_selected(): checkbox.click() if submitChanges: self.submitSpacePermissionChange() except: if os.getenv( "PRINT_DEBUG_INFO", "False" ).lower() in [ "1", "true", "t", "y", "yes" ]: print( 'Object {obj} in category {cat} could not be found for space with key {sk} ...'.format( 
obj=objectName, cat=category, sk=spaceKey ) ) def selectObjectRowForSpacePermissionChange( self, category, objectName, spaceKey ): """Helper function to select the correct row for changing permissions""" xpathPermissionCell = './/*[contains(concat(" ", normalize-space(@class), " "), " permissionCell ")]' if category == 'GROUPS': permissionRow = self.browser.find_element( By.XPATH, '//td[@data-permission-group=\'{obj}\'][1]/parent::tr'.format( obj=objectName ) ) elif category == 'USERS': permissionRow = self.browser.find_element( By.XPATH, '//td[@data-permission-user=\'{obj}\'][1]/parent::tr'.format( obj=objectName ) ) elif category == 'ANONYMOUS': permissionRow = self.browser.find_element( By.ID, 'aPermissionsTable' ) permissionRow = permissionRow.find_element( By.XPATH, '{xpath}[1]/parent::tr'.format( xpath=xpathPermissionCell) ) else: raise Exception( '{val} is an invalid category!'.format( val=category ) ) permissionCheckboxes = permissionRow.find_elements( By.XPATH, xpathPermissionCell ) pcc = len( permissionCheckboxes ) if pcc != 14: raise Exception( 'There are {c} permission checkboxes found for object {obj} in space permissions for space with key {sk}! 
(14 expected ...)'.format( c=pcc, obj=objectName, sk=spaceKey ) ) return permissionCheckboxes def callPermissionChangePage( self, spaceKey ): """Helper function to call permission change page for a specified Confluence space""" url = '{url}spaces/editspacepermissions.action?key={space}'.format( url=self.app['url'], space=spaceKey ) if self.browser.current_url != url: self.webSudo() self.get( url ) def submitSpacePermissionChange( self ): """Submit the permission changes on space permission change page of confluence""" time.sleep(1) self.browser.find_element( By.XPATH, '//input[@name="save"]' ).click() def setSpacePermissions( self, spaceKey, representationDictionary, forceRefresh=False ): """Function meant to adjust space permissions to match a representation dictionary like returned by `getAllSpacePermissionsAsRepresentation`.""" currentPermissions = self.getSpacePermissions( spaceKey, forceRefresh ) for category in representationDictionary: for objectName, newPermissions in representationDictionary[ category ].items(): if objectName not in currentPermissions[ category ]: self.addNewSpacePermissions( spaceKey, category, objectName, newPermissions, submitChanges=False ) else: self.adjustSpacePermissions( spaceKey, category, objectName, newPermissions, submitChanges=False ) # removing permissions has to be the last step since it could be that space permissions then lock out the current user for category in representationDictionary: for objectName in currentPermissions[ category ]: if objectName not in representationDictionary[ category ]: self.removeSpacePermissions( spaceKey, category, objectName, submitChanges=False ) self.submitSpacePermissionChange()Ancestors
- Atlassian
- macwinnie_pyhelpers.Browser.Browser
Class variables
var appInfo
var permissions
var permissionsNames
var spaceKeys
var spaceNames
Methods
def APIFileRequest(self, url, mime, filename, filecontent, transferCookies=False)-
send a file through POST request into space
Expand source code
def APIFileRequest(self, url, mime, filename, filecontent, transferCookies=False):
    """Send a file to Confluence through a multipart POST request.

    Args:
        url: URL the upload is posted to.
        mime: MIME type announced for the uploaded file.
        filename: file name announced for the uploaded file.
        filecontent: content of the upload, handed to the session as-is.
        transferCookies: if true, the cookies held by the request session
            after the upload are copied into the Selenium browser.

    Returns:
        The response object returned by ``session.post``.
    """
    session = self.toSession(curl=True)
    formFields = {
        "bodyType": "storage",
        "supportedContainerTypes": ["space", "page"],
        "supportedChildTypes": ["attachment", "comment"],
        "supportedSpacePermissions": [],
        "preventDuplicateTitle": False,
        "indexing": {"enabled": True},
    }
    # The upload has to be passed through `files=`: nested inside `data` only
    # the dictionary key would be form-encoded and the file never transmitted.
    # The Content-Type must not be set by hand either, because the multipart
    # boundary is generated while the body is encoded; `None` additionally
    # drops any Content-Type default configured on the session.
    request = session.post(
        url,
        data=formFields,
        files={"file": (filename, filecontent, mime)},
        headers={
            "Content-Type": None,
            "X-Atlassian-Token": "no-check",
        },
    )
    if transferCookies:
        # write cookies from the POST request into the Selenium browser; the
        # browser has to be on the target domain before cookies can be added
        new_cookies = session.cookies.get_dict()
        self.browser.get(request.url)
        for key, value in new_cookies.items():
            self.browser.add_cookie({"name": key, "value": value})
        self.get(request.url)
    return request
Function to add new space permissions – if `isRepresentation` is `False`, a boolean list for permissions is expected.
Expand source code
def addNewSpacePermissions(self, spaceKey, category, objectName, representation, isRepresentation=True, submitChanges=True):
    """Add a group or user to the space permission page and set its permissions.

    If `isRepresentation` is `False`, a boolean list for permissions is
    expected instead of a representation string.

    Args:
        spaceKey: key of the Confluence space to change.
        category: `GROUPS`, `USERS` or `ANONYMOUS`.
        objectName: group name or user name to add (unused for `ANONYMOUS`).
        representation: permissions to set, handed on to
            `adjustSpacePermissions`.
        isRepresentation: handed on to `adjustSpacePermissions`.
        submitChanges: handed on to `adjustSpacePermissions`.

    Raises:
        Exception: if `category` is none of the three known categories.
    """
    # name of the text input and of its submit button for each category
    addControls = {
        'GROUPS': ('groupsToAdd', 'groupsToAddButton'),
        'USERS': ('usersToAdd', 'usersToAddButton'),
    }
    self.callPermissionChangePage(spaceKey)
    if category in ('GROUPS', 'USERS'):
        fieldName, buttonName = addControls[category]
        # `find_element( By.XPATH, ... )` is what the rest of this file uses;
        # the `find_element_by_xpath` shortcut no longer exists in Selenium 4
        field = self.browser.find_element(By.XPATH, '//input[@name="{name}"]'.format(name=fieldName))
        # select the current field content so that typing replaces it
        field.send_keys(Keys.CONTROL, "a")
        field.send_keys(objectName)
        time.sleep(1)
        self.browser.find_element(By.XPATH, '//input[@name="{name}"]'.format(name=buttonName)).click()
    elif category != 'ANONYMOUS':
        # ANONYMOUS has only one row which is always present, so there is
        # nothing to add for it – everything else is invalid
        raise Exception('{val} is an invalid category!'.format(val=category))
    time.sleep(1)
    self.adjustSpacePermissions(spaceKey, category, objectName, representation, isRepresentation, submitChanges)
Function to change existing space permissions – if `isRepresentation` is `False`, a boolean list for permissions is expected.
Expand source code
def adjustSpacePermissions(self, spaceKey, category, objectName, representation, isRepresentation=True, submitChanges=True):
    """Change existing space permissions of one group, user or anonymous.

    If `isRepresentation` is `False`, a boolean list for permissions is
    expected instead of a representation string.

    The change is best effort: if the row cannot be found or a checkbox cannot
    be toggled, nothing is raised; a message is printed when the environment
    variable `PRINT_DEBUG_INFO` is set to a true-ish value.

    Args:
        spaceKey: key of the Confluence space to change.
        category: `GROUPS`, `USERS` or `ANONYMOUS`.
        objectName: group name or user name whose row is changed.
        representation: representation string or list of booleans, one entry
            per permission checkbox.
        isRepresentation: whether `representation` has to be converted with
            `getBoolListFromRepresentation` first.
        submitChanges: whether the form is saved afterwards.
    """
    printDebugInfo = os.getenv("PRINT_DEBUG_INFO", "False").lower() in ["1", "true", "t", "y", "yes"]
    self.callPermissionChangePage(spaceKey)
    if isRepresentation:
        representation = self.getBoolListFromRepresentation(representation)
    try:
        permissionCheckboxes = self.selectObjectRowForSpacePermissionChange(category, objectName, spaceKey)
        if printDebugInfo:
            rc = len(representation)
            pcc = len(permissionCheckboxes)
            if rc != pcc:
                print('Representation length ({rc}) and permission checkboxes lenght ({pcc}) do not match! Values are mapped as far as possible.'.format(rc=rc, pcc=pcc))
                print()
        # zip stops at the shorter sequence, so values are mapped as far as possible
        for cell, granted in zip(permissionCheckboxes, representation):
            try:
                element = cell.find_element(By.XPATH, './/input[@type="checkbox"]')
                while element.is_selected() != granted:
                    element.click()
            except Exception:
                # deliberately best effort: a cell without a usable checkbox is skipped
                pass
        if submitChanges:
            self.submitSpacePermissionChange()
    except Exception:
        # `Exception` instead of a bare except keeps Ctrl+C and SystemExit working
        if printDebugInfo:
            print('Object {obj} in category {cat} could not be found for space with key {sk} ...'.format(obj=objectName, cat=category, sk=spaceKey))
Helper function to call permission change page for a specified Confluence space
Expand source code
def callPermissionChangePage(self, spaceKey):
    """Open the permission change page of a Confluence space unless the browser is already there.

    Before the page is loaded, `webSudo` is run.
    """
    targetUrl = '{url}spaces/editspacepermissions.action?key={space}'.format(url=self.app['url'], space=spaceKey)
    if self.browser.current_url == targetUrl:
        # already on the right page, nothing to do
        return
    self.webSudo()
    self.get(targetUrl)
Check if login is needed
Expand source code
def checkLogin(self):
    """Check whether the browser session is logged in.

    If the browser currently is not on a page below the application URL, the
    application URL is opened first.

    Returns:
        `False` if an element with the ID `loginButton` is found on the page,
        `True` otherwise.
    """
    if not self.browser.current_url.startswith(self.app['url']):
        self.get(self.app['url'], False)
    try:
        self.browser.find_element(By.ID, 'loginButton')
    except Exception:
        # no login button could be located; `Exception` instead of a bare
        # except so that Ctrl+C and SystemExit are not swallowed
        return True
    return False
function to clean up permissions in spaces
Expand source code
def cleanupPermissions( self ):
    """Clean up user permissions in all Confluence spaces listed in `self.spaceKeys`.

    For every space the edit-permissions page is opened and each user row is checked against the
    REST user endpoint. Rows whose user lookup answers with 404 get the button in their first
    cell clicked (per the original comment: remove all permissions) before the form is saved.
    The removal is repeated once; if such users are still listed afterwards, the group named by
    the environment variable `CONFLUENCE_ADMIN_GROUP` (default `confluence-administrators`) is
    added to the space before another removal attempt.

    NOTE(review): this listing was flattened onto one line – the nesting shown here is a
    reconstruction and has to be confirmed against the real source.

    Raises:
        APIError: if a user lookup answers with a status other than 200 or 404
            (`APIError` is not defined in the visible part of this file – TODO confirm).
    """
    editPermissionUrl = self.app[ 'url' ] + 'spaces/editspacepermissions.action?edit=Edit+Permissions&key='
    userCheckApiUrl = self.app[ 'url' ] + 'rest/api/user?username='
    admingroupname = os.getenv( 'CONFLUENCE_ADMIN_GROUP', 'confluence-administrators' )
    self.getSpaceKeys()
    session = self.toSession()
    # probe the API with the own user; a 403 leads to a fresh login and webSudo
    resp = session.get( userCheckApiUrl + self.app[ 'user' ] )
    if resp.status_code == 403:
        self.login()
        self.webSudo()
        session = self.toSession()
    for space in self.spaceKeys:
        self.get( editPermissionUrl + space )
        # the first two table rows are skipped – presumably header rows, TODO confirm
        rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:]
        removedUsers = []
        for row in rows:
            # the span text is stripped of its first and last character before it is used as username
            username = row.find_element( By.XPATH,'.//span').text[1:-1]
            userApiUrl = userCheckApiUrl + username
            resp = session.get( userApiUrl )
            if resp.status_code == 200:
                # User exists, all fine
                pass
            elif resp.status_code == 404:
                removedUsers += [ username ]
                row.find_element( By.XPATH,'.//td[1]//button').click()
                # User does not exist, remove all permissions
            else:
                raise APIError( resp.status_code, userApiUrl )
        time.sleep(4)
        self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click()
        time.sleep(2)
        # second pass: reload the page and click the button again for users still listed
        self.get( editPermissionUrl + space )
        rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:]
        for row in rows:
            username = row.find_element( By.XPATH,'.//span').text[1:-1]
            if username in removedUsers:
                row.find_element( By.XPATH,'.//td[1]//button').click()
        self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click()
        time.sleep(2)
        # third pass: if a removed user is still listed, add the admin group once
        self.get( editPermissionUrl + space )
        rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:]
        admins_added = False
        for row in rows:
            username = row.find_element( By.XPATH,'.//span').text[1:-1]
            if username in removedUsers:
                if not admins_added:
                    groupaddfield = self.browser.find_element( By.ID,'groups-to-add-autocomplete')
                    # select the current field content so that typing replaces it
                    groupaddfield.send_keys( Keys.CONTROL, "a" )
                    groupaddfield.send_keys( admingroupname )
                    self.browser.find_element( By.XPATH, '//*[@name="groupsToAddButton" and @type="submit"]' ).click()
                    time.sleep(2)
                    # click the button of the element that contains the admin group name
                    self.browser.find_element( By.XPATH, '//*[@id="gPermissionsTable"]//*[contains(text(), "' + admingroupname + '")]//button' ).click()
                    admins_added = True
                break
        if admins_added:
            # the page changed after adding the group, so the rows are fetched again
            rows = self.browser.find_elements( By.XPATH, '//*[@id="uPermissionsTable"]//tr')[2:]
            for row in rows:
                username = row.find_element( By.XPATH,'.//span').text[1:-1]
                if username in removedUsers:
                    row.find_element( By.XPATH,'.//td[1]//button').click()
            self.browser.find_element( By.XPATH,'//input[@name="save" and @type="submit"]').click()
            time.sleep(2)
helper function to create new pages within Confluence
Expand source code
def createPage(self, title, contentHtml, spaceKey, parent=None, session=None):
    """Create a new page within Confluence.

    If a page with the same title already exists, the title is suffixed with
    ` (1)`, ` (2)`, ... until the creation succeeds or fails for another reason.

    Args:
        title: title of the new page.
        contentHtml: page body in storage representation.
        spaceKey: key of the space the page is created in.
        parent: ID of the parent page; defaults to the homepage of the space.
        session: session handed on to `sessionPostRequest`.

    Returns:
        A list `[ pageID, usedTitle ]`.

    Raises:
        Exception: if the API does not answer with status 200.
    """
    if parent is None:
        # has to be called on `self` – a bare function of that name does not exist
        parent = self.getSpaceHomepageID(spaceKey, session=session)
    createUrl = '{url}rest/api/content'.format(url=self.app['url'])
    createObject = {
        "type": "page",
        "title": title,
        "space": {
            "key": spaceKey
        },
        "body": {
            "storage": {
                "value": contentHtml,
                "representation": "storage"
            }
        },
        "ancestors": [{
            "id": parent
        }]
    }
    response = self.sessionPostRequest(createUrl, createObject, session)
    attempt = 1
    while response.status_code != 200:
        message = response.json()['message']
        if 'already exists' not in message.lower():
            break
        print(' ' + message)
        # retry with a numbered title until it is unique
        createObject['title'] = '{} ({})'.format(title, attempt)
        response = self.sessionPostRequest(createUrl, createObject, session)
        attempt += 1
    rc = response.status_code
    if rc == 200:
        return [response.json()['id'], createObject['title']]
    raise Exception("API Response ({}) was:\n\n{}".format(rc, response.content))
helper function to delete a given page
Expand source code
def deletePage(self, pageID, session=None):
    """Delete the page with the given ID through the REST API and return the response.

    Without a `session`, a new one is created with `toSession( curl = True )`.
    """
    activeSession = self.toSession(curl=True) if session is None else session
    target = '{url}rest/api/content/{id}'.format(url=self.app['url'], id=pageID)
    return activeSession.delete(target)
helper function to fetch permissions from one table row in permission view of Confluence space
Expand source code
def fetchPermissionsFromRow(self, row):
    """Turn the cells of one permission table row into a list of booleans.

    A cell counts as granted unless its `data-permission-set` attribute is
    `false` (compared case-insensitively).
    """
    granted = []
    for cell in row:
        flag = cell['data-permission-set'].lower()
        granted.append(flag != 'false')
    return granted
function to retrieve all space permissions in Confluence
Expand source code
def getAllSpacePermissions(self, force=False, debug=True):
    """Retrieve the permissions of all Confluence spaces, keyed by space key.

    The result is kept in `self.permissions`; it is only collected again if
    that dictionary is empty or `force` is set. `debug` is accepted but not
    used here.
    """
    alreadyCollected = self.permissions != {}
    if alreadyCollected and not force:
        return self.permissions
    self.permissions = {}
    self.getSpaceKeys()
    for key in self.spaceKeys:
        self.permissions[key] = self.getSpacePermissions(key)
    return self.permissions
Function to retrieve dictionary of permission representations. First level keys are space keys followed by second key as (like in `getAllSpacePermissions`) categories `GROUPS`, `USERS` and `ANONYMOUS`. The last key-level of the result dictionary is the granted object identifier (group name, user name or `anonymous`) followed by the representation.
Expand source code
def getAllSpacePermissionsAsRepresentation(self, force=False, debug=True, trueVal='X', falseVal='-'):
    """
    Retrieve all space permissions as representation strings.

    The result mirrors the dictionary of `getAllSpacePermissions`: space key,
    then category, then the granted object identifier – only that the boolean
    list at the innermost level is replaced by the string returned by
    `getPermissionRepresentation` for `trueVal` and `falseVal`.
    """
    def represent(permarray):
        return self.getPermissionRepresentation(permarray, trueVal, falseVal)

    return {
        spacekey: {
            category: {
                name: represent(permarray)
                for name, permarray in members.items()
            }
            for category, members in categories.items()
        }
        for spacekey, categories in self.getAllSpacePermissions(force, debug).items()
    }
revert representation of a permission row for Confluence space to boolean list
Expand source code
def getBoolListFromRepresentation(self, representation, trueVal='X', falseVal='-'):
    """Convert the representation of a permission row back into a list of booleans.

    Raises:
        Exception: if an element of `representation` equals neither `trueVal`
            nor `falseVal`.
    """
    flags = []
    for symbol in representation:
        if symbol != trueVal and symbol != falseVal:
            raise Exception("Check your representation – trueVal ('{trueVal}') and falseVal ('{falseVal}') are not matched by '{val}'!".format(trueVal=trueVal, falseVal=falseVal, val=symbol))
        flags.append(symbol == trueVal)
    return flags
Retrieve group member usernames from Confluence
Expand source code
def getGroupMembers(self, group):
    """Retrieve the usernames of all members of a Confluence group.

    The member list is fetched page by page. The page size starts at 50 and is
    replaced by the size of the first answer if that answer is smaller than
    its own limit or reports a limit other than the requested one.

    If a request does not answer with status 200, the status code and the
    request parameters are printed and the usernames collected so far are
    returned.

    Args:
        group: name of the group.

    Returns:
        List of usernames.
    """
    session = self.toSession()
    users = []
    limit = 50
    page = 0
    firstRun = True
    while True:
        parameters = {
            'start': (page * limit),
            'limit': limit,
        }
        memberUrl = '{url}rest/api/group/{group}/member?{params}'.format(url=self.app['url'], group=group, params=urllib.parse.urlencode(parameters))
        response = session.get(memberUrl)
        if response.status_code != 200:
            print('rc: ' + str(response.status_code))
            print(parameters)
            # leave right away: there is no decoded answer to look at, which
            # previously ended in an UnboundLocalError on a failing first request
            break
        rsp = json.loads(response.text)
        lastPage = False
        if firstRun and (rsp['size'] < rsp['limit'] or limit != rsp['limit']):
            limit = rsp['size']
        elif rsp['size'] < limit:
            lastPage = True
        results = rsp['results']
        users.extend(u['username'] for u in results)
        if lastPage or len(results) == 0:
            break
        page += 1
        firstRun = False
    return users
Retrieve Confluence local groups
Expand source code
def getGroups(self):
    """Retrieve the names of the Confluence groups through the REST API.

    Groups are requested in pages of 1000 until a request does not answer with
    status 200 or a page comes back empty.
    """
    pageSize = 1000
    session = self.toSession()
    names = []
    offset = 0
    while True:
        response = session.get('{url}rest/api/group?start={start}&limit={limit}'.format(url=self.app['url'], limit=pageSize, start=offset))
        if response.status_code != 200:
            break
        groups = json.loads(response.text)['results']
        names.extend(entry['name'] for entry in groups)
        if len(groups) == 0:
            break
        offset += pageSize
    return names
helper function to retrieve content of page by pageID
Expand source code
def getPageContent(self, pageID, session=None):
    """Retrieve the storage-format content of a page by its page ID.

    Args:
        pageID: ID of the page.
        session: session used for the request; without one, a new session is
            created with `toSession( curl = True )`.

    Returns:
        The value of `body.storage.value` of the API answer, or `None` if the
        API does not answer with status 200.
    """
    # the URL has to be built from the `pageID` parameter; the formerly used
    # name `pid` does not exist in this scope
    apiUrl = '{url}rest/api/content/{pid}?expand=body.storage'.format(url=self.app['url'], pid=pageID)
    if session is None:
        session = self.toSession(curl=True)
    response = session.get(apiUrl)
    if response.status_code != 200:
        return None
    return json.loads(response.text)['body']['storage']['value']
Function to get page information by Confluence space key and page title
Expand source code
def getPageInfo(self, spaceKey, title, session=None):
    """Get the information of the page identified by space key and page title.

    The API is asked with `version` and `space` expanded; the first result is
    returned.

    Raises:
        Exception: if the API reports more than one matching page.
    """
    self.setSessionBaseHeaders(api=True)
    query = urllib.parse.urlencode({
        'spaceKey': spaceKey,
        'title': title,
        'expand': 'version,space',
    })
    apiUrl = '{url}rest/api/content?{params}'.format(url=self.app['url'], params=query)
    found = self.apiGetInfo(apiUrl, session=session)
    if found['size'] > 1:
        raise Exception('More than one page found – cannot proceed properly ...')
    return found['results'][0]
Function to get page information by page ID
Expand source code
def getPageInfoByID(self, pageID, session=None):
    """Get the information of a page by its page ID, with `version` and `space` expanded."""
    self.setSessionBaseHeaders(api=True)
    query = urllib.parse.urlencode({'expand': 'version,space'})
    apiUrl = '{url}rest/api/content/{id}?{params}'.format(url=self.app['url'], id=pageID, params=query)
    return self.apiGetInfo(apiUrl, session=session)
generate a simple representation of a permission row for Confluence space
Expand source code
def getPermissionRepresentation(self, boolList, trueVal='X', falseVal='-'):
    """Build the string representation of a permission row: `trueVal` for every truthy entry, `falseVal` otherwise."""
    symbols = [trueVal if granted else falseVal for granted in boolList]
    return ''.join(symbols)
Helper function to get page ID of homepage of Space
Expand source code
def getSpaceHomepageID(self, spaceKey, session=None):
    """Get the page ID of the homepage of a space.

    Args:
        spaceKey: key of the Confluence space.
        session: session handed on to `apiGetInfo`.

    Returns:
        The ID of the homepage of the first space in the API answer, or `None`
        if `apiGetInfo` returns `None` or the answer lists no space.
    """
    self.setSessionBaseHeaders(api=True)
    parameters = {
        'expand': 'homepage',
        'spaceKey': spaceKey,
    }
    # the URL template has no `{id}` placeholder – the formerly passed
    # `id=pageID` referenced a name that does not exist in this scope
    apiUrl = '{url}rest/api/space?{params}'.format(url=self.app['url'], params=urllib.parse.urlencode(parameters))
    result = self.apiGetInfo(apiUrl, session=session)
    if result is None or not result['results']:
        return None
    return result['results'][0]['homepage']['id']
Helper function to retrieve space key by space name
Expand source code
def getSpaceKeyFromName(self, spaceName, force=False, session=None):
    """Retrieve the space key that belongs to a space name.

    The mapping from names to keys is kept in `self.spaceNames`; it is built
    with `getSpaceKeys` and `getSpaceName` if it is empty or `force` is set.

    Args:
        spaceName: name of the space.
        force: rebuild the name mapping even if it is already filled.
        session: session handed on to `getSpaceName`.

    Returns:
        The space key, or `None` if no space with that name is known.
    """
    if force or len(self.spaceNames) == 0:
        for sk in self.getSpaceKeys():
            # hand the session on – it was accepted before but never used
            self.spaceNames[self.getSpaceName(sk, session=session)] = sk
    return self.spaceNames.get(spaceName)
get all space keys of Confluence spaces through API
Expand source code
def getSpaceKeys(self, force=False, personalSpaces=False):
    """Get the keys of the Confluence spaces through the REST API.

    The keys are kept in `self.spaceKeys` and only fetched again if that list
    is empty or `force` is set. Spaces of type `personal` are left out unless
    `personalSpaces` is set. Spaces are requested in pages of 50; if a request
    does not answer with status 200, its start, limit and status code are
    printed and fetching stops.
    """
    session = self.toSession(curl=True)
    if not force and self.spaceKeys != []:
        return self.spaceKeys

    self.spaceKeys = []
    pageSize = 50
    offset = 0
    while True:
        apiUrl = '{url}rest/api/space?start={start}&limit={limit}'.format(url=self.app['url'], start=offset, limit=pageSize)
        response = session.get(apiUrl)
        if response.status_code != 200:
            print('start: {s}, limit: {l}, rc: {r}'.format(s=offset, l=pageSize, r=str(response.status_code)))
            break
        payload = json.loads(response.text)
        # a page that is not full is the last one
        lastPage = payload['size'] < pageSize
        spaces = payload['results']
        for space in spaces:
            if personalSpaces or space['type'] != 'personal':
                self.spaceKeys.append(space['key'])
        if lastPage or len(spaces) == 0:
            break
        offset += pageSize
    return self.spaceKeys
get all space keys of Confluence spaces through GUI
Expand source code
def getSpaceKeysGUI(self, force=False):
    """Get the keys of the Confluence spaces through the GUI (space directory).

    The keys are kept in `self.spaceKeys` and only collected again if that
    list is empty or `force` is set. The `data-spacekey` attribute of every
    table body row is read page by page; paging ends as soon as following the
    "next" link fails for whatever reason.
    """
    if not force and self.spaceKeys != []:
        return self.spaceKeys

    nextLinkXpath = '//*[contains(concat(" ", normalize-space(@class), " "), " aui-nav-next ")]//a'
    self.spaceKeys = []
    self.get(self.app['url'] + 'spacedirectory/view.action')
    morePages = True
    while morePages:
        soup = self.toSoup()
        self.spaceKeys.extend(row['data-spacekey'] for row in soup.select('tbody tr'))
        try:
            nextBtn = soup.select('.aui-nav-next a')[0]
            # accessing `href` fails if the link has no target
            check = nextBtn['href']
            self.browser.find_element(By.XPATH, nextLinkXpath).click()
            time.sleep(5)
        except:
            morePages = False
    return self.spaceKeys
Helper function to retrieve the space name of a Confluence Space
Expand source code
def getSpaceName(self, spaceKey, session=None):
    """Retrieve the name of a Confluence space; `None` if `apiGetInfo` delivers no result."""
    self.setSessionBaseHeaders(api=True)
    apiUrl = '{url}rest/api/space/{spaceKey}'.format(url=self.app['url'], spaceKey=spaceKey)
    spaceInfo = self.apiGetInfo(apiUrl, session=session)
    if spaceInfo is None:
        return None
    return spaceInfo['name']
function to retrieve space permissions for one Confluence space specified through Space Key
Expand source code
def getSpacePermissions(self, spaceKey, force=False, debug=False):
    """Retrieve the permissions of one Confluence space from its permission page.

    If `spaceKey` is already contained in `self.permissions` and `force` is not
    set, the stored value is returned. Otherwise the permission page is loaded
    and parsed into a dictionary with the categories `ANONYMOUS`, `USERS` and
    `GROUPS`, each mapping an object name to the boolean list produced by
    `fetchPermissionsFromRow`. With `debug` set, the space key is printed.

    On the first parsed page, `self.permissionsNames` is filled from the two
    header rows of the permission table.
    """
    if not force and spaceKey in self.permissions:
        return self.permissions[spaceKey]

    # all rows of a permission table except the first two
    dataRows = ' tr:not(:first-of-type):not(:nth-of-type(2))'
    permissionUrl = '{url}spaces/spacepermissions.action?key={space}'.format(url=self.app['url'], space=spaceKey)
    if debug:
        print(spaceKey)
    self.get(permissionUrl)
    soup = self.toSoup()

    if self.permissionsNames == []:
        topHeader = soup.select('table.permissions tr:first-of-type')[0]
        mainPerms = [th.text for th in topHeader.find_all('th')[1:]]
        groupIndex = 0
        subHeader = soup.select('table.permissions tr:nth-of-type(2)')[0]
        for th in subHeader.find_all('th')[1:]:
            try:
                # a header cell marked as group start moves on to the next main permission
                if 'permissions-group-start' in th['class']:
                    groupIndex += 1
            except:
                pass
            self.permissionsNames += [mainPerms[groupIndex] + ' (' + th.text + ')']

    anonymousCells = soup.select('#aPermissionsTable' + dataRows)[0].select('td')
    anonymousPermissions = {'anonymous': self.fetchPermissionsFromRow(anonymousCells[1:])}

    userPermissions = {}
    for row in soup.select('#uPermissionsTable' + dataRows):
        cells = row.select('td')
        # the span text is stripped of its first and last character
        username = cells[0].select('span')[0].text.strip()[1:-1]
        userPermissions[username] = self.fetchPermissionsFromRow(cells[1:])

    groupPermissions = {}
    for row in soup.select('#gPermissionsTable' + dataRows):
        cells = row.select('td')
        groupPermissions[cells[0].text.strip()] = self.fetchPermissionsFromRow(cells[1:])

    return {
        'ANONYMOUS': anonymousPermissions,
        'USERS': userPermissions,
        'GROUPS': groupPermissions,
    }
Function to gather information about a user from API – if `returnResponse` is True, the whole request response is returned, otherwise only the decoded answer object.
Expand source code
def getUserApiInfo(self, username, returnResponse=False, session=None):
    """Gather information about a user from the API.

    `returnResponse` and `session` are handed on to `apiGetInfo` unchanged – if
    `returnResponse` is True, the whole request response is returned, otherwise
    only the decoded answer object.
    """
    quotedUser = urllib.parse.quote(username)
    apiUrl = '{url}rest/mobile/latest/profile/{user}'.format(url=self.app['url'], user=quotedUser)
    return self.apiGetInfo(apiUrl, returnResponse, session)
Run Confluence login
Expand source code
def login(self, skipSSO=True):
    """Run the Confluence login.

    The credentials from `self.app` are posted to `dologin.action`; afterwards
    the application URL is opened.

    If the environment variable `PRINT_DEBUG_INFO` is set to a true-ish value,
    the login data is printed – with the password masked, so that it does not
    end up in terminals or log files.

    Args:
        skipSSO: append `?nosso` to the login URL.
    """
    loginUrl = self.app['url'] + "dologin.action"
    if skipSSO:
        loginUrl += '?nosso'
    self.loadCookies(url=loginUrl)
    self.setSessionBaseHeaders()
    userObject = {
        "os_username": self.app['user'],
        "os_password": self.app['passwd'],
        "os_cookie": "true",
        "login": "Log in",
        "os_destination": "",
    }
    if os.getenv("PRINT_DEBUG_INFO", "False").lower() in ["1", "true", "t", "y", "yes"]:
        print('User is being logged in with those credentials:')
        # print a copy with the password masked; the request uses the real one
        print(dict(userObject, os_password='********'))
        print()
    self.sessionPostRequest(loginUrl, userObject, True, urlOverride=True)
    self.get(self.app['url'], False)
Function to recover confluence permissions if a system administrator has no space access. ENSURE YOUR SYSTEM LANGUAGE TO BE ENGLISH!
Expand source code
def recoverConfluencePermissions(self):
    """Recover space permissions for spaces a system administrator has no access to.

    ENSURE YOUR SYSTEM LANGUAGE TO BE ENGLISH! – the links and buttons are
    located by their English captions.

    On the default space permissions page, every "Recover Permissions" link is
    clicked and the following dialog confirmed, until no such link is left.

    Returns:
        List of the `data-space-key` attribute values of the clicked links.
    """
    # the application URL already ends with a slash (all other URLs in this
    # file are built that way), so no additional one is added here
    permsurl = '{url}admin/permissions/viewdefaultspacepermissions.action'.format(url=self.app['url'])
    self.browser.get(permsurl)
    xpath = '//a[text()="Recover Permissions"]'
    xpathOk = '//button[text()="OK"]'
    # the OK button is clicked through JavaScript instead of a Selenium click
    okJS = 'xPathRes = document.evaluate (\'{xpath}\', document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null); xPathRes.singleNodeValue.click();'.format(xpath=xpathOk)

    def nextRecoverLink():
        """Return the next recover link scrolled into view, or `None` if there is none."""
        try:
            link = self.browser.find_element(By.XPATH, xpath)
            # accessing this property scrolls the element into view
            link.location_once_scrolled_into_view
            return link
        except Exception:
            return None

    listOfSpaces = []
    element = nextRecoverLink()
    while element is not None:
        listOfSpaces += [element.get_attribute('data-space-key')]
        element.click()
        WebDriverWait(self.browser, 10).until(EC.presence_of_element_located((By.XPATH, xpathOk)))
        self.browser.execute_script(okJS)
        # wait until the dialog is gone before looking for the next link
        WebDriverWait(self.browser, 10).until_not(EC.presence_of_element_located((By.ID, 'recover-permissions-dialog')))
        time.sleep(0.5)
        element = nextRecoverLink()
    return listOfSpaces
Function to remove permissions from space
Expand source code
def removeSpacePermissions(self, spaceKey, category, objectName, submitChanges=True):
    """Remove all permissions of one group, user or anonymous from a space.

    Every checkbox in the row of the object is unchecked. The removal is best
    effort: if the row cannot be found or changed, nothing is raised; a message
    is printed when the environment variable `PRINT_DEBUG_INFO` is set to a
    true-ish value.

    Args:
        spaceKey: key of the Confluence space to change.
        category: `GROUPS`, `USERS` or `ANONYMOUS`.
        objectName: group name or user name whose row is cleared.
        submitChanges: whether the form is saved afterwards.
    """
    self.callPermissionChangePage(spaceKey)
    try:
        permissionCheckboxes = self.selectObjectRowForSpacePermissionChange(category, objectName, spaceKey)
        for cell in permissionCheckboxes:
            checkbox = cell.find_element(By.XPATH, './/input[@type="checkbox"]')
            while checkbox.is_selected():
                checkbox.click()
        if submitChanges:
            self.submitSpacePermissionChange()
    except Exception:
        # `Exception` instead of a bare except keeps Ctrl+C and SystemExit working
        if os.getenv("PRINT_DEBUG_INFO", "False").lower() in ["1", "true", "t", "y", "yes"]:
            print('Object {obj} in category {cat} could not be found for space with key {sk} ...'.format(obj=objectName, cat=category, sk=spaceKey))
Helper function to select the correct row for changing permissions
Expand source code
def selectObjectRowForSpacePermissionChange(self, category, objectName, spaceKey):
    """Select the permission cells of the row that belongs to a group, user or anonymous.

    Returns:
        The list of elements with the class `permissionCell` within the row.

    Raises:
        Exception: if `category` is none of `GROUPS`, `USERS`, `ANONYMOUS`, or
            if the row does not contain exactly 14 permission cells.
    """
    xpathPermissionCell = './/*[contains(concat(" ", normalize-space(@class), " "), " permissionCell ")]'
    groupRowXpath = "//td[@data-permission-group='{obj}'][1]/parent::tr"
    userRowXpath = "//td[@data-permission-user='{obj}'][1]/parent::tr"

    if category == 'ANONYMOUS':
        # the anonymous table is located by ID, its row through the first permission cell
        table = self.browser.find_element(By.ID, 'aPermissionsTable')
        permissionRow = table.find_element(By.XPATH, '{xpath}[1]/parent::tr'.format(xpath=xpathPermissionCell))
    elif category == 'GROUPS' or category == 'USERS':
        rowXpath = groupRowXpath if category == 'GROUPS' else userRowXpath
        permissionRow = self.browser.find_element(By.XPATH, rowXpath.format(obj=objectName))
    else:
        raise Exception('{val} is an invalid category!'.format(val=category))

    cells = permissionRow.find_elements(By.XPATH, xpathPermissionCell)
    found = len(cells)
    if found != 14:
        raise Exception('There are {c} permission checkboxes found for object {obj} in space permissions for space with key {sk}! (14 expected ...)'.format(c=found, obj=objectName, sk=spaceKey))
    return cells
Api requests need special headers in Confluence … sometimes …
Expand source code
def setSessionBaseHeaders(self, api=False):
    """Set `self.sessionBaseHeaders`: JSON content type plus `X-Atlassian-Token: no-check` for API requests, `None` otherwise."""
    apiHeaders = {
        "Content-Type": "application/json",
        "X-Atlassian-Token": "no-check"
    }
    self.sessionBaseHeaders = apiHeaders if api else None
Function meant to adjust space permissions to match a representation dictionary like returned by `getAllSpacePermissionsAsRepresentation`.
Expand source code
def setSpacePermissions(self, spaceKey, representationDictionary, forceRefresh=False):
    """Adjust the permissions of a space to match a representation dictionary.

    The dictionary has the shape returned per space by
    `getAllSpacePermissionsAsRepresentation`. Objects that are new are added,
    known ones adjusted, and objects that are not part of the dictionary are
    removed; the form is saved once at the very end.
    """
    existing = self.getSpacePermissions(spaceKey, forceRefresh)

    for category, wanted in representationDictionary.items():
        for objectName, newPermissions in wanted.items():
            if objectName in existing[category]:
                change = self.adjustSpacePermissions
            else:
                change = self.addNewSpacePermissions
            change(spaceKey, category, objectName, newPermissions, submitChanges=False)

    # removing permissions has to be the last step since it could be that
    # space permissions then lock out the current user
    for category, wanted in representationDictionary.items():
        obsolete = [name for name in existing[category] if name not in wanted]
        for objectName in obsolete:
            self.removeSpacePermissions(spaceKey, category, objectName, submitChanges=False)

    self.submitSpacePermissionChange()
Submit the permission changes on space permission change page of confluence
Expand source code
def submitSpacePermissionChange(self):
    """Submit the permission changes on the space permission change page of Confluence.

    Waits one second, then clicks the input named `save`.
    """
    time.sleep(1)
    saveButton = self.browser.find_element(By.XPATH, '//input[@name="save"]')
    saveButton.click()
helper function to update a pages content
Expand source code
def updatePage(self, newTitle, contentHtml, pageID, spaceKey, version=None, session=None):
    """Update title and content of an existing page.

    Args:
        newTitle: title the page gets.
        contentHtml: page body in storage representation.
        pageID: ID of the page to update.
        spaceKey: key of the space the page lives in.
        version: version number to store; defaults to the current version
            number plus one.
        session: session used for the version lookup and the update request.

    Returns:
        The page ID from the API answer.

    Raises:
        Exception: if the API does not answer with status 200.
    """
    if version is None:
        # look the current version up with the same session the update uses
        version = self.getPageInfoByID(pageID, session=session)['version']['number'] + 1
    createObject = {
        "id": pageID,
        "type": "page",
        "title": newTitle,
        "space": {
            "key": spaceKey
        },
        "body": {
            "storage": {
                "value": contentHtml,
                "representation": "storage"
            }
        },
        "version": {
            "number": version
        }
    }
    createUrl = '{url}rest/api/content/{pid}'.format(url=self.app['url'], pid=pageID)
    response = self.sessionPutRequest(createUrl, createObject, session=session)
    rc = response.status_code
    if rc != 200:
        raise Exception("API Response ({}) was:\n\n{}".format(rc, response.content))
    return response.json()['id']
run Confluence websudo
Expand source code
def webSudo(self, adjustCookies=True):
    """Run the Confluence websudo by handing the Confluence specific URLs to the parent implementation."""
    baseUrl = self.app['url']
    # page that asks for the password again
    authUrl = '{url}authenticate.action'.format(url=baseUrl)
    # administrative page handed on as `sysInfoUrl`
    sysInfoUrl = '{url}admin/viewgeneralconfig.action'.format(url=baseUrl)
    super(Confluence, self).webSudo(authUrl, sysInfoUrl, adjustCookies)
Inherited members
class Jira-
Jira Browser – recommended to use administrative user; otherwise most functions won't work properly
function to initiate the systems as needed
Expand source code
class Jira ( Atlassian ):
    """Jira Browser – recommended to use administrative user; otherwise most functions won't work properly"""

    # connection settings with the environment variable and prompt text for each
    # (presumably consumed by the Browser base class – verify there)
    appInfo = {
        "url": {
            "env": "JIRA_URL",
            "url": True,
            "description": "Enter Jira URL",
        },
        "user": {
            "env": "JIRA_USER",
            "description": "Enter Jira username",
        },
        "passwd": {
            "password": True,
            "env": "JIRA_PASS",
            "description": "Enter Jira password",
        },
    }

    # class-level defaults of the caches; instances get private copies in __init__
    projectKeys  = []
    permissions  = {}
    projectLeads = {}

    def __init__( self ):
        super( Jira, self ).__init__()
        # copy the caches so filling them never mutates the shared class attributes
        self.projectKeys  = list( self.projectKeys )
        self.permissions  = dict( self.permissions )
        self.projectLeads = dict( self.projectLeads )

    def setSessionBaseHeaders( self, api=False ):
        """Api requests need special headers in Jira ... sometimes ...

        Stores the JSON / no-check header set when `api` is true, `None` otherwise.
        """
        if not api:
            self.sessionBaseHeaders = None
        else:
            self.sessionBaseHeaders = {
                "Content-Type": "application/json",
                "X-Atlassian-Token": "no-check"
            }

    def login( self, skipSSO=True ):
        """Run Jira login

        Posts the credentials from `self.app` to `login.jsp` (with `?nosso`
        appended when `skipSSO` is true) and then opens the base URL.
        """
        loginUrl = self.app[ 'url' ] + "login.jsp"
        if skipSSO:
            loginUrl += '?nosso'
        self.loadCookies( url=loginUrl )
        self.setSessionBaseHeaders()
        userObject = {
            "os_username": self.app[ 'user' ],
            "os_password": self.app[ 'passwd' ],
        }
        if os.getenv( "PRINT_DEBUG_INFO", "False" ).lower() in [ "1", "true", "t", "y", "yes" ]:
            print( 'User is being logged in with those credentials:' )
            # never write the plaintext password to stdout / log files
            print( dict( userObject, os_password='********' ) )
            print()
        self.sessionPostRequest( loginUrl, userObject, True, urlOverride=True )
        self.get( self.app[ 'url' ] , False )

    def checkLogin( self ):
        """Check if login is needed

        Returns False when an element with id `login` is found on the page,
        True otherwise.
        """
        baseUrl = self.app[ 'url' ]
        if not self.browser.current_url.startswith( baseUrl ):
            self.get( baseUrl, False )
        try:
            self.browser.find_element( By.ID, 'login' )
        except Exception:
            return True
        return False

    def webSudo( self, adjustCookies = True ):
        """Run the Jira WebSudo"""
        authUrl    = '{url}secure/admin/WebSudoAuthenticate!default.jspa'.format( url=self.app[ 'url' ] )
        sysInfoUrl = '{url}secure/admin/ViewApplicationProperties.jspa'.format( url=self.app[ 'url' ] )
        super( Jira, self ).webSudo( authUrl, sysInfoUrl, adjustCookies )

    def getProjectKeys( self ):
        """Get a list of Project Keys of those Jira projects visible to logged in user

        Walks through all pages of the project browser. The result replaces
        `self.projectKeys`, so repeated calls do not pile up duplicates.
        """
        self.webSudo()
        self.get( self.app[ 'url' ] + 'secure/project/BrowseProjects.jspa' )
        collected = []
        while True:
            soup = self.toSoup()
            collected += [ item.text for item in soup.select( 'tbody tr .cell-type-key' ) ]
            try:
                nextBtn = soup.select( '.aui-nav-next a' )[0]
                # the lookup is only a probe: a "next" link without a target page ends the loop
                _ = nextBtn[ 'data-page' ]
                self.browser.find_element( By.XPATH, '//*[contains(concat(" ", normalize-space(@class), " "), " aui-nav-next ")]//a' ).click()
                time.sleep(5)
            except Exception:
                break
        self.projectKeys = collected
        return self.projectKeys

    def getUserGroups( self, username, session=None ):
        """Function to gather groups of user"""
        apiUrl = '{url}rest/api/2/user?username={user}&expand=groups'.format( url=self.app['url'], user=urllib.parse.quote( username ) )
        groups = self.apiGetInfo( apiUrl, session=session )
        return [ g['name'] for g in groups['groups']['items'] ]

    def getUserApiInfo( self, username, returnResponse=False, session=None ):
        """Function to gather information about a user from API – if `returnResponse` is True, the whole request response is returned, otherwise only the decoded answer object"""
        apiUrl = '{url}rest/api/2/user?username={user}'.format( url=self.app['url'], user=urllib.parse.quote( username ) )
        return self.apiGetInfo( apiUrl, returnResponse, session )

    def getGroupApiInfo( self, groupname, returnResponse=False, session=None ):
        """Function to gather information about a group from API – if `returnResponse` is True, the whole request response is returned, otherwise only the decoded answer object"""
        apiUrl = '{url}rest/api/2/group?groupname={group}'.format( url=self.app['url'], group=urllib.parse.quote( groupname ) )
        return self.apiGetInfo( apiUrl, returnResponse, session )

    def _toggleRolesDropdown( self, trigger, dropdownXpath, opening, screenshotIndex=None ):
        """Click `trigger` and wait until the roles dropdown is present (`opening`) or gone.

        If click or wait fail, ESCAPE is sent and the wait is repeated once. When
        that fails as well, a debug screenshot is stored (only if
        `screenshotIndex` is given) and the error is re-raised.
        """
        dropdownPresent = EC.presence_of_element_located( ( By.XPATH, dropdownXpath ) )

        def waitForState():
            waiter = WebDriverWait( self.browser, 10 )
            if opening:
                waiter.until( dropdownPresent )
            else:
                waiter.until_not( dropdownPresent )

        try:
            trigger.click()
            waitForState()
        except Exception:
            actions = ActionChains( self.browser )
            actions.send_keys( Keys.ESCAPE )
            actions.perform()
            try:
                waitForState()
            except Exception:
                if screenshotIndex is not None:
                    self.browser.get_screenshot_as_file( './data/debug/error_{index}_{phase}_{date}.png'.format(
                        date  = datetime.now().strftime( "%Y%m%d_%H%M%S" ),
                        index = screenshotIndex,
                        phase = 'open' if opening else 'close',
                    ) )
                raise

    def gatherProjectPermissions( self, projectKey, session=None ):
        """Function to gather permissions for a specific Jira project

        Returns `{ role: { 'USERS': [...], 'GROUPS': [...] } }` scraped from the
        project's roles page, or `None` when the page shows no name fields
        within ten seconds (the project is then reported as skipped).
        """
        rolesDropdownXpath = '//*[contains(concat(" ", @class, " "), " iLKsB ")]'
        nameFieldXpath     = '//*[contains(concat(" ", @class, " "), " EKGYc ")]'
        self.get( '{url}plugins/servlet/project-config/{pjkey}/roles'.format( url=self.app['url'], pjkey=projectKey ) )
        try:
            WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, nameFieldXpath ) ) )
        except Exception:
            print( '{p} skipped'.format( p = projectKey ) )
            return None

        projectPermissions = {}
        # retrieve possible roles
        roleBtn = WebDriverWait( self.browser, 10 ).until( EC.element_to_be_clickable( ( By.XPATH, '//*[contains(concat(" ", @class, " "), " css-18u3ks8 ")]' ) ) )
        roleBtn.click()
        # wait until dropdown is visible
        WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, rolesDropdownXpath ) ) )
        # kteeYD (selected, single one), goEvqh (selected, multiple ones), eJTYOK (not selected)
        roleElements = self.browser.find_elements( By.XPATH, '//*[contains(concat(" ", @class, " "), " eJTYOk ") or contains(concat(" ", @class, " "), " goEvqh ") or contains(concat(" ", @class, " "), " kteeYD ")]' )
        for rl in roleElements:
            # strip a trailing " (<number>)" from the role label
            role = re.match( r'^(.+?)(\s\([0-9]+\))?$' ,rl.text ).group( 1 )
            projectPermissions[ role ] = {'USERS':[], 'GROUPS':[]}
        self._toggleRolesDropdown( roleBtn, rolesDropdownXpath, opening=False )

        while True:
            # find all username and groupname fields on page
            nameFields = self.browser.find_elements( By.XPATH, nameFieldXpath )
            # fetch users for roles
            for x in nameFields:
                n = x.text
                trigger = x.find_element( By.XPATH, './parent::*/parent::*/parent::*/parent::*//*[contains(concat(" ", @class, " "), " css-8xpfx5 ")]' )
                # the name is a user when the user API knows it, otherwise try the group API
                if self.getUserApiInfo( n, True, session ).status_code == 200:
                    index = 'USERS'
                elif self.getGroupApiInfo( n, True, session ).status_code == 200:
                    index = 'GROUPS'
                else:
                    continue
                self._toggleRolesDropdown( trigger, rolesDropdownXpath, opening=True, screenshotIndex=index )
                roleElements = self.browser.find_elements( By.XPATH, '//*[contains(concat(" ", @class, " "), " kteeYD ") or contains(concat(" ", @class, " "), " goEvqh ")]' )
                for rl in roleElements:
                    projectPermissions[ rl.text ][ index ].append( n )
                self._toggleRolesDropdown( trigger, rolesDropdownXpath, opening=False, screenshotIndex=index )
            # move to the next page of the table; stop when no enabled "Next" control exists
            try:
                self.browser.find_element( By.XPATH, '//*[@aria-label="Next" and not(@disabled)]' ).click()
            except Exception:
                break
        return projectPermissions

    def getGroups( self ):
        """get list of groupnames existing in Jira"""
        session = self.toSession()
        rspJ = session.get( '{url}rest/api/2/groups/picker?maxResults={limit}'.format( url=self.app[ 'url' ], limit=5000 ) )
        if rspJ.status_code != 200:
            return []
        return [ g[ 'name' ] for g in json.loads( rspJ.text )[ 'groups' ] ]

    def removeUserFromGroup( self, group, user, session=None ):
        """Remove user from group – returns the response of the DELETE request"""
        if session is None:
            session = self.toSession( curl = True )
        requestData = {
            'groupname': group,
            'username': user
        }
        return session.delete( '{url}rest/api/2/group/user?{params}'.format( url=self.app[ 'url' ], params=urllib.parse.urlencode( requestData ) ) )

    def addUserToGroup( self, group, user, session=None ):
        """Add user to group – returns the response of the POST request"""
        if session is None:
            session = self.toSession( curl = True )
        # encode the group name like removeUserFromGroup does, so blanks, "&" or "#" survive
        params = urllib.parse.urlencode( { 'groupname': group } )
        return session.post(
            '{url}rest/api/2/group/user?{params}'.format( url=self.app['url'], params=params ),
            data    = json.dumps( {'name':user} ),
            headers = {'content-type':'application/json'}
        )

    def getGroupMembers( self, group, activeFilter='true' ):
        """get group members from GUI

        Raises an `Exception` when the number of listed rows differs from the
        total shown by the user browser.
        """
        self.webSudo()
        self.setSessionBaseHeaders()
        self.sessionPostRequest( '{url}secure/admin/user/UserBrowser.jspa'.format( url=self.app['url'] ), {
            'userSearchFilter': '',
            'group': group,
            'applicationFilter': '',
            'activeFilter': activeFilter,
            'max': 1000000
        } , True )
        should = WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, '//*[contains(concat(" ", @class, " "), " results-count ")]//*[contains(concat(" ", @class, " "), " results-count-total ")]' ) ) ).text
        gms = self.browser.find_elements( By.XPATH, '//*[@data-cell-type="username"]/ancestor::tr[position() = 1]' )
        if len(gms) != int(should):
            raise Exception('Count-Missmatch with group members of "{g}": {f} found, but {s} should have!'.format(g=group, f=len(gms), s=should))
        return [ i.get_attribute( "data-user" ) for i in gms ]

    def getGroupMembersAPI( self, group ):
        """get group members from API – sometimes a little bit buggy ...

        Pages through the member endpoint in chunks of 50. A non-200 answer is
        printed and ends the lookup; the names collected so far are returned.
        """
        sessionJ = self.toSession()
        limit = 50
        page  = 0
        users = []
        while True:
            parameters = {
                'groupname': group,
                'startAt': ( page * limit ),
                'maxResults': limit,
                'includeInactiveUsers': 'false',
            }
            memberUrl = '{url}rest/api/2/group/member?{params}'.format( url=self.app[ 'url' ], params=urllib.parse.urlencode( parameters ) )
            rspJ = sessionJ.get( memberUrl )
            if rspJ.status_code != 200:
                print('rc: ' + str(rspJ.status_code))
                print(parameters)
                break
            rsp = json.loads( rspJ.text )
            values = rsp[ 'values' ]
            users.extend( u[ 'name' ] for u in values )
            if rsp[ 'isLast' ] or len( values ) == 0:
                break
            page += 1
        return users

    def getActiveProjectLead( self, projectKey, session=None ):
        """Function to retrieve project lead of Jira project, if the user is still active

        Returns the lead's username, or `None` when the user lookup does not
        answer with status 200 and a truthy `active` flag.
        """
        if session is None:
            session = self.toSession()
        self.browser.get( '{url}plugins/servlet/project-config/{pjkey}/roles'.format( url=self.app['url'], pjkey=projectKey ) )
        plo = WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, '//*[contains(concat(" ", @class, " "), " jKQrhX ")]//a' ) ) )
        # the username is the `name` query parameter of the profile link
        parsedProfile = urlparse( plo.get_attribute('href') )
        ploUser = parse_qs(parsedProfile.query)['name'][0]
        resp = session.get( '{url}rest/api/2/user?username={user}'.format( url=self.app['url'], user=urllib.parse.quote( ploUser ) ) )
        try:
            active = json.loads( resp.text )['active']
        except Exception:
            active = False
        if resp.status_code == 200 and active:
            return ploUser
        return None

    def getProjectLeads( self ):
        """Function to retrieve active project leads for all Jira projects

        Returns `{ projectKey: username or None }`; the result is cached in
        `self.projectLeads`.
        """
        if len( self.projectKeys ) == 0:
            self.getProjectKeys()
        if len( self.projectLeads ) == 0:
            session = self.toSession()
            self.projectLeads = leads = {}
            for pk in self.projectKeys:
                leads[ pk ] = self.getActiveProjectLead( pk, session )
        return self.projectLeads

    def gatherAllProjectPermissions( self, deletePermissionHelperFile=True ):
        """Function to retrieve project permissions for all projects

        Progress is written to `./data/projectPermissionsHelper.json` after each
        project, and an existing helper file is read first so an interrupted
        run can resume. The file is removed at the end unless
        `deletePermissionHelperFile` is false.
        """
        if len( self.projectKeys ) == 0:
            self.getProjectKeys()
        if len( self.permissions ) == 0:
            permissionHelperFile = './data/projectPermissionsHelper.json'
            session = self.toSession()
            os.makedirs( os.path.dirname( permissionHelperFile ), exist_ok=True )
            stored = ''
            if os.path.exists( permissionHelperFile ):
                with open( permissionHelperFile ) as json_file:
                    stored = json_file.read()
            # a missing or empty helper file simply means "nothing gathered yet"
            self.permissions = json.loads( stored ) if stored.strip() else {}
            for pk in self.projectKeys:
                if pk not in self.permissions:
                    print( pk )
                    self.permissions[ pk ] = self.gatherProjectPermissions( pk, session )
                    with open( permissionHelperFile, 'w' ) as json_file:
                        json.dump( self.permissions, json_file )
                    print()
            if deletePermissionHelperFile and os.path.exists( permissionHelperFile ):
                os.remove( permissionHelperFile )
        return self.permissions

    def createGroup( self, groupName, timeout=None ):
        """Create a local user group

        Submits the name through the group browser form and polls the REST API
        once per second until the group exists. `timeout` (seconds) limits the
        polling and raises `TimeoutError` when exceeded; the default `None`
        waits without limit.
        """
        # the base URL ends with a slash everywhere else in this class, so none is added here
        groupUrl = '{url}secure/admin/user/GroupBrowser.jspa'.format( url=self.app[ 'url' ] )
        self.get( groupUrl )
        field = self.browser.find_element( By.XPATH, '//input[@name="addName"]' )
        field.send_keys( Keys.CONTROL, "a" )
        field.send_keys( groupName )
        field.send_keys( Keys.ENTER )
        session  = self.toSession()
        checkUrl = '{url}rest/api/latest/group?groupname={group}'.format( url=self.app[ 'url' ], group=urllib.parse.quote( groupName ) )
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            time.sleep(1)
            if session.get( checkUrl ).status_code == 200:
                return
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError( 'Group "{g}" did not show up within {t} seconds'.format( g=groupName, t=timeout ) )
- Atlassian
- macwinnie_pyhelpers.Browser.Browser
Class variables
var appInfo
var permissions
var projectKeys
var projectLeads
Methods
def addUserToGroup(self, group, user, session=None)-
Add user to group
Expand source code
def addUserToGroup( self, group, user, session=None ):
    """Add user to group

    Posts `{"name": user}` as JSON to the group/user endpoint and returns the
    response. Without a `session`, one is created via `toSession( curl = True )`.
    """
    if session is None:
        session = self.toSession( curl = True )
    # encode the group name like removeUserFromGroup does, so blanks, "&" or "#" survive
    params = urllib.parse.urlencode( { 'groupname': group } )
    return session.post(
        '{url}rest/api/2/group/user?{params}'.format( url=self.app['url'], params=params ),
        data    = json.dumps( {'name':user} ),
        headers = {'content-type':'application/json'}
    )
Check if login is needed
Expand source code
def checkLogin( self ):
    """Check if login is needed

    Opens the base URL when the browser is currently somewhere else, then
    looks for an element with id `login`. Returns False when it is found and
    True when the lookup fails.
    """
    baseUrl = self.app[ 'url' ]
    if not self.browser.current_url.startswith( baseUrl ):
        self.get( baseUrl, False )
    try:
        self.browser.find_element( By.ID, 'login' )
    except Exception:
        # `Exception` instead of a bare except: Ctrl-C / SystemExit must not count as "logged in"
        return True
    return False
Create a local user group
Expand source code
def createGroup( self, groupName, timeout=None ):
    """Create a local user group

    Submits the name through the group browser form and polls the REST API
    once per second until the group exists. `timeout` (seconds) limits the
    polling and raises `TimeoutError` when exceeded; the default `None` waits
    without limit.
    """
    # the base URL is used with a trailing slash throughout the class, so none is added here
    groupUrl = '{url}secure/admin/user/GroupBrowser.jspa'.format( url=self.app[ 'url' ] )
    self.get( groupUrl )
    field = self.browser.find_element( By.XPATH, '//input[@name="addName"]' )
    field.send_keys( Keys.CONTROL, "a" )
    field.send_keys( groupName )
    field.send_keys( Keys.ENTER )
    session  = self.toSession()
    checkUrl = '{url}rest/api/latest/group?groupname={group}'.format( url=self.app[ 'url' ], group=urllib.parse.quote( groupName ) )
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        time.sleep(1)
        if session.get( checkUrl ).status_code == 200:
            return
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError( 'Group "{g}" did not show up within {t} seconds'.format( g=groupName, t=timeout ) )
Function to retrieve project permissions for all projects
Expand source code
def gatherAllProjectPermissions( self, deletePermissionHelperFile=True ):
    """Function to retrieve project permissions for all projects

    Progress is written to `./data/projectPermissionsHelper.json` after each
    project, and an existing helper file is read first so an interrupted run
    can resume. The file is removed at the end unless
    `deletePermissionHelperFile` is false. Returns `self.permissions`.
    """
    if len( self.projectKeys ) == 0:
        self.getProjectKeys()
    if len( self.permissions ) == 0:
        permissionHelperFile = './data/projectPermissionsHelper.json'
        session = self.toSession()
        os.makedirs( os.path.dirname( permissionHelperFile ), exist_ok=True )
        stored = ''
        if os.path.exists( permissionHelperFile ):
            with open( permissionHelperFile ) as json_file:
                stored = json_file.read()
        # a missing or empty helper file means "nothing gathered yet";
        # feeding an empty file to the JSON parser would raise
        self.permissions = json.loads( stored ) if stored.strip() else {}
        for pk in self.projectKeys:
            if pk not in self.permissions:
                print( pk )
                self.permissions[ pk ] = self.gatherProjectPermissions( pk, session )
                # persist after every project so progress survives a crash
                with open( permissionHelperFile, 'w' ) as json_file:
                    json.dump( self.permissions, json_file )
                print()
        if deletePermissionHelperFile and os.path.exists( permissionHelperFile ):
            os.remove( permissionHelperFile )
    return self.permissions
Function to gather permissions for a specific Jira project
Expand source code
def _toggleRolesDropdown( self, trigger, dropdownXpath, opening, screenshotIndex=None ):
    """Click `trigger` and wait until the roles dropdown is present (`opening`) or gone.

    If click or wait fail, ESCAPE is sent and the wait is repeated once. When
    that fails as well, a debug screenshot is stored (only if
    `screenshotIndex` is given) and the error is re-raised.
    """
    dropdownPresent = EC.presence_of_element_located( ( By.XPATH, dropdownXpath ) )

    def waitForState():
        waiter = WebDriverWait( self.browser, 10 )
        if opening:
            waiter.until( dropdownPresent )
        else:
            waiter.until_not( dropdownPresent )

    try:
        trigger.click()
        waitForState()
    except Exception:
        actions = ActionChains( self.browser )
        actions.send_keys( Keys.ESCAPE )
        actions.perform()
        try:
            waitForState()
        except Exception:
            if screenshotIndex is not None:
                self.browser.get_screenshot_as_file( './data/debug/error_{index}_{phase}_{date}.png'.format(
                    date  = datetime.now().strftime( "%Y%m%d_%H%M%S" ),
                    index = screenshotIndex,
                    phase = 'open' if opening else 'close',
                ) )
            raise

def gatherProjectPermissions( self, projectKey, session=None ):
    """Function to gather permissions for a specific Jira project

    Returns `{ role: { 'USERS': [...], 'GROUPS': [...] } }` scraped from the
    project's roles page, or `None` when the page shows no name fields within
    ten seconds (the project is then reported as skipped). Errors while
    opening or closing a role dropdown are re-raised after a screenshot was
    stored below `./data/debug/`.
    """
    rolesDropdownXpath = '//*[contains(concat(" ", @class, " "), " iLKsB ")]'
    nameFieldXpath     = '//*[contains(concat(" ", @class, " "), " EKGYc ")]'
    self.get( '{url}plugins/servlet/project-config/{pjkey}/roles'.format( url=self.app['url'], pjkey=projectKey ) )
    try:
        WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, nameFieldXpath ) ) )
    except Exception:
        print( '{p} skipped'.format( p = projectKey ) )
        return None

    projectPermissions = {}
    # retrieve possible roles
    roleBtn = WebDriverWait( self.browser, 10 ).until( EC.element_to_be_clickable( ( By.XPATH, '//*[contains(concat(" ", @class, " "), " css-18u3ks8 ")]' ) ) )
    roleBtn.click()
    # wait until dropdown is visible
    WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, rolesDropdownXpath ) ) )
    # kteeYD (selected, single one), goEvqh (selected, multiple ones), eJTYOK (not selected)
    roleElements = self.browser.find_elements( By.XPATH, '//*[contains(concat(" ", @class, " "), " eJTYOk ") or contains(concat(" ", @class, " "), " goEvqh ") or contains(concat(" ", @class, " "), " kteeYD ")]' )
    for rl in roleElements:
        # strip a trailing " (<number>)" from the role label
        role = re.match( r'^(.+?)(\s\([0-9]+\))?$' ,rl.text ).group( 1 )
        projectPermissions[ role ] = {'USERS':[], 'GROUPS':[]}
    self._toggleRolesDropdown( roleBtn, rolesDropdownXpath, opening=False )

    while True:
        # find all username and groupname fields on page
        nameFields = self.browser.find_elements( By.XPATH, nameFieldXpath )
        # fetch users for roles
        for x in nameFields:
            n = x.text
            trigger = x.find_element( By.XPATH, './parent::*/parent::*/parent::*/parent::*//*[contains(concat(" ", @class, " "), " css-8xpfx5 ")]' )
            # the name is a user when the user API knows it, otherwise try the group API
            if self.getUserApiInfo( n, True, session ).status_code == 200:
                index = 'USERS'
            elif self.getGroupApiInfo( n, True, session ).status_code == 200:
                index = 'GROUPS'
            else:
                continue
            self._toggleRolesDropdown( trigger, rolesDropdownXpath, opening=True, screenshotIndex=index )
            # entries marked as selected in the open dropdown are the roles of this user / group
            roleElements = self.browser.find_elements( By.XPATH, '//*[contains(concat(" ", @class, " "), " kteeYD ") or contains(concat(" ", @class, " "), " goEvqh ")]' )
            for rl in roleElements:
                projectPermissions[ rl.text ][ index ].append( n )
            self._toggleRolesDropdown( trigger, rolesDropdownXpath, opening=False, screenshotIndex=index )
        # move to the next page of the table; stop when no enabled "Next" control exists
        try:
            self.browser.find_element( By.XPATH, '//*[@aria-label="Next" and not(@disabled)]' ).click()
        except Exception:
            break
    return projectPermissions
Function to retrieve project lead of Jira project, if the user is still active
Expand source code
def getActiveProjectLead( self, projectKey, session=None ):
    """Function to retrieve project lead of Jira project, if the user is still active

    Reads the lead's profile link from the project's roles page and asks the
    user API about that account. Returns the username, or `None` when the
    lookup does not answer with status 200 and a truthy `active` flag.
    """
    if session is None:
        session = self.toSession()
    self.browser.get( '{url}plugins/servlet/project-config/{pjkey}/roles'.format( url=self.app['url'], pjkey=projectKey ) )
    plo = WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, '//*[contains(concat(" ", @class, " "), " jKQrhX ")]//a' ) ) )
    # the username is the `name` query parameter of the profile link
    parsedProfile = urlparse( plo.get_attribute('href') )
    ploUser = parse_qs(parsedProfile.query)['name'][0]
    resp = session.get( '{url}rest/api/2/user?username={user}'.format( url=self.app['url'], user=urllib.parse.quote( ploUser ) ) )
    try:
        active = json.loads( resp.text )['active']
    except Exception:
        # unparsable answer or missing flag counts as "not active"
        active = False
    if resp.status_code == 200 and active:
        return ploUser
    return None
Function to gather information about a group from API – if
`returnResponse` is True, the whole request response is returned, otherwise only the decoded answer object
Expand source code
def getGroupApiInfo( self, groupname, returnResponse=False, session=None ):
    """Look a group up through the REST API.

    The call is delegated to `apiGetInfo`; with `returnResponse` set to True
    the whole request response is handed back, otherwise only the decoded
    answer object.
    """
    encodedName = urllib.parse.quote( groupname )
    endpoint = self.app['url'] + 'rest/api/2/group?groupname=' + encodedName
    return self.apiGetInfo( endpoint, returnResponse, session )
get group members from GUI
Expand source code
def getGroupMembers( self, group, activeFilter='true' ):
    """Read the members of a group from the admin user browser page.

    Runs websudo, posts the filter form for `group`, and collects the
    `data-user` attribute of every listed row. Raises an `Exception` when the
    number of rows differs from the total the page reports.
    """
    self.webSudo()
    self.setSessionBaseHeaders()

    browserUrl = '{url}secure/admin/user/UserBrowser.jspa'.format( url=self.app['url'] )
    filterForm = {
        'userSearchFilter': '',
        'group': group,
        'applicationFilter': '',
        'activeFilter': activeFilter,
        'max': 1000000
    }
    self.sessionPostRequest( browserUrl, filterForm, True )

    totalXpath = '//*[contains(concat(" ", @class, " "), " results-count ")]//*[contains(concat(" ", @class, " "), " results-count-total ")]'
    totalField = WebDriverWait( self.browser, 10 ).until( EC.presence_of_element_located( ( By.XPATH, totalXpath ) ) )
    should = totalField.text

    rows = self.browser.find_elements( By.XPATH, '//*[@data-cell-type="username"]/ancestor::tr[position() = 1]' )
    found = len( rows )
    if found != int( should ):
        raise Exception('Count-Missmatch with group members of "{g}": {f} found, but {s} should have!'.format(g=group, f=found, s=should))

    return [ row.get_attribute( "data-user" ) for row in rows ]
get group members from API – sometimes a little bit buggy …
Expand source code
def getGroupMembersAPI( self, group ):
    """get group members from API – sometimes a little bit buggy ...

    Pages through the member endpoint in chunks of 50 (inactive users are
    excluded) and returns the collected user names. A non-200 answer is
    printed together with the request parameters and ends the lookup; the
    names gathered up to that point are returned.
    """
    sessionJ = self.toSession()
    limit = 50
    page  = 0
    users = []
    while True:
        parameters = {
            'groupname': group,
            'startAt': ( page * limit ),
            'maxResults': limit,
            'includeInactiveUsers': 'false',
        }
        memberUrl = '{url}rest/api/2/group/member?{params}'.format( url=self.app[ 'url' ], params=urllib.parse.urlencode( parameters ) )
        rspJ = sessionJ.get( memberUrl )
        if rspJ.status_code != 200:
            # leave right away: there is no decoded answer to inspect for this request
            print('rc: ' + str(rspJ.status_code))
            print(parameters)
            break
        rsp = json.loads( rspJ.text )
        values = rsp[ 'values' ]
        users.extend( u[ 'name' ] for u in values )
        # an empty page ends the loop as well, in case `isLast` is never reported
        if rsp[ 'isLast' ] or len( values ) == 0:
            break
        page += 1
    return users
get list of groupnames existing in Jira
Expand source code
def getGroups( self ):
    """Return the names of the groups reported by the Jira group picker.

    Asks for up to 5000 groups in a single request; any answer other than
    status 200 yields an empty list.
    """
    pickerUrl = '{url}rest/api/2/groups/picker?maxResults={limit}'.format( url=self.app[ 'url' ], limit=5000 )
    answer = self.toSession().get( pickerUrl )
    if answer.status_code != 200:
        return []
    return [ entry[ 'name' ] for entry in json.loads( answer.text )[ 'groups' ] ]
Get a list of Project Keys of those Jira projects visible to logged in user
Expand source code
def getProjectKeys( self ):
    """Get a list of Project Keys of those Jira projects visible to logged in user

    Runs websudo, opens the project browser and walks through its pages. The
    result is stored as a fresh list in `self.projectKeys` and returned, so
    repeated calls do not pile up duplicates and the class-level default list
    is never modified.
    """
    self.webSudo()
    self.get( self.app[ 'url' ] + 'secure/project/BrowseProjects.jspa' )
    collected = []
    while True:
        soup = self.toSoup()
        collected += [ item.text for item in soup.select( 'tbody tr .cell-type-key' ) ]
        try:
            nextBtn = soup.select( '.aui-nav-next a' )[0]
            # the lookup is only a probe: a "next" link without a target page ends the loop
            _ = nextBtn[ 'data-page' ]
            self.browser.find_element( By.XPATH, '//*[contains(concat(" ", normalize-space(@class), " "), " aui-nav-next ")]//a' ).click()
            # fixed pause before the next page is parsed
            time.sleep(5)
        except Exception:
            break
    self.projectKeys = collected
    return self.projectKeys
Function to retrieve active project leads for all Jira projects
Expand source code
def getProjectLeads( self ):
    """Function to retrieve active project leads for all Jira projects

    Returns `{ projectKey: username or None }`. Project keys are fetched
    first when none are known yet; the result is cached in
    `self.projectLeads` and reused on later calls.
    """
    if len( self.projectKeys ) == 0:
        self.getProjectKeys()
    if len( self.projectLeads ) == 0:
        session = self.toSession()
        # start from a fresh dict so the class-level default is never filled
        self.projectLeads = leads = {}
        for pk in self.projectKeys:
            leads[ pk ] = self.getActiveProjectLead( pk, session )
    return self.projectLeads
Function to gather information about a user from API – if
`returnResponse` is True, the whole request response is returned, otherwise only the decoded answer object
Expand source code
def getUserApiInfo( self, username, returnResponse=False, session=None ):
    """Look a user up through the REST API.

    The call is delegated to `apiGetInfo`; with `returnResponse` set to True
    the whole request response is handed back, otherwise only the decoded
    answer object.
    """
    encodedName = urllib.parse.quote( username )
    endpoint = self.app['url'] + 'rest/api/2/user?username=' + encodedName
    return self.apiGetInfo( endpoint, returnResponse, session )
Function to gather groups of user
Expand source code
def getUserGroups( self, username, session=None ):
    """Return the names of the groups a user belongs to.

    Requests the user with `expand=groups` through `apiGetInfo` and reads the
    names from `groups.items` of the answer.
    """
    endpoint = '{url}rest/api/2/user?username={user}&expand=groups'.format(
        url  = self.app['url'],
        user = urllib.parse.quote( username ),
    )
    answer = self.apiGetInfo( endpoint, session=session )
    names = []
    for membership in answer['groups']['items']:
        names.append( membership['name'] )
    return names
Run Jira login
Expand source code
def login( self, skipSSO=True ):
    """Run Jira login

    Posts the credentials from `self.app` to `login.jsp` (with `?nosso`
    appended when `skipSSO` is true) and then opens the base URL. With the
    environment variable `PRINT_DEBUG_INFO` set to a true-ish value, the login
    data is printed – the password is masked in that output.
    """
    loginUrl = self.app[ 'url' ] + "login.jsp"
    if skipSSO:
        loginUrl += '?nosso'
    self.loadCookies( url=loginUrl )
    self.setSessionBaseHeaders()
    userObject = {
        "os_username": self.app[ 'user' ],
        "os_password": self.app[ 'passwd' ],
    }
    if os.getenv( "PRINT_DEBUG_INFO", "False" ).lower() in [ "1", "true", "t", "y", "yes" ]:
        print( 'User is being logged in with those credentials:' )
        # never write the plaintext password to stdout / log files
        print( dict( userObject, os_password='********' ) )
        print()
    self.sessionPostRequest( loginUrl, userObject, True, urlOverride=True )
    self.get( self.app[ 'url' ] , False )
Remove user from group
Expand source code
def removeUserFromGroup( self, group, user, session=None ):
    """Take a user out of a group and return the response of the DELETE request.

    Without a `session`, one is created via `toSession( curl = True )`.
    """
    if session is None:
        session = self.toSession( curl = True )
    query = urllib.parse.urlencode( {
        'groupname': group,
        'username': user
    } )
    return session.delete( self.app[ 'url' ] + 'rest/api/2/group/user?' + query )
Api requests need special headers in Jira … sometimes …
Expand source code
def setSessionBaseHeaders( self, api=False ):
    """Select the base headers for session requests.

    With `api` set, the JSON content type and the `no-check` Atlassian token
    header are stored; otherwise the base headers are reset to `None`.
    """
    self.sessionBaseHeaders = {
        "Content-Type": "application/json",
        "X-Atlassian-Token": "no-check"
    } if api else None
Run the Jira WebSudo
Expand source code
def webSudo( self, adjustCookies = True ):
    """Run the generic Atlassian websudo flow with the Jira URLs."""
    base = self.app[ 'url' ]
    # page that asks for the password / admin page used to probe whether websudo is needed
    super( Jira, self ).webSudo(
        base + 'secure/admin/WebSudoAuthenticate!default.jspa',
        base + 'secure/admin/ViewApplicationProperties.jspa',
        adjustCookies,
    )
Inherited members