Source code for openreview.api.client

#!/usr/bin/python
from __future__ import absolute_import, division, print_function, unicode_literals
import datetime
import sys
if sys.version_info[0] < 3:
    string_types = [str, unicode]
else:
    string_types = [str]

from .. import tools
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import pprint
import os
import re
import time
import jwt
import traceback
from ..openreview import Profile
from ..openreview import OpenReviewException
from .. import tools

class LogRetry(Retry):
     
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)   

    def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None):
        # Log retry information before calling the parent class method
        response_string = 'no response'
        if response:
            if 'application/json' in response.headers.get('Content-Type'):
                response_string = response.json()
            if response.text:
                response_string = response.text
            else:
                response_string = response.reason
        print(f"Retrying request: {method} {url}, response: {response_string}, error: {error}")

        # Call the parent class method to perform the actual retry increment
        return super().increment(method=method, url=url, response=response, error=error, _pool=_pool, _stacktrace=_stacktrace)
    

[docs] class OpenReviewClient(object): """ :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_BASEURL` :type baseurl: str, optional :param username: OpenReview username. If none is provided, it defaults to the environment variable `OPENREVIEW_USERNAME` :type username: str, optional :param password: OpenReview password. If none is provided, it defaults to the environment variable `OPENREVIEW_PASSWORD` :type password: str, optional :param token: Session token. This token can be provided instead of the username and password if the user had already logged in :type token: str, optional :param expiresIn: Time in seconds before the token expires. If none is set the value will be set automatically to one hour. The max value that it can be set to is 1 week. :type expiresIn: number, optional """ def __init__(self, baseurl = None, username = None, password = None, token= None, tokenExpiresIn=None): self.baseurl = baseurl if not self.baseurl: self.baseurl = os.environ.get('OPENREVIEW_BASEURL', 'http://localhost:3001') self.groups_url = self.baseurl + '/groups' self.login_url = self.baseurl + '/login' self.register_url = self.baseurl + '/register' self.alternate_confirm_url = self.baseurl + '/user/confirm' self.invitations_url = self.baseurl + '/invitations' self.mail_url = self.baseurl + '/mail' self.notes_url = self.baseurl + '/notes' self.tags_url = self.baseurl + '/tags' self.edges_url = self.baseurl + '/edges' self.bulk_edges_url = self.baseurl + '/edges/bulk' self.edges_count_url = self.baseurl + '/edges/count' self.edges_rename = self.baseurl + '/edges/rename' self.edges_archive = self.baseurl + '/edges/archive' self.profiles_url = self.baseurl + '/profiles' self.profiles_search_url = self.baseurl + '/profiles/search' self.profiles_merge_url = self.baseurl + '/profiles/merge' self.profiles_rename = self.baseurl + '/profiles/rename' self.profiles_moderate = self.baseurl + '/profile/moderate' self.reference_url = self.baseurl + '/references' self.tilde_url = self.baseurl + '/tildeusername' self.pdf_url = self.baseurl + '/pdf' self.pdf_revisions_url = self.baseurl + '/references/pdf' self.messages_url = self.baseurl + '/messages' self.messages_direct_url = self.baseurl + '/messages/direct' self.process_logs_url = self.baseurl + '/logs/process' self.institutions_url = self.baseurl + '/settings/institutions' self.jobs_status = self.baseurl + '/jobs/status' self.venues_url = self.baseurl + '/venues' self.note_edits_url = self.baseurl + '/notes/edits' self.invitation_edits_url = self.baseurl + '/invitations/edits' self.group_edits_url = self.baseurl + '/groups/edits' self.user_agent = 'OpenReviewPy/v' + str(sys.version_info[0]) self.limit = 1000 self.token = token.replace('Bearer ', '') if token else None self.profile = None self.headers = { 'User-Agent': self.user_agent, 'Accept': 'application/json' } retry_strategy = LogRetry(total=3, backoff_factor=1, status_forcelist=[ 500, 502, 503, 504 ], respect_retry_after_header=False) self.session = requests.Session() adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount('https://', adapter) self.session.mount('http://', adapter) if self.token: self.headers['Authorization'] = 'Bearer ' + self.token self.user = jwt.decode(self.token, "secret", algorithms=["HS256"], issuer="openreview", options={"verify_signature": False}) try: self.profile = self.get_profile() except: self.profile = None else: if not username: username = os.environ.get('OPENREVIEW_USERNAME') if not password: password = os.environ.get('OPENREVIEW_PASSWORD') if username or password: self.login_user(username, password, expiresIn=tokenExpiresIn) ## PRIVATE FUNCTIONS def __handle_token(self, response): self.token = str(response['token']) self.profile = Profile( id = response['user']['profile']['id'] ) self.headers['Authorization'] ='Bearer ' + self.token self.user = jwt.decode(self.token, "secret", algorithms=["HS256"], issuer="openreview", options={"verify_signature": False}) return response def __handle_response(self,response): try: response.raise_for_status() return response except requests.exceptions.HTTPError as e: if 'application/json' in response.headers.get('Content-Type'): error = response.json() elif response.text: error = { 'name': 'Error', 'message': response.text } else: error = { 'name': 'Error', 'message': response.reason } raise OpenReviewException(error) ## PUBLIC FUNCTIONS def impersonate(self, group_id): response = self.session.post(self.baseurl + '/impersonate', json={ 'groupId': group_id }, headers=self.headers) response = self.__handle_response(response) json_response = response.json() self.__handle_token(json_response) return json_response
[docs] def login_user(self,username=None, password=None, expiresIn=None): """ Logs in a registered user :param username: OpenReview username :type username: str, optional :param password: OpenReview password :type password: str, optional :return: Dictionary containing user information and the authentication token :rtype: dict """ user = { 'id': username, 'password': password, 'expiresIn': expiresIn } response = self.session.post(self.login_url, headers=self.headers, json=user) response = self.__handle_response(response) json_response = response.json() self.__handle_token(json_response) return json_response
[docs] def register_user(self, email = None, fullname = None, password = None): """ Registers a new user :param email: email that will be used as id to log in after the user is registered :type email: str, optional :param fullname: Full name of the user :type fullname: str, optional :param password: Password used to log into OpenReview :type password: str, optional :return: Dictionary containing the new user information including his id, username, email(s), readers, writers, etc. :rtype: dict """ register_payload = { 'email': email, 'fullname': fullname, 'password': password } response = self.session.post(self.register_url, json = register_payload, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def activate_user(self, token, content): """ Activates a newly registered user :param token: Activation token. If running in localhost, use email as token :type token: str :param content: Content of the profile to activate :type content: dict :return: Dictionary containing user information and the authentication token :rtype: dict Example: >>> res = client.activate_user('new@user.com', { 'names': [ { 'first': 'New', 'last': 'User', 'username': '~New_User1' } ], 'emails': ['new@user.com'], 'preferredEmail': 'new@user.com' }) """ response = self.session.put(self.baseurl + '/activate/' + token, json = { 'content': content }, headers = self.headers) response = self.__handle_response(response) json_response = response.json() self.__handle_token(json_response) return json_response
[docs] def confirm_alternate_email(self, profile_id, alternate_email): """ Confirms an alternate email address :param profile_id: id of the profile :type profile_id: str :param alternate_email: email address to confirm :type alternate_email: str :return: Dictionary containing the profile information :rtype: dict """ response = self.session.post(self.alternate_confirm_url, json = { 'username': profile_id, 'alternate': alternate_email }, headers = self.headers) response = self.__handle_response(response) return response.json()
def get_activatable(self, token = None): response = self.session.get(self.baseurl + '/activatable/' + token, params = {}, headers = self.headers) response = self.__handle_response(response) self.__handle_token(response.json()['activatable']) return self.token
[docs] def get_institutions(self, id=None, domain=None): """ Get a single Institution by id or domain if available :param id: id of the Institution as saved in the database :type id: str :param domain: domain of the Institution :type domain: str :return: Dictionary with the Institution information :rtype: dict Example: >>> institution = client.get_institutions(domain='umass.edu') """ params = {} if id: params['id'] = id if domain: params['domain'] = domain response = self.session.get(self.institutions_url, params = tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def get_group(self, id): """ Get a single Group by id if available :param id: id of the group :type id: str :return: Dictionary with the group information :rtype: Group Example: >>> group = client.get_group('your-email@domain.com') """ response = self.session.get(self.groups_url, params = {'id':id}, headers = self.headers) response = self.__handle_response(response) g = response.json()['groups'][0] group = Group.from_json(g) if group.anonids: anon_prefix = (group.id[:-1] if group.id.endswith('s') else group.id) + '_' members_by_anonid = { g.id:g.members[0] for g in self.get_groups(prefix=anon_prefix) if g.members } members = [] anon_members = [] for member in group.members: if member in members_by_anonid: anon_members.append(member) members.append(members_by_anonid[member]) else: members.append(member) group.anon_members = anon_members group.members = members return group
[docs] def get_invitation(self, id): """ Get a single invitation by id if available :param id: id of the invitation :type id: str :return: Invitation matching the passed id :rtype: Invitation """ response = self.session.get(self.invitations_url, params = {'id': id}, headers = self.headers) response = self.__handle_response(response) i = response.json()['invitations'][0] return Invitation.from_json(i)
[docs] def get_note(self, id, details=None): """ Get a single Note by id if available :param id: id of the note :type id: str :return: Note matching the passed id :rtype: Note """ response = self.session.get(self.notes_url, params = {'id':id, 'details': details}, headers = self.headers) response = self.__handle_response(response) n = response.json()['notes'][0] return Note.from_json(n)
[docs] def get_tag(self, id): """ Get a single Tag by id if available :param id: id of the Tag :type id: str :return: Tag with the Tag information :rtype: Tag """ response = self.session.get(self.tags_url, params = {'id': id}, headers = self.headers) response = self.__handle_response(response) t = response.json()['tags'][0] return Tag.from_json(t)
[docs] def get_edge(self, id, trash=False): """ Get a single Edge by id if available :param id: id of the Edge :type id: str return: Edge object with its information :rtype: Edge """ response = self.session.get(self.edges_url, params = {'id': id, 'trash': 'true' if trash == True else 'false'}, headers=self.headers) response = self.__handle_response(response) edges = response.json()['edges'] if edges: return Edge.from_json(edges[0]) else: raise OpenReviewException('Edge not found')
[docs] def get_profile(self, email_or_id = None): """ Get a single Profile by id, if available :param email_or_id: e-mail or id of the profile :type email_or_id: str, optional :return: Profile object with its information :rtype: Profile """ params = {} if email_or_id: tildematch = re.compile('~.+') if tildematch.match(email_or_id): att = 'id' else: att = 'email' params[att] = email_or_id response = self.session.get(self.profiles_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) profiles = response.json()['profiles'] if profiles: return Profile.from_json(profiles[0]) else: raise OpenReviewException(['Profile Not Found'])
[docs] def get_profiles(self, trash=None, with_blocked=None, offset=None, limit=None, sort=None): """ Get a list of Profiles :param trash: Indicates if the returned profiles are trashed :type trash: bool, optional :param with_blocked: Indicates if the returned profiles are blocked :type with_blocked: bool, optional :param offset: Indicates the position to start retrieving Profiles :type offset: int, optional :param limit: Maximum amount of Profiles that this method will return :type limit: int, optional :return: List of Profile objects :rtype: list[Profile] """ params = {} if trash == True: params['trash'] = True if with_blocked == True: params['withBlocked'] = True if offset is not None: params['offset'] = offset if limit is not None: params['limit'] = limit if sort is not None: params['sort'] = sort response = self.session.get(self.profiles_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']]
[docs] def search_profiles(self, confirmedEmails = None, emails = None, ids = None, term = None, first = None, middle = None, last = None, fullname=None, relation=None, use_ES = False): """ Gets a list of profiles using either their ids or corresponding emails :param confirmedEmails: List of confirmed emails registered in OpenReview :type confirmedEmails: list, optional :param emails: List of emails registered in OpenReview :type emails: list, optional :param ids: List of OpenReview username ids :type ids: list, optional :param term: Substring in the username or e-mail to be searched :type term: str, optional :param first: First name of user :type first: str, optional :param middle: Middle name of user :type middle: str, optional :param last: Last name of user :type last: str, optional :return: List of profiles, if emails is present then a dictionary of { email: profiles } is returned. If confirmedEmails is present then a dictionary of { email: profile } is returned :rtype: list[Profile] """ def batches(items, batch_size=1000): batch = [] for item in items: if len(batch) == batch_size: yield batch batch = [] batch.append(item) if batch: yield batch if term: response = self.session.get(self.profiles_search_url, params = { 'term': term, 'es': 'true' if use_ES else 'false' }, headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']] if fullname: response = self.session.get(self.profiles_search_url, params = { 'fullname': fullname, 'es': 'true' if use_ES else 'false' }, headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']] if emails: full_response = [] for email_batch in batches(emails): response = self.session.post(self.profiles_search_url, json = {'emails': email_batch}, headers = self.headers) response = self.__handle_response(response) full_response.extend(response.json()['profiles']) profiles_by_email = {} for p in full_response: if p['email'] not in profiles_by_email: profiles_by_email[p['email']] = [] profiles_by_email[p['email']].append(Profile.from_json(p)) return profiles_by_email if confirmedEmails: full_response = [] for email_batch in batches(confirmedEmails): response = self.session.post(self.profiles_search_url, json = {'emails': email_batch}, headers = self.headers) response = self.__handle_response(response) full_response.extend(response.json()['profiles']) profiles_by_email = {} for p in full_response: profile = Profile.from_json(p) if p['email'] in profile.content['emailsConfirmed']: profiles_by_email[p['email']] = profile return profiles_by_email if ids: full_response = [] for id_batch in batches(ids): response = self.session.post(self.profiles_search_url, json = {'ids': id_batch}, headers = self.headers) response = self.__handle_response(response) full_response.extend(response.json()['profiles']) return [Profile.from_json(p) for p in full_response] if first or middle or last: response = self.session.get(self.profiles_url, params = {'first': first, 'middle': middle, 'last': last, 'es': 'true' if use_ES else 'false'}, headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']] if relation: response = self.session.get(self.profiles_url, params = {'relation': relation }, headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']] return []
[docs] def get_pdf(self, id, is_reference=False): """ Gets the binary content of a pdf using the provided note/reference id If the pdf is not found then this returns an error message with "status":404. Use the note id when trying to get the latest pdf version and reference id when trying to get a previous version of the pdf :param id: Note id or Reference id of the pdf :type id: str :param is_reference: Indicates that the passed id is a reference id instead of a note id :type is_reference: bool, optional :return: The binary content of a pdf :rtype: bytes Example: >>> f = get_pdf(id='Place Note-ID here') >>> with open('output.pdf','wb') as op: op.write(f) """ params = {} params['id'] = id headers = self.headers.copy() headers['content-type'] = 'application/pdf' url = self.pdf_revisions_url if is_reference else self.pdf_url response = self.session.get(url, params=tools.format_params(params), headers = headers) response = self.__handle_response(response) return response.content
[docs] def get_attachment(self, id, field_name): """ Gets the binary content of a attachment using the provided note id If the pdf is not found then this returns an error message with "status":404. :param id: Note id or Reference id of the pdf :type id: str :param field_name: name of the field associated with the attachment file :type field_name: str :return: The binary content of a pdf :rtype: bytes Example: >>> f = get_attachment(id='Place Note-ID here', field_name='pdf') >>> with open('output.pdf','wb') as op: op.write(f) """ response = self.session.get(self.baseurl + '/attachment', params = { 'id': id, 'name': field_name }, headers = self.headers) response = self.__handle_response(response) return response.content
[docs] def get_venues(self, id=None, ids=None, invitations=None): """ Gets list of Note objects based on the filters provided. The Notes that will be returned match all the criteria passed in the parameters. :param id: a Venue ID. If provided, returns Notes whose ID matches the given ID. :type id: str, optional :param ids: A list of Venue IDs. If provided, returns Notes containing these IDs. :type ids: list, optional :param invitations: A list of Invitation IDs. If provided, returns Venues whose "invitation" field is this Invitation ID. :type invitations: list, optional :return: List of Venues :rtype: list[dict] """ params = {} if id is not None: params['id'] = id if ids is not None: params['ids'] = ','.join(ids) if invitations is not None: params['invitations'] = ','.join(invitations) response = self.session.get(self.venues_url, params=tools.format_params(params), headers=self.headers) response = self.__handle_response(response) return response.json()['venues']
[docs] def put_attachment(self, file_path, invitation, name): """ Uploads a file to the openreview server :param file: Path to the file :type file: str :param invitation: Invitation of the note that required the attachment :type file: str :param file: name of the note field to save the attachment url :type file: str :return: A relative URL for the uploaded file :rtype: str """ headers = self.headers.copy() with open(file_path, 'rb') as f: response = self.session.put(self.baseurl + '/attachment', files=( ('invitationId', (None, invitation)), ('name', (None, name)), ('file', (file_path, f)) ), headers = headers) response = self.__handle_response(response) return response.json()['url']
[docs] def post_profile(self, profile): """ Updates a Profile :param profile: Profile object :type profile: Profile :return: The new updated Profile :rtype: Profile """ response = self.session.post( self.profiles_url, json = profile.to_json(), headers = self.headers) response = self.__handle_response(response) return Profile.from_json(response.json())
[docs] def rename_profile(self, current_id, new_id): """ Updates a the profile id of a Profile :param current_id: Current profile id :type profile: str :param new_id: New profile id :type profile: str :return: The new updated Profile :rtype: Profile """ response = self.session.post( self.profiles_rename, json = { 'currentId': current_id, 'newId': new_id }, headers = self.headers) response = self.__handle_response(response) return Profile.from_json(response.json())
[docs] def merge_profiles(self, profileTo, profileFrom): """ Merges two Profiles :param profileTo: Profile object to merge to :type profileTo: Profile :param profileFrom: Profile object to merge from (this profile will be deleted) :type: profileFrom: Profile :return: The new updated Profile :rtype: Profile """ response = self.session.post( self.profiles_merge_url, json = { 'to': profileTo, 'from': profileFrom }, headers = self.headers) response = self.__handle_response(response) return Profile.from_json(response.json())
[docs] def moderate_profile(self, profile_id, decision): """ Updates a Profile :param profile: Profile object :type profile: Profile :return: The new updated Profile :rtype: Profile """ response = self.session.post( self.profiles_moderate, json = { 'id': profile_id, 'decision': decision }, headers = self.headers) response = self.__handle_response(response) return Profile.from_json(response.json())
[docs] def get_groups(self, id=None, prefix=None, member=None, signatory=None, web=None, limit=None, offset=None, after=None, stream=None, sort=None, with_count=False): """ Gets list of Group objects based on the filters provided. The Groups that will be returned match all the criteria passed in the parameters. :param id: id of the Group :type id: str, optional :param prefix: Prefix that matches several Group ids :type prefix: str, optional :param member: Groups that contain this member :type member: str, optional :param signatory: Groups that contain this signatory :type signatory: str, optional :param web: Groups that contain a web field value :type web: bool, optional :param limit: Maximum amount of Groups that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Groups will be returned :type limit: int, optional :param offset: Indicates the position to start retrieving Groups. For example, if there are 10 Groups and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :return: List of Groups :rtype: list[Group] """ params = {} if id is not None: params['id'] = id if prefix is not None: params['prefix'] = prefix if member is not None: params['member'] = member if signatory is not None: params['signatory'] = signatory if sort is not None: params['sort'] = sort if web is not None: params['web'] = web if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset if after is not None: params['after'] = after if stream is not None: params['stream'] = stream response = self.session.get(self.groups_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) groups = [Group.from_json(g) for g in response.json()['groups']] if with_count and params.get('offset') is None: return groups, response.json()['count'] return groups
[docs] def get_all_groups(self, id=None, parent=None, prefix=None, member=None, domain=None, signatory=None, web=None, sort=None, with_count=False): """ Gets list of Group objects based on the filters provided. The Groups that will be returned match all the criteria passed in the parameters. :param id: id of the Group :type id: str, optional :param parent: id of the parent Group :type parent: str, optional :param prefix: Prefix that matches several Group ids :type prefix: str, optional :param member: Groups that contain this member :type member: str, optional :param signatory: Groups that contain this signatory :type signatory: str, optional :param web: Groups that contain a web field value :type web: bool, optional :param limit: Maximum amount of Groups that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Groups will be returned :type limit: int, optional :param offset: Indicates the position to start retrieving Groups. For example, if there are 10 Groups and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :param after: Group id to start getting the list of groups from. :type after: str, optional :return: List of Groups :rtype: list[Group] """ params = { 'stream': True } if id is not None: params['id'] = id if parent is not None: params['parent'] = parent if prefix is not None: params['prefix'] = prefix if member is not None: params['member'] = member if signatory is not None: params['signatory'] = signatory if domain is not None: params['domain'] = domain if web is not None: params['web'] = web if sort is not None: params['sort'] = sort if with_count is not None: params['with_count'] = with_count return self.get_groups(**params)
[docs] def get_invitations(self, id = None, ids = None, invitee = None, replytoNote = None, replyForum = None, signature = None, note = None, prefix = None, tags = None, limit = None, offset = None, after = None, minduedate = None, duedate = None, pastdue = None, replyto = None, details = None, expired = None, sort = None, type = None, with_count=False, invitation = None ): """ Gets list of Invitation objects based on the filters provided. The Invitations that will be returned match all the criteria passed in the parameters. :param id: id of the Invitation :type id: str, optional :param ids: Comma separated Invitation IDs. If provided, returns invitations whose "id" value is any of the passed Invitation IDs. :type ids: str, optional :param invitee: Invitations that contain this invitee :type invitee: str, optional :param replytoNote: Invitations that contain this replytoNote :type replytoNote: str, optional :param replyForum: Invitations that contain this replyForum :type replyForum: str, optional :param signature: Invitations that contain this signature :type signature: optional :param note: Invitations that contain this note :type note: str, optional :param prefix: Invitation ids that match this prefix :type prefix: str, optional :param tags: Invitations that contain these tags :type tags: Tag, optional :param int limit: Maximum amount of Invitations that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Invitations will be returned :type limit: int, optional :param int offset: Indicates the position to start retrieving Invitations. For example, if there are 10 Invitations and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :param after: Invitation id to start getting the list of invitations from. :type after: str, optional :param minduedate: Invitations that have at least this value as due date :type minduedate: int, optional :param duedate: Invitations that contain this due date :type duedate: int, optional :param pastdue: Invitaions that are past due :type pastdue: bool, optional :param replyto: Invitations that contain this replyto :type replyto: optional :param details: TODO: What is a valid value for this field? :type details: dict, optional :param expired: If true, retrieves the Invitations that have expired, otherwise, the ones that have not expired :type expired: bool, optional :return: List of Invitations :rtype: list[Invitation] """ params = {} if id is not None: params['id'] = id if ids is not None: params['ids'] = ids if invitee is not None: params['invitee'] = invitee if replytoNote is not None: params['replytoNote'] = replytoNote if replyForum is not None: params['replyForum'] = replyForum if signature is not None: params['signature'] = signature if note is not None: params['note']=note if prefix is not None: params['prefix'] = prefix if tags is not None: params['tags'] = tags if minduedate is not None: params['minduedate'] = minduedate if replyto is not None: params['replyto'] = replyto if duedate is not None: params['duedate'] = duedate if pastdue is not None: params['pastdue'] = pastdue if details is not None: params['details'] = details if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset if after is not None: params['after'] = after if sort is not None: params['sort'] = sort if expired is not None: params['expired'] = expired if type is not None: params['type'] = type if invitation is not None: params['invitation'] = invitation response = self.session.get(self.invitations_url, params=tools.format_params(params), headers=self.headers) response = self.__handle_response(response) invitations = [Invitation.from_json(i) for i in response.json()['invitations']] if with_count and params.get('offset') is None: return invitations, response.json()['count'] return invitations
[docs] def get_all_invitations(self, id = None, ids = None, invitee = None, replytoNote = None, replyForum = None, signature = None, note = None, prefix = None, tags = None, minduedate = None, duedate = None, pastdue = None, replyto = None, details = None, expired = None, sort = None, type = None, with_count=False, invitation = None ): """ Gets list of Invitation objects based on the filters provided. The Invitations that will be returned match all the criteria passed in the parameters. :param id: id of the Invitation :type id: str, optional :param ids: Comma separated Invitation IDs. If provided, returns invitations whose "id" value is any of the passed Invitation IDs. :type ids: str, optional :param invitee: Invitations that contain this invitee :type invitee: str, optional :param replytoNote: Invitations that contain this replytoNote :type replytoNote: str, optional :param replyForum: Invitations that contain this replyForum :type replyForum: str, optional :param signature: Invitations that contain this signature :type signature: optional :param note: Invitations that contain this note :type note: str, optional :param prefix: Invitation ids that match this prefix :type prefix: str, optional :param tags: Invitations that contain these tags :type tags: Tag, optional :param minduedate: Invitations that have at least this value as due date :type minduedate: int, optional :param duedate: Invitations that contain this due date :type duedate: int, optional :param pastdue: Invitaions that are past due :type pastdue: bool, optional :param replyto: Invitations that contain this replyto :type replyto: optional :param details: TODO: What is a valid value for this field? :type details: dict, optional :param expired: If true, retrieves the Invitations that have expired, otherwise, the ones that have not expired :type expired: bool, optional :return: List of Invitations :rtype: list[Invitation] """ params = {} if id is not None: params['id'] = id if ids is not None: params['ids'] = ids if invitee is not None: params['invitee'] = invitee if replytoNote is not None: params['replytoNote'] = replytoNote if replyForum is not None: params['replyForum'] = replyForum if signature is not None: params['signature'] = signature if note is not None: params['note'] = note if prefix is not None: params['prefix'] = prefix if tags is not None: params['tags'] = tags if minduedate is not None: params['minduedate'] = minduedate if duedate is not None: params['duedate'] = duedate if pastdue is not None: params['pastdue'] = pastdue if replyto is not None: params['replyto'] = replyto if details is not None: params['details'] = details if expired is not None: params['expired'] = expired if sort is not None: params['sort'] = sort if type is not None: params['type'] = type if with_count is not None: params['with_count'] = with_count if invitation is not None: params['invitation'] = invitation return list(tools.efficient_iterget(self.get_invitations, desc='Getting V2 Invitations', **params))
[docs] def get_invitation_edit(self, id): """ Get a single edit by id if available :param id: id of the edit :type id: str :return: edit matching the passed id :rtype: Note """ response = self.session.get(self.invitation_edits_url, params = {'id':id}, headers = self.headers) response = self.__handle_response(response) n = response.json()['edits'][0] return Edit.from_json(n)
[docs] def get_invitation_edits(self, invitation_id = None, invitation = None, with_count=False, sort=None): """ Gets a list of edits for a note. The edits that will be returned match all the criteria passed in the parameters. :return: List of edits :rtype: list[Edit] """ params = {} if invitation_id: params['invitation.id'] = invitation_id if invitation: params['invitation'] = invitation if sort: params['sort'] = sort response = self.session.get(self.invitation_edits_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edits = [Edit.from_json(n) for n in response.json()['edits']] if with_count and params.get('offset') is None: return edits, response.json()['count'] return edits
[docs] def get_notes(self, id = None, paperhash = None, forum = None, invitation = None, replyto = None, tauthor = None, signature = None, transitive_members = None, signatures = None, writer = None, trash = None, number = None, content = None, limit = None, offset = None, after = None, mintcdate = None, details = None, sort = None, with_count=False ): """ Gets list of Note objects based on the filters provided. The Notes that will be returned match all the criteria passed in the parameters. :param id: a Note ID. If provided, returns Notes whose ID matches the given ID. :type id: str, optional :param paperhash: A "paperhash" for a note. If provided, returns Notes whose paperhash matches this argument. (A paperhash is a human-interpretable string built from the Note's title and list of authors to uniquely identify the Note) :type paperhash: str, optional :param forum: A Note ID. If provided, returns Notes whose forum matches the given ID. :type forum: str, optional :param invitation: An Invitation ID. If provided, returns Notes whose "invitation" field is this Invitation ID. :type invitation: str, optional :param replyto: A Note ID. If provided, returns Notes whose replyto field matches the given ID. :type replyto: str, optional :param tauthor: A Group ID. If provided, returns Notes whose tauthor field ("true author") matches the given ID, or is a transitive member of the Group represented by the given ID. :type tauthor: str, optional :param signature: A Group ID. If provided, returns Notes whose signatures field contains the given Group ID. :type signature: str, optional :param transitive_members: If true, returns Notes whose tauthor field is a transitive member of the Group represented by the given Group ID. :type transitive_members: bool, optional :param signatures: Group IDs. If provided, returns Notes whose signatures field contains the given Group IDs. :type signatures: list[str], optional :param writer: A Group ID. If provided, returns Notes whose writers field contains the given Group ID. :type writer: str, optional :param trash: If True, includes Notes that have been deleted (i.e. the ddate field is less than the current date) :type trash: bool, optional :param number: If present, includes Notes whose number field equals the given integer. :type number: int, optional :param content: If present, includes Notes whose each key is present in the content field and it is equals the given value. :type content: dict, optional :param limit: Maximum amount of Notes that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Notes will be returned :type limit: int, optional :param offset: Indicates the position to start retrieving Notes. For example, if there are 10 Notes and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :param after: Note id to start getting the list of notes from. :type after: str, optional :param mintcdate: Represents an Epoch time timestamp, in milliseconds. If provided, returns Notes whose "true creation date" (tcdate) is at least equal to the value of mintcdate. :type mintcdate: int, optional :param details: TODO: What is a valid value for this field? :type details: optional :param sort: Sorts the output by field depending on the string passed. Possible values: number, cdate, ddate, tcdate, tmdate, replyCount (Invitation id needed in the invitation field). :type sort: str, optional :return: List of Notes :rtype: list[Note] """ params = {} if id is not None: params['id'] = id if paperhash is not None: params['paperhash'] = paperhash if forum is not None: params['forum'] = forum if invitation is not None: params['invitation'] = invitation if replyto is not None: params['replyto'] = replyto if tauthor is not None: params['tauthor'] = tauthor if signature is not None: params['signature'] = signature if transitive_members is not None: params['transitiveMembers'] = transitive_members if signatures is not None: params['signatures'] = signatures if writer is not None: params['writer'] = writer if trash == True: params['trash']=True if number is not None: params['number'] = number if content is not None: for k in content: params['content.' + k] = content[k] if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset if mintcdate is not None: params['mintcdate'] = mintcdate if details is not None: params['details'] = details if after is not None: params['after'] = after if sort is not None: params['sort'] = sort response = self.session.get(self.notes_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) notes = [Note.from_json(n) for n in response.json()['notes']] if with_count and params.get('offset') is None: return notes, response.json()['count'] return notes
[docs] def get_all_notes(self, id = None, paperhash = None, forum = None, invitation = None, replyto = None, signature = None, transitive_members = None, signatures = None, writer = None, trash = None, number = None, content = None, mintcdate = None, details = None, select = None, sort = None, with_count=False ): """ Gets list of Note objects based on the filters provided. The Notes that will be returned match all the criteria passed in the parameters. :param id: a Note ID. If provided, returns Notes whose ID matches the given ID. :type id: str, optional :param paperhash: A "paperhash" for a note. If provided, returns Notes whose paperhash matches this argument. (A paperhash is a human-interpretable string built from the Note's title and list of authors to uniquely identify the Note) :type paperhash: str, optional :param forum: A Note ID. If provided, returns Notes whose forum matches the given ID. :type forum: str, optional :param invitation: An Invitation ID. If provided, returns Notes whose "invitation" field is this Invitation ID. :type invitation: str, optional :param replyto: A Note ID. If provided, returns Notes whose replyto field matches the given ID. :type replyto: str, optional :param signature: A Group ID. If provided, returns Notes whose signatures field contains the given Group ID. :type signature: str, optional :param transitive_members: If true, returns Notes whose tauthor field is a transitive member of the Group represented by the given Group ID. :type transitive_members: bool, optional :param signatures: Group IDs. If provided, returns Notes whose signatures field contains the given Group IDs. :type signatures: list[str], optional :param writer: A Group ID. If provided, returns Notes whose writers field contains the given Group ID. :type writer: str, optional :param trash: If True, includes Notes that have been deleted (i.e. the ddate field is less than the current date) :type trash: bool, optional :param number: If present, includes Notes whose number field equals the given integer. :type number: int, optional :param content: If present, includes Notes whose each key is present in the content field and it is equals the given value. :type content: dict, optional :param after: Note id to start getting the list of notes from. :type after: str, optional :param mintcdate: Represents an Epoch time timestamp, in milliseconds. If provided, returns Notes whose "true creation date" (tcdate) is at least equal to the value of mintcdate. :type mintcdate: int, optional :param details: TODO: What is a valid value for this field? :type details: optional :param sort: Sorts the output by field depending on the string passed. Possible values: number, cdate, ddate, tcdate, tmdate, replyCount (Invitation id needed in the invitation field). :type sort: str, optional :return: List of Notes :rtype: list[Note] """ params = {} if id is not None: params['id'] = id if paperhash is not None: params['paperhash'] = paperhash if forum is not None: params['forum'] = forum if invitation is not None: params['invitation'] = invitation if replyto is not None: params['replyto'] = replyto if signature is not None: params['signature'] = signature if signatures is not None: params['signatures'] = signatures if transitive_members is not None: params['transitive_members'] = transitive_members if writer is not None: params['writer'] = writer if trash == True: params['trash']=True if number is not None: params['number'] = number if content is not None: params['content'] = content if mintcdate is not None: params['mintcdate'] = mintcdate if details is not None: params['details'] = details if select: params['select'] = select if sort is not None: params['sort'] = sort if with_count: params['with_count'] = with_count return list(tools.efficient_iterget(self.get_notes, desc='Getting V2 Notes', **params))
[docs] def get_note_edit(self, id, trash=None): """ Get a single edit by id if available :param id: id of the edit :type id: str :return: edit matching the passed id :rtype: Note """ response = self.session.get(self.note_edits_url, params = {'id':id, 'trash': 'true' if trash == True else 'false'}, headers = self.headers) response = self.__handle_response(response) n = response.json()['edits'][0] return Edit.from_json(n)
[docs] def get_note_edits(self, note_id = None, invitation = None, with_count=False, sort=None, trash=None): """ Gets a list of edits for a note. The edits that will be returned match all the criteria passed in the parameters. :return: List of edits :rtype: list[Edit] """ params = {} if note_id: params['note.id'] = note_id if invitation: params['invitation'] = invitation if sort: params['sort'] = sort params['trash'] = 'true' if trash == True else 'false' response = self.session.get(self.note_edits_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edits = [Edit.from_json(n) for n in response.json()['edits']] if with_count and params.get('offset') is None: return edits, response.json()['count'] return edits
[docs] def get_group_edit(self, id): """ Get a single edit by id if available :param id: id of the edit :type id: str :return: edit matching the passed id :rtype: Group """ response = self.session.get(self.group_edits_url, params = {'id':id}, headers = self.headers) response = self.__handle_response(response) n = response.json()['edits'][0] return Edit.from_json(n)
[docs] def get_tags(self, id = None, invitation = None, forum = None, signature = None, tag = None, limit = None, offset = None, with_count=False): """ Gets a list of Tag objects based on the filters provided. The Tags that will be returned match all the criteria passed in the parameters. :param id: A Tag ID. If provided, returns Tags whose ID matches the given ID. :type id: str, optional :param forum: A Note ID. If provided, returns Tags whose forum matches the given ID. :type forum: str, optional :param invitation: An Invitation ID. If provided, returns Tags whose "invitation" field is this Invitation ID. :type invitation: str, optional :return: List of tags :rtype: list[Tag] """ params = {} if id is not None: params['id'] = id if forum is not None: params['forum'] = forum if invitation is not None: params['invitation'] = invitation if signature is not None: params['signature'] = signature if tag is not None: params['tag'] = tag if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset response = self.session.get(self.tags_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) tags = [Tag.from_json(t) for t in response.json()['tags']] if with_count and params.get('offset') is None: return tags, response.json()['count'] return tags
[docs] def get_all_tags(self, id = None, invitation = None, forum = None, signature = None, tag = None, limit = None, offset = None, with_count=False): """ Gets a list of Tag objects based on the filters provided. The Tags that will be returned match all the criteria passed in the parameters. :param id: A Tag ID. If provided, returns Tags whose ID matches the given ID. :type id: str, optional :param forum: A Note ID. If provided, returns Tags whose forum matches the given ID. :type forum: str, optional :param invitation: An Invitation ID. If provided, returns Tags whose "invitation" field is this Invitation ID. :type invitation: str, optional :return: List of tags :rtype: list[Tag] """ params = { 'id': id, 'invitation': invitation, 'forum': forum, 'signature': signature, 'tag': tag, 'limit': limit, 'offset': offset, 'with_count': with_count } return tools.concurrent_get(self, self.get_tags, **params)
[docs] def get_edges(self, id = None, invitation = None, head = None, tail = None, label = None, limit = None, offset = None, with_count=False, trash=None): """ Returns a list of Edge objects based on the filters provided. :arg id: a Edge ID. If provided, returns Edge whose ID matches the given ID. :arg invitation: an Invitation ID. If provided, returns Edges whose "invitation" field is this Invitation ID. :arg head: Profile ID of the Profile that is connected to the Note ID in tail :arg tail: Note ID of the Note that is connected to the Profile ID in head :arg label: Label ID of the match """ params = {} params['id'] = id params['invitation'] = invitation params['head'] = head params['tail'] = tail params['label'] = label params['limit'] = limit params['offset'] = offset params['trash'] = trash response = self.session.get(self.edges_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edges = [Edge.from_json(e) for e in response.json()['edges']] if with_count and params.get('offset') is None: return edges, response.json()['count'] return edges
[docs] def get_all_edges(self, id = None, invitation = None, head = None, tail = None, label = None, limit = None, offset = None, with_count=False, trash=None): """ Returns a list of Edge objects based on the filters provided. :arg id: a Edge ID. If provided, returns Edge whose ID matches the given ID. :arg invitation: an Invitation ID. If provided, returns Edges whose "invitation" field is this Invitation ID. :arg head: Profile ID of the Profile that is connected to the Note ID in tail :arg tail: Note ID of the Note that is connected to the Profile ID in head :arg label: Label ID of the match """ params = { 'id': id, 'invitation': invitation, 'head': head, 'tail': tail, 'label': label, 'limit': limit, 'offset': offset, 'with_count': with_count, 'trash': trash } return tools.concurrent_get(self, self.get_edges, **params)
[docs] def get_edges_count(self, id = None, invitation = None, head = None, tail = None, label = None): """ Returns a list of Edge objects based on the filters provided. :arg id: a Edge ID. If provided, returns Edge whose ID matches the given ID. :arg invitation: an Invitation ID. If provided, returns Edges whose "invitation" field is this Invitation ID. :arg head: Profile ID of the Profile that is connected to the Note ID in tail :arg tail: Note ID of the Note that is connected to the Profile ID in head :arg label: Label ID of the match """ params = {} params['id'] = id params['invitation'] = invitation params['head'] = head params['tail'] = tail params['label'] = label response = self.session.get(self.edges_count_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return response.json()['count']
[docs] def get_grouped_edges(self, invitation=None, head=None, tail=None, label=None, groupby='head', select=None, limit=None, offset=None, trash=None): ''' Returns a list of JSON objects where each one represents a group of edges. For example calling this method with default arguments will give back a list of groups where each group is of the form: {id: {head: paper-1} values: [ {tail: user-1}, {tail: user-2} ]} Note: The limit applies to the number of groups returned. It does not apply to the number of edges within the groups. :param invitation: :param groupby: :param select: :param limit: :param offset: :return: ''' params = {} params['id'] = None params['invitation'] = invitation params['head'] = head params['tail'] = tail params['label'] = label params['groupBy'] = groupby params['select'] = select params['limit'] = limit params['offset'] = offset params['trash'] = trash response = self.session.get(self.edges_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) json = response.json() return json['groupedEdges'] # a list of JSON objects holding information about an edge
[docs] def get_archived_edges(self, invitation): """ Returns a list of Edge objects based on the filters provided. :arg invitation: an Invitation ID. If provided, returns Edges whose "invitation" field is this Invitation ID. """ params = {'invitation': invitation} print('tools.format_params(params)', tools.format_params(params)) response = self.session.get(self.edges_archive, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edges = [Edge.from_json(e) for e in response.json()['edges']] return edges
[docs] def post_edge(self, edge): """ Posts the edge. Upon success, returns the posted Edge object. """ response = self.session.post(self.edges_url, json = edge.to_json(), headers = self.headers) response = self.__handle_response(response) return Edge.from_json(response.json())
[docs] def post_edges (self, edges): ''' Posts the list of Edges. Returns a list Edge objects updated with their ids. ''' send_json = [edge.to_json() for edge in edges] response = self.session.post(self.bulk_edges_url, json = send_json, headers = self.headers) response = self.__handle_response(response) received_json_array = response.json() edge_objects = [Edge.from_json(edge) for edge in received_json_array] return edge_objects
[docs] def rename_edges(self, current_id, new_id): """ Updates an Edge :param profile: Edge object :type edge: Edge :return: The new updated Edge :rtype: Edge """ response = self.session.post( self.edges_rename, json = { 'currentId': current_id, 'newId': new_id }, headers = self.headers) response = self.__handle_response(response) #print('RESPONSE: ', response.json()) received_json_array = response.json() edge_objects = [Edge.from_json(edge) for edge in received_json_array] return edge_objects
[docs] def post_venue(self, venue): """ Posts the venue. Upon success, returns the posted Venue object. """ response = self.session.post(self.venues_url, json=venue, headers=self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_edges(self, invitation, id=None, label=None, head=None, tail=None, wait_to_finish=False, soft_delete=False): """ Deletes edges by a combination of invitation id and one or more of the optional filters. :param invitation: an invitation ID :type invitation: str :param label: a matching label ID :type label: str, optional :param head: id of the edge head (head type defined by the edge invitation) :type head: str, optional :param tail: id of the edge tail (tail type defined by the edge invitation) :type tail: str, optional :param wait_to_finish: True if execution should pause until deletion of edges is finished :type wait_to_finish: bool, optional :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ delete_query = {'invitation': invitation} if label: delete_query['label'] = label if head: delete_query['head'] = head if tail: delete_query['tail'] = tail if id: delete_query['id'] = id delete_query['waitToFinish'] = wait_to_finish delete_query['softDelete'] = soft_delete response = self.session.delete(self.edges_url, json = delete_query, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_note(self, note_id): """ Deletes the note :param note_id: ID of Note to be deleted :type note_id: str :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ response = self.session.delete(self.notes_url, json = {'id': note_id}, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_profile_reference(self, reference_id): ''' Deletes the Profile Reference specified by `reference_id`. :param reference_id: ID of the Profile Reference to be deleted. :type reference_id: str :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict ''' response = self.session.delete(self.profiles_url + '/reference', json = {'id': reference_id}, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_group(self, group_id): """ Deletes the group :param group_id: ID of Group to be deleted :type group_id: str :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ response = self.session.delete(self.groups_url, json = {'id': group_id}, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_institution(self, institution_id): """ Deletes the institution :param institution_id: ID of Institution to be deleted :type institution_id: str :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ response = self.session.delete(self.institutions_url + '/' + institution_id, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_message(self, subject, recipients, message, invitation=None, signature=None, ignoreRecipients=None, sender=None, replyTo=None, parentGroup=None): """ Posts a message to the recipients and consequently sends them emails :param subject: Subject of the e-mail :type subject: str :param recipients: Recipients of the e-mail. Valid inputs would be tilde username or emails registered in OpenReview :type recipients: list[str] :param message: Message in the e-mail :type message: str :param ignoreRecipients: List of groups ids to be ignored from the recipient list :type subject: list[str] :param sender: Specify the from address and name of the email, the dictionary should have two keys: 'name' and 'email' :type sender: dict :param replyTo: e-mail address used when recipients reply to this message :type replyTo: str :param parentGroup: parent group recipients of e-mail belong to :type parentGroup: str :return: Contains the message that was sent to each Group :rtype: dict """ if parentGroup: recipients = self.get_group(parentGroup).transform_to_anon_ids(recipients) json = { 'groups': recipients, 'subject': subject , 'message': message } if invitation: json['invitation'] = invitation if signature: json['signature'] = signature if ignoreRecipients: json['ignoreGroups'] = ignoreRecipients if sender: json['fromName'] = sender.get('fromName') json['fromEmail'] = sender.get('fromEmail') if replyTo: json['replyTo'] = replyTo if parentGroup: json['parentGroup'] = parentGroup response = self.session.post(self.messages_url, json = json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_direct_message(self, subject, recipients, message, sender=None): """ Posts a message to the recipients and consequently sends them emails :param subject: Subject of the e-mail :type subject: str :param recipients: Recipients of the e-mail. Valid inputs would be tilde username or emails registered in OpenReview :type recipients: list[str] :param message: Message in the e-mail :type message: str :return: Contains the message that was sent to each Group :rtype: dict """ response = self.session.post(self.messages_direct_url, json = { 'groups': recipients, 'subject': subject , 'message': message, 'from': sender }, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def add_members_to_group(self, group, members): """ Adds members to a group :param group: Group (or Group's id) to which the members will be added :type group: Group or str :param members: Members that will be added to the group. Members should be in a string, unicode or a list format :type members: str, list, unicode :return: Group with the members added :rtype: Group """ def add_member(group, members): group = self.get_group(group) if type(group) in string_types else group if group.invitations: self.post_group_edit(invitation = f'{group.domain}/-/Edit', signatures = group.signatures, group = Group( id = group.id, members = { 'append': list(set(members)) } ), readers=group.signatures, writers=group.signatures ) return self.get_group(group.id) else: if members: response = self.session.put(self.groups_url + '/members', json = {'id': group.id, 'members': members}, headers = self.headers) response = self.__handle_response(response) return Group.from_json(response.json()) member_type = type(members) if member_type in string_types: return add_member(group, [members]) if member_type == list: return add_member(group, members) raise OpenReviewException("add_members_to_group()- members '"+str(members)+"' ("+str(member_type)+") must be a str, unicode or list, but got " + repr(member_type) + " instead")
[docs] def remove_members_from_group(self, group, members): """ Removes members from a group :param group: Group (or Group's id) from which the members will be removed :type group: Group or str :param members: Members that will be removed. Members should be in a string, unicode or a list format :type members: str, list, unicode :return: Group without the members that were removed :type: Group """ def remove_member(group, members): members_to_remove = list(set(members)) group = self.get_group(group if type(group) in string_types else group.id) if group.invitations: members_to_remove = group.transform_to_anon_ids(members_to_remove) self.post_group_edit(invitation = f'{group.domain}/-/Edit', signatures = group.signatures, group = Group( id = group.id, members = { 'remove': members_to_remove } ), readers=group.signatures, writers=group.signatures ) return self.get_group(group.id) else: response = self.session.delete(self.groups_url + '/members', json = {'id': group.id, 'members': members}, headers = self.headers) response = self.__handle_response(response) return Group.from_json(response.json()) member_type = type(members) if member_type in string_types: return remove_member(group, [members]) if member_type == list: return remove_member(group, members)
[docs] def search_notes(self, term, content = 'all', group = 'all', source='all', limit = None, offset = None): """ Searches notes based on term, content, group and source as the criteria. Unlike :meth:`~openreview.Client.get_notes`, this method uses Elasticsearch to retrieve the Notes :param term: Term used to look for the Notes :type term: str :param content: Specifies whether to look in all the content, authors, or keywords. Valid inputs: 'all', 'authors', 'keywords' :type content: str, optional :param group: Specifies under which Group to look. E.g. 'all', 'ICLR', 'UAI', etc. :type group: str, optional :param source: Whether to look in papers, replies or all :type source: str, optional :param limit: Maximum amount of Notes that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Notes will be returned :type limit: int, optional :param offset: Indicates the position to start retrieving Notes. For example, if there are 10 Notes and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :return: List of notes :rtype: list[Note] """ params = { 'term': term, 'content': content, 'group': group, 'source': source } if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset response = self.session.get(self.notes_url + '/search', params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return [Note.from_json(n) for n in response.json()['notes']]
def get_notes_by_ids(self, ids): response = self.session.post(self.notes_url + '/search', json = { 'ids': ids }, headers = self.headers) response = self.__handle_response(response) return [Note.from_json(n) for n in response.json()['notes']]
[docs] def get_tildeusername(self, first, last, middle = None): """ Gets next possible tilde user name corresponding to the specified first, middle and last name :param first: First name of the user :type first: str :param last: Last name of the user :type last: str :param middle: Middle name of the user :type middle: str, optional :return: next possible tilde user name corresponding to the specified first, middle and last name :rtype: dict """ response = self.session.get(self.tilde_url, params = { 'first': first, 'last': last, 'middle': middle }, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def get_messages(self, to = None, subject = None, status = None, offset = None, limit = None): """ **Only for Super User**. Retrieves all the messages sent to a list of usernames or emails and/or a particular e-mail subject :param to: Tilde user names or emails :type to: list[str], optional :param subject: Subject of the e-mail :type subject: str, optional :param status: Commad separated list of status values corresponding to the message: delivered, bounce, droppped, etc :type status: str, optional :return: Messages that match the passed parameters :rtype: dict """ response = self.session.get(self.messages_url, params = { 'to': to, 'subject': subject, 'status': status, 'offset': offset, 'limit': limit }, headers = self.headers) response = self.__handle_response(response) return response.json()['messages']
[docs] def get_process_logs(self, id = None, invitation = None, status = None, min_sdate = None): """ **Only for Super User**. Retrieves the logs of the process function executed by an Invitation :param id: Note id :type id: str, optional :param invitation: Invitation id that executed the process function that produced the logs :type invitation: str, optional :return: Logs of the process :rtype: dict """ response = self.session.get(self.process_logs_url, params = { 'id': id, 'invitation': invitation, 'status': status, 'minsdate': min_sdate }, headers = self.headers) response = self.__handle_response(response) return response.json()['logs']
[docs] def post_institution(self, institution): """ Requires Super User permission. Adds an institution if the institution id is not found in the database, otherwise, the institution is updated. :param institution: institution to be posted :type institution: dict :return: The posted institution :rtype: dict """ response = self.session.post(self.institutions_url, json = institution, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_invitation_edit(self, invitations, readers=None, writers=None, signatures=None, invitation=None, content=None, replacement=None): """ """ edit_json = {} if invitations is not None: edit_json['invitations'] = invitations if readers is not None: edit_json['readers'] = readers if writers is not None: edit_json['writers'] = writers if signatures is not None: edit_json['signatures'] = signatures if content is not None: edit_json['content'] = content if replacement is not None: edit_json['replacement'] = replacement if invitation is not None: edit_json['invitation'] = invitation.to_json() response = self.session.post(self.invitation_edits_url, json = edit_json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_note_edit(self, invitation, signatures, note=None, readers=None, writers=None, nonreaders=None, content=None): """ """ edit_json = { 'invitation': invitation, 'signatures': signatures, 'note': note.to_json() if note else {} } if readers is not None: edit_json['readers'] = readers if writers is not None: edit_json['writers'] = writers if nonreaders is not None: edit_json['nonreaders'] = nonreaders if content is not None: edit_json['content'] = content response = self.session.post(self.note_edits_url, json = edit_json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_group_edit(self, invitation, signatures=None, group=None, readers=None, writers=None, content=None, replacement=None): """ """ edit_json = { 'invitation': invitation } if group is not None: edit_json['group'] = group.to_json() if signatures is not None: edit_json['signatures'] = signatures if readers is not None: edit_json['readers'] = readers if writers is not None: edit_json['writers'] = writers if content is not None: edit_json['content'] = content if replacement is not None: edit_json['replacement'] = replacement response = self.session.post(self.group_edits_url, json = edit_json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_edit(self, edit): """ """ edit_json = edit.to_json() if 'note' in edit_json: response = self.session.post(self.note_edits_url, json = edit_json, headers = self.headers) elif 'group' in edit_json: response = self.session.post(self.group_edits_url, json = edit_json, headers = self.headers) elif 'invitation' in edit_json: response = self.session.post(self.invitation_edits_url, json = edit_json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def get_jobs_status(self): """ **Only for Super User**. Retrieves the jobs status of the queue :return: Jobs status :rtype: dict """ response = self.session.get(self.jobs_status, headers=self.headers) response = self.__handle_response(response) return response.json()
def request_expertise(self, name, group_id, venue_id, submission_content=None, alternate_match_group = None, expertise_selection_id=None, model=None, baseurl=None): # Build entityA from group_id entityA = { 'type': 'Group', 'memberOf': group_id } if expertise_selection_id and tools.get_invitation(self, expertise_selection_id): expertise = { 'invitation': expertise_selection_id } entityA['expertise'] = expertise # Build entityB from alternate_match_group or paper_invitation if alternate_match_group: entityB = { 'type': 'Group', 'memberOf': alternate_match_group } if expertise_selection_id: expertise = { 'invitation': expertise_selection_id } entityB['expertise'] = expertise else: entityB = { 'type': 'Note', 'withVenueid': venue_id, 'withContent': submission_content } expertise_request = { 'name': name, 'entityA': entityA, 'entityB': entityB, 'model': { 'name': model } } base_url = baseurl if baseurl else self.baseurl response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers) response = self.__handle_response(response) return response.json() def request_single_paper_expertise(self, name, group_id, paper_id, expertise_selection_id=None, model=None, baseurl=None): # Build entityA from group_id entityA = { 'type': 'Group', 'memberOf': group_id } if expertise_selection_id and tools.get_invitation(self, expertise_selection_id): expertise = { 'invitation': expertise_selection_id } entityA['expertise'] = expertise # Build entityB from paper_id entityB = { 'type': 'Note', 'id': paper_id } expertise_request = { 'name': name, 'entityA': entityA, 'entityB': entityB, 'model': { 'name': model } } base_url = baseurl if baseurl else self.baseurl if base_url.startswith('http://localhost'): return {} print('compute expertise', {'name': name, 'match_group': group_id , 'paper_id': paper_id, 'model': model}) response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers) print('response json', response.json()) response = self.__handle_response(response) return response.json() def get_expertise_status(self, job_id=None, group_id=None, paper_id=None, baseurl=None): print('get expertise status', baseurl, job_id, group_id, paper_id) base_url = baseurl if baseurl else self.baseurl if base_url.startswith('http://localhost'): print('get expertise status localhost, return Completed') if job_id: return { 'status': 'Completed', 'jobId': job_id } return { 'results': [{ 'status': 'Completed', 'jobId': None }]} params = {} if job_id: params['jobId'] = job_id if group_id: params['entityA.memberOf'] = group_id if paper_id: params['entityB.id'] = paper_id response = self.session.get(base_url + '/expertise/status', params = params, headers = self.headers) response = self.__handle_response(response) response_json = response.json() print('get expertise status', response_json) return response_json def get_expertise_jobs(self, status=None, baseurl=None): print('get expertise jobs', baseurl, status) base_url = baseurl if baseurl else self.baseurl if base_url.startswith('http://localhost'): print('get expertise jobs localhost, return []') return { 'results': [] } params = {} if status: params['status'] = status response = self.session.get(base_url + '/expertise/status/all', params = params, headers = self.headers) response = self.__handle_response(response) response_json = response.json() print('get expertise jobs', response_json) return response_json def get_expertise_results(self, job_id, baseurl=None, wait_for_complete=False): print('get expertise results', baseurl, job_id) base_url = baseurl if baseurl else self.baseurl call_max = 500 if base_url.startswith('http://localhost'): print('return expertise results localhost, return []') return { 'results': [] } if wait_for_complete: call_count = 0 status_response = self.get_expertise_status(job_id, baseurl=base_url) status = status_response.get('status') while status not in ['Completed', 'Error'] and call_count < call_max: time.sleep(60) status_response = self.get_expertise_status(job_id) status = status_response.get('status') call_count += 1 if 'Completed' == status: return self.get_expertise_results(job_id, baseurl=base_url) if 'Error' == status: raise OpenReviewException('There was an error computing scores, description: ' + status_response.get('description')) if call_count == call_max: raise OpenReviewException('Time out computing scores, description: ' + status_response.get('description')) raise OpenReviewException('Unknown error, description: ' + status_response.get('description')) else: response = self.session.get(base_url + '/expertise/results', params = {'jobId': job_id}, headers = self.headers) response = self.__handle_response(response) print('return expertise results', baseurl, job_id) return response.json()
[docs] class Edit(object): """ :param id: Edit id :type id: str :param readers: List of readers in the Edit, each reader is a Group id :type readers: list[str], optional :param writers: List of writers in the Edit, each writer is a Group id :type writers: list[str], optional :param signatures: List of signatures in the Edit, each signature is a Group id :type signatures: list[str], optional :param note: Template of the Note that will be created :type note: dict, optional :param invitation: Template of the Invitation that will be created :type invitation: dict, optional :param nonreaders: List of nonreaders in the Edit, each nonreader is a Group id :type nonreaders: list[str], optional :param cdate: Creation date :type cdate: int, optional :param ddate: Deletion date :type ddate: int, optional :param tcdate: True creation date :type tcdate: int, optional :param tmdate: Modification date :type tmdate: int, optional """ def __init__(self, id = None, domain = None, invitations = None, readers = None, writers = None, signatures = None, note = None, group = None, invitation = None, nonreaders = None, cdate = None, ddate = None, tauthor = None): self.id = id self.domain = domain self.invitations = invitations self.cdate = cdate self.ddate = ddate self.readers = readers self.nonreaders = nonreaders self.writers = writers self.signatures = signatures self.note = note self.group = group self.invitation = invitation self.tauthor = tauthor def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Edit(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self))
[docs] def to_json(self): """ Converts Edit instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Edit instance :rtype: dict """ body = {} if (self.id): body['id'] = self.id if self.invitations: body['invitations'] = self.invitations if (self.readers): body['readers'] = self.readers if (self.nonreaders): body['nonreaders'] = self.nonreaders if (self.writers): body['writers'] = self.writers if (self.signatures): body['signatures'] = self.signatures if (self.note): body['note'] = self.note.to_json() if (self.group): body['group'] = self.group.to_json() if isinstance(self.invitation, Invitation): body['invitation'] = self.invitation.to_json() if isinstance(self.invitation, str): body['invitation'] = self.invitation if (self.ddate): body['ddate'] = self.ddate return body
[docs] @classmethod def from_json(Edit,e): """ Creates an Edit object from a dictionary that contains keys values equivalent to the name of the instance variables of the Edit class :param i: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Edit class :type i: dict :return: Edit whose instance variables contain the values from the dictionary :rtype: Edit """ edit = Edit(e.get('id'), domain = e.get('domain'), invitations = e.get('invitations'), cdate = e.get('cdate'), ddate = e.get('ddate'), readers = e.get('readers'), nonreaders = e.get('nonreaders'), writers = e.get('writers'), signatures = e.get('signatures'), note = Note.from_json(e['note']) if 'note' in e else None, group = Group.from_json(e['group']) if 'group' in e else None, invitation = e.get('invitation'), tauthor = e.get('tauthor') ) if isinstance(edit.invitation, dict): edit.invitation = Invitation.from_json(edit.invitation) return edit
[docs] class Note(object): """ TODO: write docs """ def __init__(self, invitations=None, readers=None, writers=None, signatures=None, content=None, id=None, number=None, cdate=None, pdate=None, odate=None, mdate=None, tcdate=None, tmdate=None, ddate=None, forum=None, replyto=None, nonreaders=None, domain=None, details = None, license=None): self.id = id self.number = number self.cdate = cdate self.pdate = pdate self.odate = odate self.mdate = mdate self.tcdate = tcdate self.tmdate = tmdate self.ddate = ddate self.content = content self.forum = forum self.replyto = replyto self.readers = readers self.nonreaders = nonreaders self.signatures = signatures self.writers = writers self.number = number self.details = details self.invitations = invitations self.domain = domain self.license = license def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Note(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self))
[docs] def to_json(self): """ Converts Note instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Note instance :rtype: dict """ body = { } if self.id: body['id'] = self.id if self.forum: body['forum'] = self.forum if self.replyto: body['replyto'] = self.replyto if self.content: body['content'] = self.content if self.invitations: body['invitations'] = self.invitations if self.cdate: body['cdate'] = self.cdate if self.pdate: body['pdate'] = self.pdate if self.odate: body['odate'] = self.odate if self.mdate: body['mdate'] = self.mdate if self.ddate: body['ddate'] = self.ddate if self.nonreaders is not None: body['nonreaders'] = self.nonreaders if self.signatures: body['signatures'] = self.signatures if self.writers: body['writers'] = self.writers if self.readers: body['readers'] = self.readers if self.license: body['license'] = self.license return body
[docs] @classmethod def from_json(Note,n): """ Creates a Note object from a dictionary that contains keys values equivalent to the name of the instance variables of the Note class :param n: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Note class :type n: dict :return: Note whose instance variables contain the values from the dictionary :rtype: Note """ note = Note( id = n.get('id'), number = n.get('number'), cdate = n.get('cdate'), mdate = n.get('mdate'), pdate = n.get('pdate'), odate = n.get('odate'), tcdate = n.get('tcdate'), tmdate =n.get('tmdate'), ddate=n.get('ddate'), content=n.get('content'), forum=n.get('forum'), invitations=n.get('invitations'), replyto=n.get('replyto'), readers=n.get('readers'), nonreaders=n.get('nonreaders'), signatures=n.get('signatures'), writers=n.get('writers'), details=n.get('details'), domain=n.get('domain'), license=n.get('license') ) return note
[docs] class Invitation(object): """ """ def __init__(self, id = None, invitations = None, domain = None, readers = None, writers = None, invitees = None, signatures = None, edit = None, edge = None, message = None, type = 'Note', noninvitees = None, nonreaders = None, web = None, process = None, preprocess = None, date_processes = None, duedate = None, expdate = None, cdate = None, ddate = None, tcdate = None, tmdate = None, minReplies = None, maxReplies = None, bulk = None, content = None, reply_forum_views = [], responseArchiveDate = None, details = None): self.id = id self.invitations = invitations self.domain = domain self.cdate = cdate self.ddate = ddate self.duedate = duedate self.expdate = expdate self.readers = readers self.nonreaders = nonreaders self.writers = writers self.invitees = invitees self.noninvitees = noninvitees self.signatures = signatures self.minReplies = minReplies self.maxReplies = maxReplies self.edit = edit self.edge = edge self.message = message self.type = type self.tcdate = tcdate self.tmdate = tmdate self.bulk = bulk self.details = details self.reply_forum_views = reply_forum_views self.responseArchiveDate = responseArchiveDate self.web = web self.process = process self.preprocess = preprocess self.date_processes = date_processes self.content = content def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Invitation(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self)) def is_active(self): now = tools.datetime_millis(datetime.datetime.utcnow()) cdate = self.cdate if self.cdate else now edate = self.expdate if self.expdate else now return cdate <= now and now <= edate def get_content_value(self, field_name, default_value=None): if self.content: return self.content.get(field_name, {}).get('value') return default_value def pretty_id(self): tokens = self.id.split('/')[-2:] filtered_tokens = [] for token in tokens: token = token.replace('_', ' ').strip() if token.startswith('~'): token = tools.pretty_id(token) if token != '-': filtered_tokens.append(token) return (' ').join(filtered_tokens)
[docs] def to_json(self): """ Converts Invitation instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Invitation instance :rtype: dict """ body = {} if self.id: body['id'] = self.id if self.cdate: body['cdate'] = self.cdate if self.ddate: body['ddate'] = self.ddate if self.duedate: body['duedate'] = self.duedate if self.expdate: body['expdate'] = self.expdate if self.readers: body['readers'] = self.readers if self.nonreaders: body['nonreaders'] = self.nonreaders if self.writers: body['writers'] = self.writers if self.invitees: body['invitees'] = self.invitees if self.noninvitees: body['noninvitees'] = self.noninvitees if self.signatures: body['signatures'] = self.signatures if self.reply_forum_views: body['replyForumViews'] = self.reply_forum_views if self.content: body['content'] = self.content if self.responseArchiveDate: body['responseArchiveDate'] = self.responseArchiveDate if self.minReplies: body['minReplies']=self.minReplies if self.maxReplies: body['maxReplies']=self.maxReplies if self.web: body['web']=self.web if self.process: body['process']=self.process if self.preprocess: body['preprocess']=self.preprocess if self.date_processes: body['dateprocesses']=self.date_processes if self.edit is not None: if self.type == 'Note': body['edit']=self.edit if self.type == 'Edge': body['edge']=self.edit if self.edge: body['edge']=self.edge if self.message: body['message']=self.message if self.bulk is not None: body['bulk']=self.bulk return body
[docs] @classmethod def from_json(Invitation,i): """ Creates an Invitation object from a dictionary that contains keys values equivalent to the name of the instance variables of the Invitation class :param i: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Invitation class :type i: dict :return: Invitation whose instance variables contain the values from the dictionary :rtype: Invitation """ invitation = Invitation(i['id'], invitations = i.get('invitations'), domain = i.get('domain'), cdate = i.get('cdate'), ddate = i.get('ddate'), tcdate = i.get('tcdate'), tmdate = i.get('tmdate'), duedate = i.get('duedate'), expdate = i.get('expdate'), readers = i.get('readers'), nonreaders = i.get('nonreaders'), writers = i.get('writers'), invitees = i.get('invitees'), noninvitees = i.get('noninvitees'), signatures = i.get('signatures'), minReplies = i.get('minReplies'), edit = i.get('edit'), maxReplies = i.get('maxReplies'), details = i.get('details'), reply_forum_views = i.get('replyForumViews'), responseArchiveDate = i.get('responseArchiveDate'), bulk = i.get('bulk') ) if 'content' in i: invitation.content = i['content'] if 'web' in i: invitation.web = i['web'] if 'process' in i: invitation.process = i['process'] if 'transform' in i: invitation.transform = i['transform'] if 'preprocess' in i: invitation.preprocess = i['preprocess'] if 'dateprocesses' in i: invitation.date_processes = i['dateprocesses'] if 'edge' in i: invitation.edit = i['edge'] invitation.type = 'Edge' if 'message' in i: invitation.message = i['message'] invitation.type = 'Message' return invitation
class Edge(object): def __init__(self, head, tail, invitation, domain=None, readers=None, writers=None, signatures=None, id=None, weight=None, label=None, cdate=None, ddate=None, nonreaders=None, tcdate=None, tmdate=None, tddate=None, tauthor=None): self.id = id self.invitation = invitation self.domain = domain self.head = head self.tail = tail self.weight = weight self.label = label self.cdate = cdate self.ddate = ddate self.readers = readers self.nonreaders = nonreaders self.writers = writers self.signatures = signatures self.tcdate = tcdate self.tmdate = tmdate self.tddate = tddate self.tauthor = tauthor def to_json(self): ''' Returns serialized json string for a given object ''' body = { 'invitation': self.invitation, 'head': self.head, 'tail': self.tail } if self.id: body['id'] = self.id if self.ddate: body['ddate'] = self.ddate if self.readers is not None: body['readers'] = self.readers if self.writers is not None: body['writers'] = self.writers if self.nonreaders is not None: body['nonreaders'] = self.nonreaders if self.signatures is not None: body['signatures'] = self.signatures if self.weight is not None: body['weight'] = self.weight if self.label is not None: body['label'] = self.label if self.cdate is not None: body['cdate'] = self.cdate return body @classmethod def from_json(Edge, e): ''' Returns a deserialized object from a json string :arg t: The json string consisting of a serialized object of type "Edge" ''' edge = Edge( id = e.get('id'), domain = e.get('domain'), cdate = e.get('cdate'), tcdate = e.get('tcdate'), tmdate = e.get('tmdate'), ddate = e.get('ddate'), tddate = e.get('tddate'), invitation = e.get('invitation'), readers = e.get('readers'), nonreaders = e.get('nonreaders'), writers = e.get('writers'), signatures = e.get('signatures'), head = e.get('head'), tail = e.get('tail'), weight = e.get('weight'), label = e.get('label'), tauthor=e.get('tauthor') ) return edge def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Edge(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self))
[docs] class Group(object): """ When a user is created, it is automatically assigned to certain groups that give him different privileges. A username is also a group, therefore, groups can be members of other groups. :param id: id of the Group :type id: str :param readers: List of readers in the Group, each reader is a Group id :type readers: list[str] :param writers: List of writers in the Group, each writer is a Group id :type writers: list[str] :param signatories: List of signatories in the Group, each writer is a Group id :type signatories: list[str] :param signatures: List of signatures in the Group, each signature is a Group id :type signatures: list[str] :param cdate: Creation date of the Group :type cdate: int, optional :param ddate: Deletion date of the Group :type ddate: int, optional :param tcdate: true creation date of the Group :type tcdate: int, optional :param tmdate: true modification date of the Group :type tmdate: int, optional :param members: List of members in the Group, each member is a Group id :type members: list[str], optional :param nonreaders: List of nonreaders in the Group, each nonreader is a Group id :type nonreaders: list[str], optional :param web: Path to a file that contains the webfield :type web: optional :param web_string: String containing the webfield for this Group :type web_string: str, optional :param details: :type details: optional """ def __init__(self, id=None, content=None, readers=None, writers=None, signatories=None, signatures=None, invitation=None, invitations=None, cdate = None, ddate = None, tcdate=None, tmdate=None, members = None, nonreaders = None, impersonators=None, web = None, anonids= None, deanonymizers=None, host=None, domain=None, parent = None, details = None): # post attributes self.id=id self.invitation=invitation self.invitations = invitations self.content = content self.cdate = cdate self.ddate = ddate self.tcdate = tcdate self.tmdate = tmdate self.writers = writers self.members = members self.readers = readers self.nonreaders = nonreaders self.signatures = signatures self.signatories = signatories self.anonids = anonids self.web=web self.impersonators = impersonators self.host = host self.domain = domain self.parent = parent self.anonids = anonids self.deanonymizers = deanonymizers self.details = details self.anon_members = [] def get_content_value(self, field_name, default_value=None): if self.content: return self.content.get(field_name, {}).get('value') return default_value def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Group(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self))
[docs] def to_json(self): """ Converts Group instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Group instance :rtype: dict """ body = {} if self.id is not None: body['id'] = self.id if self.content is not None: body['content'] = self.content if self.signatures is not None: body['signatures'] = self.signatures if self.writers is not None: body['writers'] = self.writers if self.members is not None: body['members'] = self.members if self.readers is not None: body['readers'] = self.readers if self.invitation is not None: body['invitation'] = self.invitation if self.cdate is not None: body['cdate'] = self.cdate if self.ddate is not None: body['ddate'] = self.ddate if self.host is not None: body['host'] = self.host if self.impersonators is not None: body['impersonators'] = self.impersonators if self.anonids is not None: body['anonids'] = self.anonids if self.deanonymizers is not None: body['deanonymizers'] = self.deanonymizers if self.web is not None: body['web'] = self.web if self.nonreaders is not None: body['nonreaders'] = self.nonreaders if self.signatories is not None: body['signatories'] = self.signatories return body
[docs] @classmethod def from_json(Group,g): """ Creates a Group object from a dictionary that contains keys values equivalent to the name of the instance variables of the Group class :param g: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Group class :type g: dict :return: Group whose instance variables contain the values from the dictionary :rtype: Group """ group = Group(g['id'], content=g.get('content'), invitation=g.get('invitation'), invitations=g.get('invitations'), cdate = g.get('cdate'), ddate = g.get('ddate'), tcdate = g.get('tcdate'), tmdate = g.get('tmdate'), writers = g.get('writers'), members = g.get('members'), readers = g.get('readers'), nonreaders = g.get('nonreaders'), signatories = g.get('signatories'), signatures = g.get('signatures'), anonids=g.get('anonids'), deanonymizers=g.get('deanonymizers'), impersonators=g.get('impersonators'), host=g.get('host'), web=g.get('web'), domain=g.get('domain'), parent=g.get('parent'), details = g.get('details')) return group
[docs] def add_member(self, member): """ Adds a member to the group. This is done only on the object not in OpenReview. Another method like :meth:`~openreview.Group.post` is needed for the change to show in OpenReview :param member: Member to add to the group :type member: str :return: Group with the new member added :rtype: Group """ if type(member) is Group: self.members.append(member.id) else: self.members.append(str(member)) return self
[docs] def remove_member(self, member): """ Removes a member from the group. This is done only on the object not in OpenReview. Another method like :meth:`~openreview.Group.post` is needed for the change to show in OpenReview :param member: Member to remove from the group :type member: str :return: Group after the member was removed :rtype: Group """ if type(member) is Group: try: self.members.remove(member.id) except(ValueError): pass else: try: self.members.remove(str(member)) except(ValueError): pass return self
[docs] def add_webfield(self, web): """ Adds a webfield to the group :param web: Path to the file that contains the webfield :type web: str """ with open(web) as f: self.web = f.read()
[docs] def post(self, client): """ Posts a group to OpenReview :param client: Client that will post the Group :type client: Client """ client.post_group(self)
def transform_to_anon_ids(self, elements): if self.anonids: for index, element in enumerate(elements): if element in self.members and self.anon_members: elements[index] = self.anon_members[self.members.index(element)] else: elements[index] = element return elements