Source code for openreview.api.client

#!/usr/bin/python
from __future__ import absolute_import, division, print_function, unicode_literals
import datetime
import sys
if sys.version_info[0] < 3:
    string_types = [str, unicode]
else:
    string_types = [str]

from importlib.metadata import version as get_package_version, PackageNotFoundError

from .. import tools
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import pprint
import os
import re
import time
import jwt
import json
import csv
from ..openreview import Profile
from ..openreview import OpenReviewException
from ..openreview import MfaRequiredException
from .. import tools
from .. import mfa

class LogRetry(Retry):
     
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)   

    def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None):
        # Log retry information before calling the parent class method
        response_string = 'no response'
        if response:
            if 'application/json' in response.headers.get('Content-Type', ''):
                response_string = json.loads(response.data.decode('utf-8'))
            elif response.data:
                response_string = response.data
            else:
                response_string = response.reason
        print(f"Retrying request: {method} {url}, response: {response_string}, error: {error}")

        # Call the parent class method to perform the actual retry increment
        return super().increment(method=method, url=url, response=response, error=error, _pool=_pool, _stacktrace=_stacktrace)
    

[docs] class OpenReviewClient(object): """ :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2` :type baseurl: str, optional :param username: OpenReview username. If none is provided, it defaults to the environment variable `OPENREVIEW_USERNAME` :type username: str, optional :param password: OpenReview password. If none is provided, it defaults to the environment variable `OPENREVIEW_PASSWORD` :type password: str, optional :param token: Session token. This token can be provided instead of the username and password if the user had already logged in :type token: str, optional :param expiresIn: Time in seconds before the token expires. If none is set the value will be set automatically to one hour. The max value that it can be set to is 1 week. :type expiresIn: number, optional """ def __init__(self, baseurl = None, username = None, password = None, token= None, tokenExpiresIn=None): self.baseurl = baseurl if baseurl is not None else os.environ.get('OPENREVIEW_API_BASEURL_V2', 'http://localhost:3001') if any(url in self.baseurl for url in tools.V1_REMOTE_URLS): correct_baseurl = tools.get_base_urls(self)[1] raise OpenReviewException(f'Please use "{correct_baseurl}" as the baseurl for the OpenReview API or use the old client openreview.Client') self.groups_url = self.baseurl + '/groups' self.login_url = self.baseurl + '/login' self.register_url = self.baseurl + '/register' self.alternate_confirm_url = self.baseurl + '/user/confirm' self.invitations_url = self.baseurl + '/invitations' self.mail_url = self.baseurl + '/mail' self.notes_url = self.baseurl + '/notes' self.tags_url = self.baseurl + '/tags' self.bulk_tags_url = self.baseurl + '/tags/bulk' self.tags_rename = self.baseurl + '/tags/rename' self.edges_url = self.baseurl + '/edges' self.bulk_edges_url = self.baseurl + '/edges/bulk' self.edges_count_url = self.baseurl + '/edges/count' self.edges_rename = self.baseurl + '/edges/rename' self.edges_archive = self.baseurl + '/edges/archive' self.profiles_url = self.baseurl + '/profiles' self.profiles_search_url = self.baseurl + '/profiles/search' self.profiles_merge_url = self.baseurl + '/profiles/merge' self.profiles_rename = self.baseurl + '/profiles/rename' self.relation_readers_url = self.baseurl + '/settings/relationReaders' self.profiles_moderate = self.baseurl + '/profile/moderate' self.reference_url = self.baseurl + '/references' self.tilde_url = self.baseurl + '/tildeusername' self.pdf_url = self.baseurl + '/pdf' self.pdf_revisions_url = self.baseurl + '/references/pdf' self.messages_url = self.baseurl + '/messages' self.messages_requests_url = self.baseurl + '/messages/requests' self.messages_direct_url = self.baseurl + '/messages/direct' self.process_logs_url = self.baseurl + '/logs/process' self.institutions_url = self.baseurl + '/settings/institutions' self.jobs_status = self.baseurl + '/jobs/status' self.venues_url = self.baseurl + '/venues' self.note_edits_url = self.baseurl + '/notes/edits' self.invitation_edits_url = self.baseurl + '/invitations/edits' self.group_edits_url = self.baseurl + '/groups/edits' self.activatelink_url = self.baseurl + '/activatelink' self.domains_rename = self.baseurl + '/domains/rename' self.domains_restriction = self.baseurl + '/domains/restriction' self.groups_members_cache_url = self.baseurl + '/groups/members/cache' self.mfa_challenge_url = self.baseurl + '/mfa/challenge' self.mfa_verify_url = self.baseurl + '/mfa/verify' # Build User-Agent string: openreview-py/{package_version} (Python/{python_version}) try: package_version = get_package_version('openreview-py') except PackageNotFoundError: package_version = 'unknown' python_version = f"{sys.version_info.major}.{sys.version_info.minor}" self.user_agent = f"openreview-py/{package_version} (Python/{python_version})" self.limit = 1000 self.token = token.replace('Bearer ', '') if token else None self.profile = None self.headers = { 'User-Agent': self.user_agent, 'Accept': 'application/json' } retry_strategy = LogRetry(total=3, backoff_factor=1, status_forcelist=[ 500, 502, 503, 504 ], respect_retry_after_header=True) self.session = requests.Session() adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount('https://', adapter) self.session.mount('http://', adapter) if self.token: self.headers['Authorization'] = 'Bearer ' + self.token try: payload = jwt.decode(self.token, options={"verify_signature": False}) self.user = payload.get('user', payload) user_id = self.user.get('profile', {}).get('id') or self.user.get('id') self.profile = self.get_profile(user_id) if user_id else None except: self.profile = None else: if not username: username = os.environ.get('OPENREVIEW_USERNAME') if not password: password = os.environ.get('OPENREVIEW_PASSWORD') if username or password: self.login_user(username, password, expiresIn=tokenExpiresIn) ## PRIVATE FUNCTIONS def __handle_authorization(self, response): self.token = str(response['token']) self.profile = Profile( id = response['user']['profile']['id'] ) self.headers['Authorization'] ='Bearer ' + self.token # self.user = jwt.decode(self.token, options={"verify_signature": False}) self.user = response['user'] return response def __handle_response(self,response): try: response.raise_for_status() return response except requests.exceptions.HTTPError as e: if 'application/json' in response.headers.get('Content-Type', ''): error = response.json() elif response.text: error = { 'name': 'Error', 'message': response.text } else: error = { 'name': 'Error', 'message': response.reason } raise OpenReviewException(error) def __request_mfa_challenge(self, mfa_pending_token, method): """Trigger MFA challenge (e.g., send email OTP).""" payload = {'mfaPendingToken': mfa_pending_token, 'method': method} response = self.session.post(self.mfa_challenge_url, headers=self.headers, json=payload) response = self.__handle_response(response) return response.json() def __verify_mfa(self, mfa_pending_token, method, code): """Verify MFA code and complete login.""" payload = {'mfaPendingToken': mfa_pending_token, 'method': method, 'code': code} response = self.session.post(self.mfa_verify_url, headers=self.headers, json=payload) response = self.__handle_response(response) return response.json() def __resolve_mfa(self, mfa_pending_token, mfa_methods, preferred_method): """Resolve MFA via interactive prompt.""" supported = [m for m in mfa_methods if m in ('totp', 'emailOtp', 'passkey')] if not supported: raise OpenReviewException({ 'name': 'MfaError', 'message': f'No supported MFA methods. Server offered: {", ".join(mfa_methods)}' }) if not mfa._is_interactive(): raise MfaRequiredException(mfa_pending_token, mfa_methods, preferred_method) method = mfa._default_mfa_method_chooser(mfa_methods, preferred_method) if not method: raise MfaRequiredException(mfa_pending_token, mfa_methods, preferred_method) if method == 'passkey': return self.__resolve_passkey(mfa_pending_token) if method == 'emailOtp': self.__request_mfa_challenge(mfa_pending_token, 'emailOtp') print('A verification code has been sent to your email.') code = mfa._default_mfa_code_prompt(method) if not code: raise MfaRequiredException(mfa_pending_token, mfa_methods, preferred_method) return self.__verify_mfa(mfa_pending_token, method, code) def __resolve_passkey(self, mfa_pending_token): """Handle passkey authentication via browser flow.""" result = mfa._passkey_browser_flow(self, mfa_pending_token) if result and result.get('token'): return result raise OpenReviewException({ 'name': 'MfaError', 'message': 'Passkey authentication failed or timed out.' }) def __await_process(self, edit_id): process_logs = self.get_process_logs(id=edit_id) if not process_logs: return ## no process function found for i in range(1200): # 1200 × 0.5s = 10 minutes if process_logs[0]['status'] == 'ok': return elif process_logs[0]['status'] == 'error': raise OpenReviewException(process_logs[0].get('log', 'No log available')) time.sleep(0.5) process_logs = self.get_process_logs(id=edit_id) raise OpenReviewException("Process timed out") def get_invitation_date_process_job(self, job_id): response = self.session.get(self.baseurl + '/jobs/queues/pyDateProcessQueueMQ/' + job_id.replace('/', '%2F'), params = {}, headers = self.headers) response = self.__handle_response(response) return response.json() def reschedule_date_process_jobs(self, invitation_id): response = self.session.post(self.baseurl + '/invitations/dateprocesses', json = { 'ids': [invitation_id]}, headers = self.headers) response = self.__handle_response(response) return response.json() ## PUBLIC FUNCTIONS
[docs] def impersonate(self, group_id): """Impersonate a group by obtaining a new authentication token scoped to the given group. Replaces the current client session token with a token that authorizes requests as the specified group. The client's profile and authorization headers are updated in place. :param group_id: ID of the group to impersonate (e.g., a venue ID such as ``ICML.cc/2024/Conference``). :type group_id: str :return: Dictionary containing the new authentication token and user information. :rtype: dict """ response = self.session.post(self.baseurl + '/impersonate', json={ 'groupId': group_id }, headers=self.headers) response = self.__handle_response(response) json_response = response.json() self.__handle_authorization(json_response) return json_response
[docs] def login_user(self,username=None, password=None, expiresIn=None): """ Logs in a registered user. If MFA is enabled for the account, this method will attempt to complete MFA verification automatically using the configured an interactive terminal prompt. :param username: OpenReview username :type username: str, optional :param password: OpenReview password :type password: str, optional :return: Dictionary containing user information and the authentication token :rtype: dict """ user = { 'id': username, 'password': password, 'expiresIn': expiresIn } response = self.session.post(self.login_url, headers=self.headers, json=user) response = self.__handle_response(response) json_response = response.json() if json_response.get('mfaPending'): json_response = self.__resolve_mfa( json_response['mfaPendingToken'], json_response['mfaMethods'], json_response.get('preferredMethod') ) self.__handle_authorization(json_response) return json_response
[docs] def register_user(self, email = None, fullname = None, password = None): """ Registers a new user :param email: email that will be used as id to log in after the user is registered :type email: str, optional :param fullname: Full name of the user :type fullname: str, optional :param password: Password used to log into OpenReview :type password: str, optional :return: Dictionary containing the new user information including his id, username, email(s), readers, writers, etc. :rtype: dict """ register_payload = { 'email': email, 'fullname': fullname, 'password': password } response = self.session.post(self.register_url, json = register_payload, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def activate_user(self, token, content): """ Activates a newly registered user :param token: Activation token. If running in localhost, use email as token :type token: str :param content: Content of the profile to activate :type content: dict :return: Dictionary containing user information and the authentication token :rtype: dict Example: >>> res = client.activate_user('new@user.com', { 'names': [ { 'first': 'New', 'last': 'User', 'username': '~New_User1' } ], 'emails': ['new@user.com'], 'preferredEmail': 'new@user.com' }) """ response = self.session.put(self.baseurl + '/activate/' + token, json = { 'content': content }, headers = self.headers) response = self.__handle_response(response) json_response = response.json() self.__handle_authorization(json_response) return json_response
[docs] def confirm_alternate_email(self, profile_id, alternate_email, activation_token=None): """ Confirms an alternate email address :param profile_id: id of the profile :type profile_id: str :param alternate_email: email address to confirm :type alternate_email: str :return: Dictionary containing the profile information :rtype: dict """ response = self.session.post(self.alternate_confirm_url + (f'/{activation_token}' if activation_token else ''), json = { 'username': profile_id, 'alternate': alternate_email }, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def activate_email_with_token(self, email, token, activation_token=None): """ Activates an email address :param email: email address to activate :type email: str :param token: token to activate the email :type token: str :return: Dictionary containing the profile information :rtype: dict """ response = self.session.put(self.activatelink_url + (f'/{activation_token}' if activation_token else ''), json = { 'email': email, 'token': token }, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_note_edit_as_guest(self, token, edit): """Post a note edit as a guest user using a guest token. Submits a note edit without requiring a logged-in session. The guest token is sent via the ``X-Guest-Token`` header instead of the standard ``Authorization`` header. :param token: Guest authentication token (e.g., provided via an invitation link). :type token: str :param edit: Dictionary representing the note edit to post, following the same schema as :meth:`post_note_edit`. :type edit: dict :return: Dictionary containing the posted edit, including the assigned edit ``id``. :rtype: dict """ headers = { 'User-Agent': self.user_agent, 'Accept': 'application/json', 'X-Guest-Token': token } response = self.session.post(self.note_edits_url, json = edit, headers = headers) response = self.__handle_response(response) return response.json()
[docs] def flush_members_cache(self, group_id=None): """ Flushes the members cache for a group :param group_id: id of the group to flush the cache for :type group_id: str, optional :return: Dictionary containing the status of the request :rtype: dict """ if not group_id: return if '/' in group_id: group_id = group_id.replace('/', '%2F') response = self.session.delete(self.groups_members_cache_url + '/' + group_id, params= {}, headers=self.headers) response = self.__handle_response(response) return response.json()
def get_activatable(self, token = None): response = self.session.get(self.baseurl + '/activatable/' + token, params = {}, headers = self.headers) response = self.__handle_response(response) self.__handle_authorization(response.json()['activatable']) return self.token
[docs] def get_institutions(self, id=None, domain=None): """ Get a single Institution by id or domain if available :param id: id of the Institution as saved in the database :type id: str :param domain: domain of the Institution :type domain: str :return: Dictionary with the Institution information :rtype: dict Example: >>> institution = client.get_institutions(domain='umass.edu') """ params = {} if id: params['id'] = id if domain: params['domain'] = domain response = self.session.get(self.institutions_url, params = tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def get_group(self, id, details=None): """ Get a single Group by id if available :param id: id of the group :type id: str :return: Dictionary with the group information :rtype: Group Example: >>> group = client.get_group('your-email@domain.com') """ response = self.session.get(self.groups_url, params = {'id':id, 'details': details}, headers = self.headers) response = self.__handle_response(response) g = response.json()['groups'][0] group = Group.from_json(g) if group.anonids: anon_prefix = (group.id[:-1] if group.id.endswith('s') else group.id) + '_' members_by_anonid = { g.id:g.members[0] for g in self.get_groups(prefix=anon_prefix) if g.members } members = [] anon_members = [] for member in group.members: if member in members_by_anonid: anon_members.append(member) members.append(members_by_anonid[member]) else: members.append(member) group.anon_members = anon_members group.members = members return group
[docs] def get_invitation(self, id): """ Get a single invitation by id if available :param id: id of the invitation :type id: str :return: Invitation matching the passed id :rtype: Invitation """ response = self.session.get(self.invitations_url, params = {'id': id}, headers = self.headers) response = self.__handle_response(response) i = response.json()['invitations'][0] return Invitation.from_json(i)
[docs] def get_note(self, id, details=None): """ Get a single Note by id if available :param id: id of the note :type id: str :return: Note matching the passed id :rtype: Note """ response = self.session.get(self.notes_url, params = {'id':id, 'details': details}, headers = self.headers) response = self.__handle_response(response) n = response.json()['notes'][0] return Note.from_json(n)
[docs] def get_tag(self, id): """ Get a single Tag by id if available :param id: id of the Tag :type id: str :return: Tag with the Tag information :rtype: Tag """ response = self.session.get(self.tags_url, params = {'id': id}, headers = self.headers) response = self.__handle_response(response) t = response.json()['tags'][0] return Tag.from_json(t)
[docs] def get_edge(self, id, trash=False): """ Get a single Edge by id if available :param id: id of the Edge :type id: str return: Edge object with its information :rtype: Edge """ response = self.session.get(self.edges_url, params = {'id': id, 'trash': 'true' if trash == True else 'false'}, headers=self.headers) response = self.__handle_response(response) edges = response.json()['edges'] if edges: return Edge.from_json(edges[0]) else: raise OpenReviewException('Edge not found')
[docs] def get_profile(self, email_or_id = None): """ Get a single Profile by id, if available :param email_or_id: e-mail or id of the profile :type email_or_id: str, optional :return: Profile object with its information :rtype: Profile """ params = {} if email_or_id: tildematch = re.compile('~.+') if tildematch.match(email_or_id): att = 'id' else: att = 'email' email_or_id = email_or_id.lower() params[att] = email_or_id response = self.session.get(self.profiles_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) profiles = response.json()['profiles'] if profiles: return Profile.from_json(profiles[0]) else: raise OpenReviewException(['Profile Not Found'])
[docs] def get_profiles(self, id=None, trash=None, with_blocked=None, state=None, offset=None, limit=None, sort=None): """ Get a list of Profiles :param trash: Indicates if the returned profiles are trashed :type trash: bool, optional :param with_blocked: Indicates if the returned profiles are blocked :type with_blocked: bool, optional :param state: Filter profiles by state (e.g. 'Needs Moderation', 'Active', 'Rejected') :type state: str, optional :param offset: Indicates the position to start retrieving Profiles :type offset: int, optional :param limit: Maximum amount of Profiles that this method will return :type limit: int, optional :return: List of Profile objects :rtype: list[Profile] """ params = {} if id is not None: params['id'] = id if trash == True: params['trash'] = True if with_blocked == True: params['withBlocked'] = True if state is not None: params['state'] = state if offset is not None: params['offset'] = offset if limit is not None: params['limit'] = limit if sort is not None: params['sort'] = sort response = self.session.get(self.profiles_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']]
[docs] def search_profiles(self, confirmedEmails = None, emails = None, ids = None, term = None, first = None, middle = None, last = None, fullname=None, relation=None, use_ES = False): """ Gets a list of profiles using either their ids or corresponding emails :param confirmedEmails: List of confirmed emails registered in OpenReview :type confirmedEmails: list, optional :param emails: List of emails registered in OpenReview :type emails: list, optional :param ids: List of OpenReview username ids :type ids: list, optional :param term: Substring in the username or e-mail to be searched :type term: str, optional :param first: First name of user :type first: str, optional :param middle: Middle name of user :type middle: str, optional :param last: Last name of user :type last: str, optional :return: List of profiles, if emails is present then a dictionary of { emails: profiles } is returned. If confirmedEmails is present then a dictionary of { confirmedEmails: profile } is returned :rtype: list[Profile] """ def batches(items, batch_size=1000): batch = [] for item in items: if len(batch) == batch_size: yield batch batch = [] batch.append(item) if batch: yield batch if term: response = self.session.get(self.profiles_search_url, params = { 'term': term, 'es': 'true' if use_ES else 'false' }, headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']] if fullname: response = self.session.get(self.profiles_search_url, params = { 'fullname': fullname, 'es': 'true' if use_ES else 'false' }, headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']] if emails: emails = [email.lower() for email in emails] full_response = [] for email_batch in batches(emails): response = self.session.post(self.profiles_search_url, json = {'emails': email_batch}, headers = self.headers) response = self.__handle_response(response) full_response.extend(response.json()['profiles']) profiles_by_email = {} for p in full_response: if p['email'] not in profiles_by_email: profiles_by_email[p['email']] = [] profiles_by_email[p['email']].append(Profile.from_json(p)) return profiles_by_email if confirmedEmails: confirmedEmails = [email.lower() for email in confirmedEmails] full_response = [] for email_batch in batches(confirmedEmails): response = self.session.post(self.profiles_search_url, json = {'confirmedEmails': email_batch}, headers = self.headers) response = self.__handle_response(response) full_response.extend(response.json()['profiles']) profiles_by_email = {} for p in full_response: profile_confirmed_emails = p.get('confirmedEmails', p['content'].get('emailsConfirmed', [])) for email in profile_confirmed_emails: profiles_by_email[email] = Profile.from_json(p) return profiles_by_email if ids: full_response = [] for id_batch in batches(ids): response = self.session.post(self.profiles_search_url, json = {'ids': id_batch}, headers = self.headers) response = self.__handle_response(response) full_response.extend(response.json()['profiles']) return [Profile.from_json(p) for p in full_response] if first or middle or last: response = self.session.get(self.profiles_url, params = {'first': first, 'middle': middle, 'last': last, 'es': 'true' if use_ES else 'false'}, headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']] if relation: response = self.session.get(self.profiles_url, params = {'relation': relation }, headers = self.headers) response = self.__handle_response(response) return [Profile.from_json(p) for p in response.json()['profiles']] return []
[docs] def get_pdf(self, id, is_reference=False): """ Gets the binary content of a pdf using the provided note/reference id If the pdf is not found then this returns an error message with "status":404. Use the note id when trying to get the latest pdf version and reference id when trying to get a previous version of the pdf :param id: Note id or Reference id of the pdf :type id: str :param is_reference: Indicates that the passed id is a reference id instead of a note id :type is_reference: bool, optional :return: The binary content of a pdf :rtype: bytes Example: >>> f = get_pdf(id='Place Note-ID here') >>> with open('output.pdf','wb') as op: op.write(f) """ params = {} params['id'] = id headers = self.headers.copy() headers['content-type'] = 'application/pdf' url = self.pdf_revisions_url if is_reference else self.pdf_url response = self.session.get(url, params=tools.format_params(params), headers = headers) response = self.__handle_response(response) return response.content
[docs] def get_attachment(self, field_name, id=None, ids=None, group_id=None, invitation_id=None): """ Gets the binary content of a attachment using the provided note id If the pdf is not found then this returns an error message with "status":404. :param field_name: name of the field associated with the attachment file :type field_name: str :param id: Note id or Reference id of the pdf :type id: str :param ids: List of Note ids or Reference ids. The max number of ids is 50 :type id: list[str] :param group_id: Id of group where attachment is stored :type group_id: str :param invitation_id: Id of invitation where attachment is stored :type invitation_id: str :return: The binary content of a pdf :rtype: bytes Example: >>> f = get_attachment(id='Place Note-ID here', field_name='pdf') >>> with open('output.pdf','wb') as op: op.write(f) """ if not any([id, ids, group_id, invitation_id]): raise OpenReviewException('Provide exactly one of the following: id, ids, group_id, invitation_id') params = {} params['name'] = field_name if id: url = self.baseurl params['id'] = id elif ids: url = self.baseurl params['ids'] = ','.join(ids) elif group_id: url = self.groups_url params['id'] = group_id elif invitation_id: url = self.invitations_url params['id'] = invitation_id response = self.session.get(url + '/attachment', params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return response.content
[docs] def get_venues(self, id=None, ids=None, invitations=None): """Get a list of Venue objects based on the filters provided. Returns Venues matching all the criteria passed in the parameters. :param id: A Venue ID. If provided, returns the Venue whose ID matches the given ID. :type id: str, optional :param ids: A list of Venue IDs. If provided, returns Venues whose IDs are in this list. :type ids: list[str], optional :param invitations: A list of Invitation IDs. If provided, returns Venues whose ``invitation`` field matches one of these IDs. :type invitations: list[str], optional :return: List of Venues. :rtype: list[dict] """ params = {} if id is not None: params['id'] = id if ids is not None: params['ids'] = ','.join(ids) if invitations is not None: params['invitations'] = ','.join(invitations) response = self.session.get(self.venues_url, params=tools.format_params(params), headers=self.headers) response = self.__handle_response(response) return response.json()['venues']
[docs] def rename_venue(self, old_venue_id, new_venue_id, request_form=None, additional_renames=None): """ Updates the domain for an entire venue :param old_domain: Current domain :param new_domain: New domain :return: Status of the request. The process can be tracked in the queue. :rtype: dict """ json = { 'oldDomain': old_venue_id, 'newDomain': new_venue_id, 'requestForm': request_form } if additional_renames: json['additionalRenames'] = additional_renames response = self.session.post( self.domains_rename, json = json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def put_attachment(self, file_path, invitation, name): """Upload a file attachment to the OpenReview server. :param file_path: Path to the local file to upload. :type file_path: str :param invitation: Invitation ID of the note that requires the attachment. :type invitation: str :param name: Name of the note content field where the attachment URL will be stored (e.g., ``pdf``, ``supplementary_material``). :type name: str :return: A relative URL for the uploaded file, to be used as the field value in a Note. :rtype: str """ headers = self.headers.copy() with open(file_path, 'rb') as f: response = self.session.put(self.baseurl + '/attachment', files=( ('invitationId', (None, invitation)), ('name', (None, name)), ('file', (file_path, f)) ), headers = headers) response = self.__handle_response(response) return response.json()['url']
[docs] def post_profile(self, profile): """ Updates a Profile :param profile: Profile object :type profile: Profile :return: The new updated Profile :rtype: Profile """ response = self.session.post( self.profiles_url, json = profile.to_json(), headers = self.headers) response = self.__handle_response(response) return Profile.from_json(response.json())
[docs] def rename_domain(self, old_domain, new_domain, request_form, additional_renames=None): """ Updates the domain for an entire venue :param old_domain: Current domain :param new_domain: New domain :return: Status of the request. The process can be tracked in the queue. :rtype: dict """ json = { 'oldDomain': old_domain, 'newDomain': new_domain, 'requestForm': request_form } if additional_renames: json['additionalRenames'] = additional_renames response = self.session.post( self.domains_rename, json = json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def rename_profile(self, current_id, new_id): """Rename a Profile by changing its tilde ID. :param current_id: Current profile ID (e.g., ``~Old_Name1``). :type current_id: str :param new_id: New profile ID (e.g., ``~New_Name1``). :type new_id: str :return: The updated Profile with the new ID. :rtype: Profile """ response = self.session.post( self.profiles_rename, json = { 'currentId': current_id, 'newId': new_id }, headers = self.headers) response = self.__handle_response(response) return Profile.from_json(response.json())
[docs] def merge_profiles(self, profileTo, profileFrom): """ Merges two Profiles :param profileTo: Profile object to merge to :type profileTo: Profile :param profileFrom: Profile object to merge from (this profile will be deleted) :type: profileFrom: Profile :return: The new updated Profile :rtype: Profile """ response = self.session.post( self.profiles_merge_url, json = { 'to': profileTo, 'from': profileFrom }, headers = self.headers) response = self.__handle_response(response) return Profile.from_json(response.json())
[docs] def moderate_profile(self, profile_id, decision, reason=None): """ Moderates a Profile :param profile_id: Profile id to moderate :type profile_id: str :param decision: Moderation decision (accept, reject, block, unblock, delete, restore, limit) :type decision: str :param reason: Reason for the decision. When rejecting, this text is emailed to the user. :type reason: str, optional :return: The new updated Profile :rtype: Profile """ body = { 'id': profile_id, 'decision': decision } if reason is not None: body['reason'] = reason response = self.session.post( self.profiles_moderate, json = body, headers = self.headers) response = self.__handle_response(response) return Profile.from_json(response.json())
[docs] def update_relation_readers(self, update): """ Updates the relation readers available in the profile. This is an admin method. :param update: Dictionary that accepts the keys append or remove with a list of group ids. :type update: dict :return: The new list of relation readers :rtype: list """ response = self.session.patch( self.relation_readers_url, json = update, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def get_groups(self, id=None, invitation=None, prefix=None, member=None, members=None, signatory=None, web=None, limit=None, offset=None, after=None, stream=None, sort=None, with_count=None, domain=None): """ Gets list of Group objects based on the filters provided. The Groups that will be returned match all the criteria passed in the parameters. :param id: id of the Group :type id: str, optional :param prefix: Prefix that matches several Group ids :type prefix: str, optional :param member: Groups that that are transitive members of the member value :type member: str, optional :param members: Groups that contain the value members in the members field :type members: str, optional :param signatory: Groups that contain this signatory :type signatory: str, optional :param web: Groups that contain a web field value :type web: bool, optional :param limit: Maximum amount of Groups that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Groups will be returned :type limit: int, optional :param offset: Indicates the position to start retrieving Groups. For example, if there are 10 Groups and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :return: List of Groups :rtype: list[Group] """ params = {} if id is not None: params['id'] = id if invitation is not None: params['invitation'] = invitation if prefix is not None: params['prefix'] = prefix if member is not None: params['member'] = member if members is not None: params['members'] = members if signatory is not None: params['signatory'] = signatory if sort is not None: params['sort'] = sort if web is not None: params['web'] = web if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset if after is not None: params['after'] = after if stream is not None: params['stream'] = stream if with_count is not None: params['count'] = with_count if domain is not None: params['domain'] = domain response = self.session.get(self.groups_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) groups = [Group.from_json(g) for g in response.json()['groups']] if with_count and params.get('offset') is None: return groups, response.json()['count'] return groups
[docs] def get_all_groups(self, id=None, invitation=None, parent=None, prefix=None, member=None, members=None, domain=None, signatory=None, web=None, sort=None, with_count=None): """ Gets list of Group objects based on the filters provided. The Groups that will be returned match all the criteria passed in the parameters. :param id: id of the Group :type id: str, optional :param parent: id of the parent Group :type parent: str, optional :param prefix: Prefix that matches several Group ids :type prefix: str, optional :param member: Groups that that are trasitive members of the member value :type member: str, optional :param members: Groups that contain the value members in the members field :type members: str, optional :param signatory: Groups that contain this signatory :type signatory: str, optional :param web: Groups that contain a web field value :type web: bool, optional :param limit: Maximum amount of Groups that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Groups will be returned :type limit: int, optional :param offset: Indicates the position to start retrieving Groups. For example, if there are 10 Groups and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :param after: Group id to start getting the list of groups from. :type after: str, optional :return: List of Groups :rtype: list[Group] """ params = { 'stream': True } if id is not None: params['id'] = id if invitation is not None: params['invitation'] = invitation if parent is not None: params['parent'] = parent if prefix is not None: params['prefix'] = prefix if member is not None: params['member'] = member if members is not None: params['members'] = members if signatory is not None: params['signatory'] = signatory if domain is not None: params['domain'] = domain if web is not None: params['web'] = web if sort is not None: params['sort'] = sort if with_count is not None: params['with_count'] = with_count return self.get_groups(**params)
[docs] def get_invitations(self, id = None, ids = None, invitee = None, replytoNote = None, replyForum = None, signature = None, note = None, prefix = None, tags = None, limit = None, offset = None, after = None, minduedate = None, duedate = None, pastdue = None, replyto = None, details = None, expired = None, sort = None, type = None, with_count=None, invitation = None, trash = None, stream = None, domain = None ): """ Gets list of Invitation objects based on the filters provided. The Invitations that will be returned match all the criteria passed in the parameters. :param id: id of the Invitation :type id: str, optional :param ids: Comma separated Invitation IDs. If provided, returns invitations whose "id" value is any of the passed Invitation IDs. :type ids: str, optional :param invitee: Invitations that contain this invitee :type invitee: str, optional :param replytoNote: Invitations that contain this replytoNote :type replytoNote: str, optional :param replyForum: Invitations that contain this replyForum :type replyForum: str, optional :param signature: Invitations that contain this signature :type signature: optional :param note: Invitations that contain this note :type note: str, optional :param prefix: Invitation ids that match this prefix :type prefix: str, optional :param tags: Invitations that contain these tags :type tags: Tag, optional :param int limit: Maximum amount of Invitations that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Invitations will be returned :type limit: int, optional :param int offset: Indicates the position to start retrieving Invitations. For example, if there are 10 Invitations and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :param after: Invitation id to start getting the list of invitations from. :type after: str, optional :param minduedate: Invitations that have at least this value as due date :type minduedate: int, optional :param duedate: Invitations that contain this due date :type duedate: int, optional :param pastdue: Invitaions that are past due :type pastdue: bool, optional :param replyto: Invitations that contain this replyto :type replyto: optional :param details: TODO: What is a valid value for this field? :type details: dict, optional :param expired: If true, retrieves the Invitations that have expired, otherwise, the ones that have not expired :type expired: bool, optional :param trash: If true, retrieves the Invitations that have been trashed, otherwise, the ones that have not been trashed :type trash: bool, optional :return: List of Invitations :rtype: list[Invitation] """ params = {} if id is not None: params['id'] = id if ids is not None: params['ids'] = ids if invitee is not None: params['invitee'] = invitee if replytoNote is not None: params['replytoNote'] = replytoNote if replyForum is not None: params['replyForum'] = replyForum if signature is not None: params['signature'] = signature if note is not None: params['note']=note if prefix is not None: params['prefix'] = prefix if tags is not None: params['tags'] = tags if minduedate is not None: params['minduedate'] = minduedate if replyto is not None: params['replyto'] = replyto if duedate is not None: params['duedate'] = duedate if pastdue is not None: params['pastdue'] = pastdue if details is not None: params['details'] = details if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset if after is not None: params['after'] = after if sort is not None: params['sort'] = sort if expired is not None: params['expired'] = expired if type is not None: params['type'] = type if invitation is not None: params['invitation'] = invitation if with_count is not None: params['count'] = with_count if trash is not None: params['trash'] = trash if domain is not None: params['domain'] = domain if stream is not None: params['stream'] = stream response = self.session.get(self.invitations_url, params=tools.format_params(params), headers=self.headers) response = self.__handle_response(response) invitations = [Invitation.from_json(i) for i in response.json()['invitations']] if with_count and params.get('offset') is None: return invitations, response.json()['count'] return invitations
[docs] def get_all_invitations(self, id = None, ids = None, invitee = None, replytoNote = None, replyForum = None, signature = None, note = None, prefix = None, tags = None, minduedate = None, duedate = None, pastdue = None, replyto = None, expired = None, sort = None, type = None, invitation = None, trash = None, domain = None ): """ Gets list of Invitation objects based on the filters provided. The Invitations that will be returned match all the criteria passed in the parameters. :param id: id of the Invitation :type id: str, optional :param ids: Comma separated Invitation IDs. If provided, returns invitations whose "id" value is any of the passed Invitation IDs. :type ids: str, optional :param invitee: Invitations that contain this invitee :type invitee: str, optional :param replytoNote: Invitations that contain this replytoNote :type replytoNote: str, optional :param replyForum: Invitations that contain this replyForum :type replyForum: str, optional :param signature: Invitations that contain this signature :type signature: optional :param note: Invitations that contain this note :type note: str, optional :param prefix: Invitation ids that match this prefix :type prefix: str, optional :param tags: Invitations that contain these tags :type tags: Tag, optional :param minduedate: Invitations that have at least this value as due date :type minduedate: int, optional :param duedate: Invitations that contain this due date :type duedate: int, optional :param pastdue: Invitaions that are past due :type pastdue: bool, optional :param replyto: Invitations that contain this replyto :type replyto: optional :param details: TODO: What is a valid value for this field? :type details: dict, optional :param expired: If true, retrieves the Invitations that have expired, otherwise, the ones that have not expired :type expired: bool, optional :return: List of Invitations :rtype: list[Invitation] """ params = { 'stream': True } if id is not None: params['id'] = id if ids is not None: params['ids'] = ids if invitee is not None: params['invitee'] = invitee if replytoNote is not None: params['replytoNote'] = replytoNote if replyForum is not None: params['replyForum'] = replyForum if signature is not None: params['signature'] = signature if note is not None: params['note'] = note if prefix is not None: params['prefix'] = prefix if tags is not None: params['tags'] = tags if minduedate is not None: params['minduedate'] = minduedate if duedate is not None: params['duedate'] = duedate if pastdue is not None: params['pastdue'] = pastdue if replyto is not None: params['replyto'] = replyto if expired is not None: params['expired'] = expired if sort is not None: params['sort'] = sort if type is not None: params['type'] = type if invitation is not None: params['invitation'] = invitation if trash is not None: params['trash'] = trash if domain is not None: params['domain'] = domain return self.get_invitations(**params)
[docs] def get_invitation_edit(self, id): """ Get a single edit by id if available :param id: id of the edit :type id: str :return: edit matching the passed id :rtype: Note """ response = self.session.get(self.invitation_edits_url, params = {'id':id}, headers = self.headers) response = self.__handle_response(response) n = response.json()['edits'][0] return Edit.from_json(n)
[docs] def get_invitation_edits(self, invitation_id = None, invitation = None, with_count=None, sort=None): """ Gets a list of edits for a note. The edits that will be returned match all the criteria passed in the parameters. :return: List of edits :rtype: list[Edit] """ params = {} if invitation_id: params['invitation.id'] = invitation_id if invitation: params['invitation'] = invitation if sort: params['sort'] = sort if with_count is not None: params['count'] = with_count response = self.session.get(self.invitation_edits_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edits = [Edit.from_json(n) for n in response.json()['edits']] if with_count and params.get('offset') is None: return edits, response.json()['count'] return edits
[docs] def get_notes(self, id = None, external_id = None, paperhash = None, forum = None, invitation = None, parent_invitations = None, replyto = None, tauthor = None, signature = None, transitive_members = None, signatures = None, writer = None, trash = None, number = None, content = None, limit = None, offset = None, after = None, mintcdate = None, domain = None, paper_hash = None, details = None, sort = None, with_count=None, stream=None ): """ Gets list of Note objects based on the filters provided. The Notes that will be returned match all the criteria passed in the parameters. :param id: a Note ID. If provided, returns Notes whose ID matches the given ID. :type id: str, optional :param paperhash: A "paperhash" for a note. If provided, returns Notes whose paperhash matches this argument. (A paperhash is a human-interpretable string built from the Note's title and list of authors to uniquely identify the Note) :type paperhash: str, optional :param forum: A Note ID. If provided, returns Notes whose forum matches the given ID. :type forum: str, optional :param invitation: An Invitation ID. If provided, returns Notes whose "invitation" field is this Invitation ID. :type invitation: str, optional :param replyto: A Note ID. If provided, returns Notes whose replyto field matches the given ID. :type replyto: str, optional :param tauthor: A Group ID. If provided, returns Notes whose tauthor field ("true author") matches the given ID, or is a transitive member of the Group represented by the given ID. :type tauthor: str, optional :param signature: A Group ID. If provided, returns Notes whose signatures field contains the given Group ID. :type signature: str, optional :param transitive_members: If true, returns Notes whose tauthor field is a transitive member of the Group represented by the given Group ID. :type transitive_members: bool, optional :param signatures: Group IDs. If provided, returns Notes whose signatures field contains the given Group IDs. :type signatures: list[str], optional :param writer: A Group ID. If provided, returns Notes whose writers field contains the given Group ID. :type writer: str, optional :param trash: If True, includes Notes that have been deleted (i.e. the ddate field is less than the current date) :type trash: bool, optional :param number: If present, includes Notes whose number field equals the given integer. :type number: int, optional :param content: If present, includes Notes whose each key is present in the content field and it is equals the given value. :type content: dict, optional :param limit: Maximum amount of Notes that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Notes will be returned :type limit: int, optional :param offset: Indicates the position to start retrieving Notes. For example, if there are 10 Notes and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :param after: Note id to start getting the list of notes from. :type after: str, optional :param mintcdate: Represents an Epoch time timestamp, in milliseconds. If provided, returns Notes whose "true creation date" (tcdate) is at least equal to the value of mintcdate. :type mintcdate: int, optional :param domain: If provided, returns Notes whose domain field matches the given domain. :param details: TODO: What is a valid value for this field? :type details: optional :param sort: Sorts the output by field depending on the string passed. Possible values: number, cdate, ddate, tcdate, tmdate, replyCount (Invitation id needed in the invitation field). :type sort: str, optional :return: List of Notes :rtype: list[Note] """ params = {} if id is not None: params['id'] = id if external_id is not None: params['externalId'] = external_id if paperhash is not None: params['paperhash'] = paperhash if forum is not None: params['forum'] = forum if invitation is not None: params['invitation'] = invitation if parent_invitations is not None: params['parentInvitations'] = parent_invitations if replyto is not None: params['replyto'] = replyto if tauthor is not None: params['tauthor'] = tauthor if signature is not None: params['signature'] = signature if transitive_members is not None: params['transitiveMembers'] = transitive_members if signatures is not None: params['signatures'] = signatures if writer is not None: params['writer'] = writer if trash == True: params['trash']=True if number is not None: params['number'] = number if content is not None: for k in content: params['content.' + k] = content[k] if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset if mintcdate is not None: params['mintcdate'] = mintcdate if domain is not None: params['domain'] = domain if paper_hash is not None: params['paperhash'] = paper_hash if details is not None: params['details'] = details if after is not None: params['after'] = after if sort is not None: params['sort'] = sort if with_count is not None: params['count'] = with_count if stream is not None: params['stream'] = stream response = self.session.get(self.notes_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) notes = [Note.from_json(n) for n in response.json()['notes']] if with_count and params.get('offset') is None: return notes, response.json()['count'] return notes
[docs] def get_all_notes(self, id = None, paperhash = None, forum = None, invitation = None, parent_invitations = None, replyto = None, signature = None, transitive_members = None, signatures = None, writer = None, trash = None, number = None, content = None, mintcdate = None, details = None, select = None, sort = None, domain=None ): """ Gets list of Note objects based on the filters provided. The Notes that will be returned match all the criteria passed in the parameters. :param id: a Note ID. If provided, returns Notes whose ID matches the given ID. :type id: str, optional :param paperhash: A "paperhash" for a note. If provided, returns Notes whose paperhash matches this argument. (A paperhash is a human-interpretable string built from the Note's title and list of authors to uniquely identify the Note) :type paperhash: str, optional :param forum: A Note ID. If provided, returns Notes whose forum matches the given ID. :type forum: str, optional :param invitation: An Invitation ID. If provided, returns Notes whose "invitation" field is this Invitation ID. :type invitation: str, optional :param parent_invitations: An Invitation ID. If provided, returns Notes whose parentInvitations field contains the given Invitation ID. :type parent_invitations: str, optional :param replyto: A Note ID. If provided, returns Notes whose replyto field matches the given ID. :type replyto: str, optional :param signature: A Group ID. If provided, returns Notes whose signatures field contains the given Group ID. :type signature: str, optional :param transitive_members: If true, returns Notes whose tauthor field is a transitive member of the Group represented by the given Group ID. :type transitive_members: bool, optional :param signatures: Group IDs. If provided, returns Notes whose signatures field contains the given Group IDs. :type signatures: list[str], optional :param writer: A Group ID. If provided, returns Notes whose writers field contains the given Group ID. :type writer: str, optional :param trash: If True, includes Notes that have been deleted (i.e. the ddate field is less than the current date) :type trash: bool, optional :param number: If present, includes Notes whose number field equals the given integer. :type number: int, optional :param content: If present, includes Notes whose each key is present in the content field and it is equals the given value. :type content: dict, optional :param after: Note id to start getting the list of notes from. :type after: str, optional :param mintcdate: Represents an Epoch time timestamp, in milliseconds. If provided, returns Notes whose "true creation date" (tcdate) is at least equal to the value of mintcdate. :type mintcdate: int, optional :param details: TODO: What is a valid value for this field? :type details: optional :param sort: Sorts the output by field depending on the string passed. Possible values: number, cdate, ddate, tcdate, tmdate, replyCount (Invitation id needed in the invitation field). :type sort: str, optional :return: List of Notes :rtype: list[Note] """ params = {} if id is not None: params['id'] = id if paperhash is not None: params['paperhash'] = paperhash if forum is not None: params['forum'] = forum if invitation is not None: params['invitation'] = invitation if parent_invitations is not None: params['parent_invitations'] = parent_invitations if replyto is not None: params['replyto'] = replyto if signature is not None: params['signature'] = signature if signatures is not None: params['signatures'] = signatures if transitive_members is not None: params['transitive_members'] = transitive_members if writer is not None: params['writer'] = writer if trash == True: params['trash']=True if number is not None: params['number'] = number if content is not None: params['content'] = content if mintcdate is not None: params['mintcdate'] = mintcdate if details is not None: params['details'] = details if select: params['select'] = select if sort is not None: params['sort'] = sort if domain is not None: params['domain'] = domain if 'details' not in params: params['stream'] = True # Handle sort param for local sorting sort_key = None reverse = False if 'sort' in params: # Accept format like "number:asc", "tcdate:desc", etc. valid_fields = { 'number': lambda n: n.number, 'tcdate': lambda n: n.tcdate, 'tmdate': lambda n: n.tmdate, 'cdate': lambda n: n.cdate, 'mdate': lambda n: n.mdate } if ':' in sort: field, direction = sort.split(':', 1) else: field, direction = sort, 'desc' if field in valid_fields: sort_key = valid_fields[field] reverse = direction == 'desc' params['sort'] = None # Remove for API call, sort locally results = self.get_notes(**params) if sort_key: return sorted(results, key=sort_key, reverse=reverse) return results return list(tools.efficient_iterget(self.get_notes, desc='Getting V2 Notes', **params))
[docs] def get_note_edit(self, id, trash=None): """ Get a single edit by id if available :param id: id of the edit :type id: str :return: edit matching the passed id :rtype: Note """ response = self.session.get(self.note_edits_url, params = {'id':id, 'trash': 'true' if trash == True else 'false'}, headers = self.headers) response = self.__handle_response(response) n = response.json()['edits'][0] return Edit.from_json(n)
[docs] def get_note_edits(self, note_id = None, invitation = None, with_count=None, sort=None, trash=None, limit=None): """Get a list of Edit objects for a Note matching the filters provided. Returns edits that match all the criteria passed in the parameters. When ``with_count`` is True, returns a tuple of ``(edits, count)``. :param note_id: ID of the Note whose edits to retrieve. :type note_id: str, optional :param invitation: Invitation ID to filter edits by. :type invitation: str, optional :param with_count: If True, also returns the total count of matching edits. :type with_count: bool, optional :param sort: Field to sort results by (e.g., ``tcdate``, ``tmdate``). :type sort: str, optional :param trash: If True, includes soft-deleted edits in the results. :type trash: bool, optional :param limit: Maximum number of edits to return. :type limit: int, optional :return: List of Edit objects, or a tuple ``(list[Edit], int)`` when ``with_count`` is True. :rtype: list[Edit] | tuple[list[Edit], int] """ params = {} if note_id: params['note.id'] = note_id if invitation: params['invitation'] = invitation if sort: params['sort'] = sort if trash: params['trash'] = trash if with_count is not None: params['count'] = with_count if limit is not None: params['limit'] = limit response = self.session.get(self.note_edits_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edits = [Edit.from_json(n) for n in response.json()['edits']] if with_count and params.get('offset') is None: return edits, response.json()['count'] return edits
[docs] def get_group_edit(self, id): """ Get a single edit by id if available :param id: id of the edit :type id: str :return: edit matching the passed id :rtype: Group """ response = self.session.get(self.group_edits_url, params = {'id':id}, headers = self.headers) response = self.__handle_response(response) n = response.json()['edits'][0] return Edit.from_json(n)
[docs] def get_group_edits(self, group_id = None, invitation = None, with_count = False, sort = None, trash = None): """ Gets a list of edits for a group. The edits that will be returned match all the criteria passed in the parameters. :return: List of edits :rtype: list[Edit] """ params = {} if group_id: params['group.id'] = group_id if invitation: params['invitation'] = invitation if sort: params['sort'] = sort if trash: params['trash'] = trash if with_count is not None: params['count'] = with_count response = self.session.get(self.group_edits_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edits = [Edit.from_json(n) for n in response.json()['edits']] if with_count and params.get('offset') is None: return edits, response.json()['count'] return edits
[docs] def post_tag(self, tag): """ Posts the tag. :param tag: Tag to be posted :type tag: Tag :return Tag: The posted Tag """ response = self.session.post(self.tags_url, json = tag.to_json(), headers = self.headers) response = self.__handle_response(response) return Tag.from_json(response.json())
[docs] def post_tags(self, tags): ''' Posts the list of Tags. Returns a list Tag objects updated with their ids. ''' send_json = [tag.to_json() for tag in tags] response = self.session.post(self.bulk_tags_url, json = send_json, headers = self.headers) response = self.__handle_response(response) received_json_array = response.json() tag_objects = [Tag.from_json(tag) for tag in received_json_array] return tag_objects
[docs] def rename_tags(self, current_id, new_id): """ Updates a Tag """ response = self.session.post( self.tags_rename, json = { 'currentId': current_id, 'newId': new_id }, headers = self.headers) response = self.__handle_response(response) return
#return response.json()
[docs] def get_tags(self, id = None, note = None, invitation = None, parent_invitations = None, forum = None, profile = None, signature = None, tag = None, limit = None, offset = None, with_count=None, mintmdate=None, stream=None, domain=None): """Get a list of Tag objects based on the filters provided. Returns Tags matching all the criteria passed in the parameters. When ``with_count`` is True and ``offset`` is not set, returns a tuple of ``(tags, count)``. :param id: A Tag ID. If provided, returns the Tag whose ID matches. :type id: str, optional :param note: A Note ID. If provided, returns Tags whose ``note`` field matches. :type note: str, optional :param invitation: An Invitation ID. If provided, returns Tags whose ``invitation`` field matches. :type invitation: str, optional :param parent_invitations: A list of parent Invitation IDs to filter Tags by. :type parent_invitations: list[str], optional :param forum: A Note ID. If provided, returns Tags whose ``forum`` field matches. :type forum: str, optional :param profile: A Profile ID. If provided, returns Tags associated with this profile. :type profile: str, optional :param signature: A group ID. If provided, returns Tags signed by this group. :type signature: str, optional :param tag: Tag value to filter by. :type tag: str, optional :param limit: Maximum number of Tags to return. :type limit: int, optional :param offset: Number of Tags to skip (for pagination). :type offset: int, optional :param with_count: If True, also returns the total count of matching Tags. :type with_count: bool, optional :param mintmdate: Minimum modification timestamp (in epoch milliseconds). Returns Tags modified on or after this time. :type mintmdate: int, optional :param stream: If True, returns all matching Tags using server-side streaming (ignores ``limit``/``offset``). :type stream: bool, optional :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer. :type domain: str, optional :return: List of Tag objects, or a tuple ``(list[Tag], int)`` when ``with_count`` is True and ``offset`` is None. :rtype: list[Tag] | tuple[list[Tag], int] """ params = {} if id is not None: params['id'] = id if forum is not None: params['forum'] = forum if note is not None: params['note'] = note if profile is not None: params['profile'] = profile if invitation is not None: params['invitation'] = invitation if parent_invitations is not None: params['parentInvitations'] = parent_invitations if signature is not None: params['signature'] = signature if tag is not None: params['tag'] = tag if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset if mintmdate is not None: params['mintmdate'] = mintmdate if with_count is not None: params['count'] = with_count if stream is not None: params['stream'] = stream if domain is not None: params['domain'] = domain response = self.session.get(self.tags_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) tags = [Tag.from_json(t) for t in response.json()['tags']] if with_count and params.get('offset') is None: return tags, response.json()['count'] return tags
[docs] def get_all_tags(self, id = None, invitation = None, parent_invitations = None, forum = None, note = None, profile = None, signature = None, tag = None, domain=None): """ Gets a list of Tag objects based on the filters provided. The Tags that will be returned match all the criteria passed in the parameters. :param id: A Tag ID. If provided, returns Tags whose ID matches the given ID. :type id: str, optional :param forum: A Note ID. If provided, returns Tags whose forum matches the given ID. :type forum: str, optional :param invitation: An Invitation ID. If provided, returns Tags whose "invitation" field is this Invitation ID. :type invitation: str, optional :return: List of tags :rtype: list[Tag] """ params = { 'id': id, 'invitation': invitation, 'parent_invitations': parent_invitations, 'forum': forum, 'note': note, 'profile': profile, 'signature': signature, 'tag': tag, 'domain': domain, 'stream': True } return self.get_tags(**params)
[docs] def get_edges(self, id = None, invitation = None, head = None, tail = None, label = None, limit = None, offset = None, with_count=None, trash=None, select=None, stream=None, domain=None): """Get a list of Edge objects based on the filters provided. Returns Edges matching all the criteria passed in the parameters. When ``with_count`` is True and ``offset`` is not set, returns a tuple of ``(edges, count)``. :param id: An Edge ID. If provided, returns the Edge whose ID matches. :type id: str, optional :param invitation: An Invitation ID. If provided, returns Edges whose ``invitation`` field matches. :type invitation: str, optional :param head: ID of the Edge head entity (type defined by the edge invitation, e.g., a Note ID or Profile ID). :type head: str, optional :param tail: ID of the Edge tail entity (type defined by the edge invitation, e.g., a Note ID or Profile ID). :type tail: str, optional :param label: Label value to filter Edges by. :type label: str, optional :param limit: Maximum number of Edges to return. Default is determined by the server. :type limit: int, optional :param offset: Number of Edges to skip (for pagination). :type offset: int, optional :param with_count: If True, also returns the total count of matching Edges. :type with_count: bool, optional :param trash: If True, includes soft-deleted Edges in the results. :type trash: bool, optional :param select: Comma-separated list of fields to include in the response (e.g., ``id,head,tail``). :type select: str, optional :param stream: If True, returns all matching Edges using server-side streaming (ignores ``limit``/``offset``). :type stream: bool, optional :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer. :type domain: str, optional :return: List of Edge objects, or a tuple ``(list[Edge], int)`` when ``with_count`` is True and ``offset`` is None. :rtype: list[Edge] | tuple[list[Edge], int] """ params = {} params['id'] = id params['invitation'] = invitation params['head'] = head params['tail'] = tail params['label'] = label params['limit'] = limit params['offset'] = offset params['trash'] = trash if select is not None: params['select'] = select if stream is not None: params['stream'] = stream if with_count is not None: params['count'] = with_count if domain is not None: params['domain'] = domain response = self.session.get(self.edges_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edges = [Edge.from_json(e) for e in response.json()['edges']] if with_count and params.get('offset') is None: return edges, response.json()['count'] return edges
[docs] def get_all_edges(self, id = None, invitation = None, head = None, tail = None, label = None, trash=None, select=None, domain=None): """Get all Edge objects matching the filters using server-side streaming. Convenience wrapper around :meth:`get_edges` with ``stream=True``, which retrieves all matching Edges without manual pagination. :param id: An Edge ID. If provided, returns the Edge whose ID matches. :type id: str, optional :param invitation: An Invitation ID. If provided, returns Edges whose ``invitation`` field matches. :type invitation: str, optional :param head: ID of the Edge head entity. :type head: str, optional :param tail: ID of the Edge tail entity. :type tail: str, optional :param label: Label value to filter Edges by. :type label: str, optional :param trash: If True, includes soft-deleted Edges in the results. :type trash: bool, optional :param select: Comma-separated list of fields to include in the response. :type select: str, optional :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer. :type domain: str, optional :return: List of all matching Edge objects. :rtype: list[Edge] """ params = { 'id': id, 'invitation': invitation, 'head': head, 'tail': tail, 'label': label, 'trash': trash, 'select': select, 'domain': domain, 'stream': True } return self.get_edges(**params)
[docs] def get_edges_count(self, id=None, invitation=None, head=None, tail=None, label=None, domain=None): """Return the count of Edge objects matching the filters provided. If ``domain`` is not provided but ``invitation`` is, the method attempts to infer the domain from the invitation for more efficient querying. :param id: An Edge ID. If provided, counts only the Edge whose ID matches. :type id: str, optional :param invitation: An Invitation ID. If provided, counts Edges whose ``invitation`` field matches. :type invitation: str, optional :param head: ID of the Edge head entity. :type head: str, optional :param tail: ID of the Edge tail entity. :type tail: str, optional :param label: Label value to filter Edges by. :type label: str, optional :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer. :type domain: str, optional :return: Number of Edges matching the filters. :rtype: int """ params = {} params['id'] = id params['invitation'] = invitation params['head'] = head params['tail'] = tail params['label'] = label if domain is not None: params['domain'] = domain elif invitation is not None: try: edges_invitation = self.get_invitation(invitation) params['domain'] = edges_invitation.domain except: pass response = self.session.get(self.edges_count_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return response.json()['count']
[docs] def get_grouped_edges(self, invitation=None, head=None, tail=None, label=None, groupby='head', select=None, limit=None, offset=None, trash=None, domain=None): """Get Edges grouped by a specified field. Returns a list of JSON objects where each one represents a group of Edges. For example, with ``groupby='head'`` each group has the form: ``{id: {head: paper-1}, values: [{tail: user-1}, {tail: user-2}]}``. The ``limit`` applies to the number of groups returned, not the number of Edges within each group. :param invitation: An Invitation ID. If provided, returns Edges whose ``invitation`` field matches. :type invitation: str, optional :param head: ID of the Edge head entity to filter by. :type head: str, optional :param tail: ID of the Edge tail entity to filter by. :type tail: str, optional :param label: Label value to filter Edges by. :type label: str, optional :param groupby: Field to group Edges by. Defaults to ``head``. :type groupby: str, optional :param select: Comma-separated list of fields to include in each group's values. :type select: str, optional :param limit: Maximum number of groups to return. :type limit: int, optional :param offset: Number of groups to skip (for pagination). :type offset: int, optional :param trash: If True, includes soft-deleted Edges in the results. :type trash: bool, optional :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer. :type domain: str, optional :return: List of grouped edge dictionaries, each containing ``id`` and ``values`` keys. :rtype: list[dict] """ params = {} params['id'] = None params['invitation'] = invitation params['head'] = head params['tail'] = tail params['label'] = label params['groupBy'] = groupby params['select'] = select params['limit'] = limit params['offset'] = offset params['trash'] = trash params['domain'] = domain response = self.session.get(self.edges_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) json = response.json() return json['groupedEdges'] # a list of JSON objects holding information about an edge
[docs] def get_archived_edges(self, invitation): """ Returns a list of Edge objects based on the filters provided. :arg invitation: an Invitation ID. If provided, returns Edges whose "invitation" field is this Invitation ID. """ params = {'invitation': invitation} print('tools.format_params(params)', tools.format_params(params)) response = self.session.get(self.edges_archive, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) edges = [Edge.from_json(e) for e in response.json()['edges']] return edges
[docs] def post_edge(self, edge): """Post a single Edge to the server. Creates or updates an Edge. Upon success, returns the posted Edge object with its server-assigned ``id``. :param edge: Edge object to post. :type edge: Edge :return: The posted Edge object. :rtype: Edge """ response = self.session.post(self.edges_url, json = edge.to_json(), headers = self.headers) response = self.__handle_response(response) return Edge.from_json(response.json())
[docs] def post_edges (self, edges): ''' Posts the list of Edges. Returns a list Edge objects updated with their ids. ''' send_json = [edge.to_json() for edge in edges] response = self.session.post(self.bulk_edges_url, json = send_json, headers = self.headers) response = self.__handle_response(response) received_json_array = response.json() edge_objects = [Edge.from_json(edge) for edge in received_json_array] return edge_objects
[docs] def rename_edges(self, current_id, new_id): """Rename all Edges that reference a given ID, replacing it with a new ID. Updates the ``head`` and ``tail`` fields of all Edges that contain ``current_id``, replacing occurrences with ``new_id``. :param current_id: The current ID to find in Edge head/tail fields (e.g., a profile tilde ID). :type current_id: str :param new_id: The new ID to replace it with. :type new_id: str :return: List of updated Edge objects. :rtype: list[Edge] """ response = self.session.post( self.edges_rename, json = { 'currentId': current_id, 'newId': new_id }, headers = self.headers) response = self.__handle_response(response) #print('RESPONSE: ', response.json()) received_json_array = response.json() edge_objects = [Edge.from_json(edge) for edge in received_json_array] return edge_objects
[docs] def post_venue(self, venue): """ Posts the venue. Upon success, returns the posted Venue object. """ response = self.session.post(self.venues_url, json=venue, headers=self.headers) response = self.__handle_response(response) return response.json()
[docs] def restrict(self, venue_id): """ Restricts a domain/venue, preventing non-authorized users from accessing its data. :param venue_id: the domain/venue ID to restrict :type venue_id: str :return: the API response :rtype: dict """ response = self.session.post(self.domains_restriction, json={'domain': venue_id, 'action': 'restrict'}, headers=self.headers) response = self.__handle_response(response) return response.json()
[docs] def unrestrict(self, venue_id): """ Removes the restriction from a domain/venue, restoring normal data access. :param venue_id: the domain/venue ID to unrestrict :type venue_id: str :return: the API response :rtype: dict """ response = self.session.post(self.domains_restriction, json={'domain': venue_id, 'action': 'unrestrict'}, headers=self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_edges(self, invitation, id=None, label=None, head=None, tail=None, wait_to_finish=False, soft_delete=False): """ Deletes edges by a combination of invitation id and one or more of the optional filters. :param invitation: an invitation ID :type invitation: str :param label: a matching label ID :type label: str, optional :param head: id of the edge head (head type defined by the edge invitation) :type head: str, optional :param tail: id of the edge tail (tail type defined by the edge invitation) :type tail: str, optional :param wait_to_finish: True if execution should pause until deletion of edges is finished :type wait_to_finish: bool, optional :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ delete_query = {'invitation': invitation} if label: delete_query['label'] = label if head: delete_query['head'] = head if tail: delete_query['tail'] = tail if id: delete_query['id'] = id delete_query['waitToFinish'] = wait_to_finish delete_query['softDelete'] = soft_delete response = self.session.delete(self.edges_url, json = delete_query, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_tags(self, invitation, id=None, label=None, wait_to_finish=False, soft_delete=False): """ Deletes tags by a combination of invitation id and one or more of the optional filters. :param invitation: an invitation ID :type invitation: str :param label: a matching label ID :type label: str, optional :param wait_to_finish: True if execution should pause until deletion of tags is finished :type wait_to_finish: bool, optional :param soft_delete: True if the tag should be soft deleted, False if it should be hard deleted :type soft_delete: bool, optional :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ delete_query = {'invitation': invitation} if label: delete_query['label'] = label if id: delete_query['id'] = id delete_query['waitToFinish'] = wait_to_finish delete_query['softDelete'] = soft_delete response = self.session.delete(self.tags_url, json = delete_query, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_note(self, note_id): """ Deletes the note :param note_id: ID of Note to be deleted :type note_id: str :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ response = self.session.delete(self.notes_url, json = {'id': note_id}, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_profile_reference(self, reference_id): ''' Deletes the Profile Reference specified by `reference_id`. :param reference_id: ID of the Profile Reference to be deleted. :type reference_id: str :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict ''' response = self.session.delete(self.profiles_url + '/reference', json = {'id': reference_id}, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_group(self, group_id): """ Deletes the group :param group_id: ID of Group to be deleted :type group_id: str :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ response = self.session.delete(self.groups_url, json = {'id': group_id}, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def delete_institution(self, institution_id): """ Deletes the institution :param institution_id: ID of Institution to be deleted :type institution_id: str :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise :rtype: dict """ response = self.session.delete(self.institutions_url + '/' + institution_id, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_message(self, subject, recipients, message, invitation=None, signature=None, ignoreRecipients=None, sender=None, replyTo=None, parentGroup=None, use_job=None): """ Posts a message to the recipients and consequently sends them emails :param subject: Subject of the e-mail :type subject: str :param recipients: Recipients of the e-mail. Valid inputs would be tilde username or emails registered in OpenReview :type recipients: list[str] :param message: Message in the e-mail :type message: str :param invitation: Invitation ID of the invitation that allows to send the message :type invitation: str :param signature: Signature of the user sending the message :type signature: str :param ignoreRecipients: List of groups ids to be ignored from the recipient list :type subject: list[str] :param sender: Specify the from address and name of the email, the dictionary should have two keys: 'name' and 'email' :type sender: dict :param replyTo: e-mail address used when recipients reply to this message :type replyTo: str :param parentGroup: parent group recipients of e-mail belong to :type parentGroup: str :param use_job: If True, the message will be sent using the job queue :type use_job: bool :return: Contains the message that was sent to each Group :rtype: dict """ return self.post_message_request(subject, recipients, message, invitation=invitation, signature=signature, ignoreRecipients=ignoreRecipients, sender=sender, replyTo=replyTo, parentGroup=parentGroup, use_job=use_job)
[docs] def post_message_request(self, subject, recipients, message, invitation=None, signature=None, ignoreRecipients=None, sender=None, replyTo=None, parentGroup=None, use_job=None): """ Posts a message to the recipients and consequently sends them emails :param subject: Subject of the e-mail :type subject: str :param recipients: Recipients of the e-mail. Valid inputs would be tilde username or emails registered in OpenReview :type recipients: list[str] :param message: Message in the e-mail :type message: str :param invitation: Invitation ID of the invitation that allows to send the message :type invitation: str :param signature: Signature of the user sending the message :type signature: str :param ignoreRecipients: List of groups ids to be ignored from the recipient list :type subject: list[str] :param sender: Specify the from address and name of the email, the dictionary should have two keys: 'name' and 'email' :type sender: dict :param replyTo: e-mail address used when recipients reply to this message :type replyTo: str :param parentGroup: parent group recipients of e-mail belong to :type parentGroup: str :param use_job: If True, the message will be sent using the job queue :type use_job: bool :return: Contains the message that was sent to each Group :rtype: dict """ if parentGroup: recipients = self.get_group(parentGroup).transform_to_anon_ids(recipients) json = { 'groups': recipients, 'subject': subject , 'message': message } if invitation: json['invitation'] = invitation if signature: json['signature'] = signature if ignoreRecipients: json['ignoreGroups'] = ignoreRecipients if sender: json['fromName'] = sender.get('fromName') json['fromEmail'] = sender.get('fromEmail') if replyTo: json['replyTo'] = replyTo if parentGroup: json['parentGroup'] = parentGroup if use_job is not None: json['useJob'] = use_job response = self.session.post(self.messages_requests_url, json = json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def get_message_requests(self, id=None, invitation=None): """ Posts a message to the recipients and consequently sends them emails :param id: ID of the message request :type id: str :param invitation: Invitation ID of the invitation that allows to send the message :type invitation: str :return: Contains the message request used to send the messages :rtype: dict """ params = {} if id: params['id'] = id if invitation: params['invitation'] = invitation if params: response = self.session.get(self.messages_requests_url, params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return response.json()['requests'] else: return []
[docs] def post_direct_message(self, subject, recipients, message, sender=None): """ Posts a message to the recipients and consequently sends them emails :param subject: Subject of the e-mail :type subject: str :param recipients: Recipients of the e-mail. Valid inputs would be tilde username or emails registered in OpenReview :type recipients: list[str] :param message: Message in the e-mail :type message: str :return: Contains the message that was sent to each Group :rtype: dict """ response = self.session.post(self.messages_direct_url, json = { 'groups': recipients, 'subject': subject , 'message': message, 'from': sender }, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def add_members_to_group(self, group, members): """ Adds members to a group :param group: Group (or Group's id) to which the members will be added :type group: Group or str :param members: Members that will be added to the group. Members should be in a string, unicode or a list format :type members: str, list, unicode :return: Group with the members added :rtype: Group """ def add_member(group, members): group = self.get_group(group) if type(group) in string_types else group self.post_group_edit(invitation = f'{group.domain}/-/Edit', signatures = group.signatures, group = Group( id = group.id, members = { 'add': list(set(members)) } ), readers=group.signatures, writers=group.signatures ) return self.get_group(group.id) member_type = type(members) if member_type in string_types: return add_member(group, [members]) if member_type == list: return add_member(group, members) raise OpenReviewException("add_members_to_group()- members '"+str(members)+"' ("+str(member_type)+") must be a str, unicode or list, but got " + repr(member_type) + " instead")
[docs] def remove_members_from_group(self, group, members): """ Removes members from a group :param group: Group (or Group's id) from which the members will be removed :type group: Group or str :param members: Members that will be removed. Members should be in a string, unicode or a list format :type members: str, list, unicode :return: Group without the members that were removed :type: Group """ def remove_member(group, members): members_to_remove = list(set(members)) group = self.get_group(group if type(group) in string_types else group.id) members_to_remove = group.transform_to_anon_ids(members_to_remove) self.post_group_edit(invitation = f'{group.domain}/-/Edit', signatures = group.signatures, group = Group( id = group.id, members = { 'remove': members_to_remove } ), readers=group.signatures, writers=group.signatures ) return self.get_group(group.id) member_type = type(members) if member_type in string_types: return remove_member(group, [members]) if member_type == list: return remove_member(group, members)
[docs] def search_notes(self, term, content = 'all', group = 'all', source='all', limit = None, offset = None): """ Searches notes based on term, content, group and source as the criteria. Unlike :meth:`~openreview.Client.get_notes`, this method uses Elasticsearch to retrieve the Notes :param term: Term used to look for the Notes :type term: str :param content: Specifies whether to look in all the content, authors, or keywords. Valid inputs: 'all', 'authors', 'keywords' :type content: str, optional :param group: Specifies under which Group to look. E.g. 'all', 'ICLR', 'UAI', etc. :type group: str, optional :param source: Whether to look in papers, replies or all :type source: str, optional :param limit: Maximum amount of Notes that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Notes will be returned :type limit: int, optional :param offset: Indicates the position to start retrieving Notes. For example, if there are 10 Notes and you want to obtain the last 3, then the offset would need to be 7. :type offset: int, optional :return: List of notes :rtype: list[Note] """ params = { 'term': term, 'content': content, 'group': group, 'source': source } if limit is not None: params['limit'] = limit if offset is not None: params['offset'] = offset response = self.session.get(self.notes_url + '/search', params=tools.format_params(params), headers = self.headers) response = self.__handle_response(response) return [Note.from_json(n) for n in response.json()['notes']]
def get_notes_by_ids(self, ids): response = self.session.post(self.notes_url + '/search', json = { 'ids': ids }, headers = self.headers) response = self.__handle_response(response) return [Note.from_json(n) for n in response.json()['notes']]
[docs] def get_tildeusername(self, fullname): """ Gets next possible tilde user name corresponding to the specified full name :param fullname: Full name of the user :type fullname: str :return: next possible tilde user name corresponding to the specified full name :rtype: dict """ response = self.session.get(self.tilde_url, params = { 'fullname': fullname }, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def get_messages(self, to = None, subject = None, status = None, offset = None, limit = None): """ **Only for Super User**. Retrieves all the messages sent to a list of usernames or emails and/or a particular e-mail subject :param to: Tilde user names or emails :type to: list[str], optional :param subject: Subject of the e-mail :type subject: str, optional :param status: Commad separated list of status values corresponding to the message: delivered, bounce, droppped, etc :type status: str, optional :return: Messages that match the passed parameters :rtype: dict """ response = self.session.get(self.messages_url, params = { 'to': to, 'subject': subject, 'status': status, 'offset': offset, 'limit': limit }, headers = self.headers) response = self.__handle_response(response) return response.json()['messages']
[docs] def get_process_logs(self, id = None, invitation = None, status = None, min_sdate = None): """Retrieve process function execution logs. **Only for Super User.** Returns log entries for process functions triggered by edits or invitation date processes. :param id: Edit ID (the ``id`` returned by a ``post_*_edit`` call) that triggered the process function. :type id: str, optional :param invitation: Invitation ID to filter logs by the invitation whose process function produced them. :type invitation: str, optional :param status: Filter by execution status (e.g., ``ok``, ``error``, ``running``). :type status: str, optional :param min_sdate: Minimum start date in epoch milliseconds. Returns logs started on or after this time. :type min_sdate: int, optional :return: List of process log entry dictionaries, each containing ``id``, ``status``, ``log``, and timestamp fields. :rtype: list[dict] """ response = self.session.get(self.process_logs_url, params = { 'id': id, 'invitation': invitation, 'status': status, 'minsdate': min_sdate }, headers = self.headers) response = self.__handle_response(response) return response.json()['logs']
[docs] def post_institution(self, institution): """ Requires Super User permission. Adds an institution if the institution id is not found in the database, otherwise, the institution is updated. :param institution: institution to be posted :type institution: dict :return: The posted institution :rtype: dict """ response = self.session.post(self.institutions_url, json = institution, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def post_invitation_edit(self, invitations, readers=None, writers=None, signatures=None, invitation=None, content=None, replacement=None, domain=None, await_process=False): """Create or update an Invitation via the edit system. Posts an edit that creates a new Invitation or modifies an existing one. The edit is validated against the parent invitation(s) specified by ``invitations``. :param invitations: Parent invitation ID that authorizes this edit (e.g., ``venue/-/Edit``). :type invitations: str :param readers: List of group IDs that can read this edit. :type readers: list[str], optional :param writers: List of group IDs that can modify this edit. :type writers: list[str], optional :param signatures: List of group IDs signing this edit. :type signatures: list[str], optional :param invitation: Invitation object containing the fields to create or update. Use ``invitation.id`` to target an existing Invitation. :type invitation: Invitation, optional :param content: Additional content fields for the edit itself. :type content: dict, optional :param replacement: If True, the edit fully replaces the existing Invitation rather than merging fields. :type replacement: bool, optional :param domain: Domain (venue ID) that this edit belongs to. :type domain: str, optional :param await_process: If True, blocks until the server-side process function completes; raises OpenReviewException on error. :type await_process: bool, optional :return: Dictionary containing the posted edit, including the assigned edit ``id``. :rtype: dict """ edit_json = {} if invitations is not None: edit_json['invitations'] = invitations if readers is not None: edit_json['readers'] = readers if writers is not None: edit_json['writers'] = writers if signatures is not None: edit_json['signatures'] = signatures if content is not None: edit_json['content'] = content if replacement is not None: edit_json['replacement'] = replacement if invitation is not None: edit_json['invitation'] = invitation.to_json() if domain is not None: edit_json['domain'] = domain response = self.session.post(self.invitation_edits_url, json = edit_json, headers = self.headers) response = self.__handle_response(response) if await_process: self.__await_process(response.json()['id']) return response.json()
[docs] def post_note_edit(self, invitation, signatures, note=None, readers=None, writers=None, nonreaders=None, content=None, await_process=False): """Create or update a Note via the edit system. Posts an edit that creates a new Note or modifies an existing one. The edit is validated against the specified invitation's schema. To update an existing Note, set ``note.id`` to the target Note's ID. :param invitation: Invitation ID that defines the schema and permissions for this edit (e.g., ``venue/-/Submission``). :type invitation: str :param signatures: List of group IDs signing this edit. :type signatures: list[str] :param note: Note object containing the fields to create or update. Use ``note.id`` to target an existing Note. :type note: Note, optional :param readers: List of group IDs that can read this edit. :type readers: list[str], optional :param writers: List of group IDs that can modify this edit. :type writers: list[str], optional :param nonreaders: List of group IDs excluded from reading this edit. :type nonreaders: list[str], optional :param content: Additional content fields for the edit itself (not the Note). :type content: dict, optional :param await_process: If True, blocks until the server-side process function completes; raises OpenReviewException on error. :type await_process: bool, optional :return: Dictionary containing the posted edit, including the assigned edit ``id`` and the ``note`` with its ``id``. :rtype: dict """ edit_json = { 'invitation': invitation, 'note': note.to_json() if note else {} } if signatures is not None: edit_json['signatures'] = signatures if readers is not None: edit_json['readers'] = readers if writers is not None: edit_json['writers'] = writers if nonreaders is not None: edit_json['nonreaders'] = nonreaders if content is not None: edit_json['content'] = content response = self.session.post(self.note_edits_url, json = edit_json, headers = self.headers) response = self.__handle_response(response) if await_process: self.__await_process(response.json()['id']) return response.json()
[docs] def post_group_edit(self, invitation, signatures=None, group=None, readers=None, writers=None, content=None, replacement=None, await_process=False, flush_members_cache=True): """Create or update a Group via the edit system. Posts an edit that creates a new Group or modifies an existing one. The edit is validated against the specified invitation's schema. When the edit modifies group members and the signature matches the domain, the members cache is automatically flushed unless ``flush_members_cache`` is False. :param invitation: Invitation ID that defines the schema and permissions for this edit. :type invitation: str :param signatures: List of group IDs signing this edit. :type signatures: list[str], optional :param group: Group object containing the fields to create or update. Use ``group.id`` to target an existing Group. :type group: Group, optional :param readers: List of group IDs that can read this edit. :type readers: list[str], optional :param writers: List of group IDs that can modify this edit. :type writers: list[str], optional :param content: Additional content fields for the edit itself. :type content: dict, optional :param replacement: If True, the edit fully replaces the existing Group rather than merging fields. :type replacement: bool, optional :param await_process: If True, blocks until the server-side process function completes; raises OpenReviewException on error. :type await_process: bool, optional :param flush_members_cache: If True (default), flushes the members cache for affected members when the domain signs the edit. :type flush_members_cache: bool, optional :return: Dictionary containing the posted edit, including the assigned edit ``id``. :rtype: dict """ edit_json = { 'invitation': invitation } if group is not None: edit_json['group'] = group.to_json() if signatures is not None: edit_json['signatures'] = signatures if readers is not None: edit_json['readers'] = readers if writers is not None: edit_json['writers'] = writers if content is not None: edit_json['content'] = content if replacement is not None: edit_json['replacement'] = replacement response = self.session.post(self.group_edits_url, json = edit_json, headers = self.headers) response = self.__handle_response(response) posted_edit = response.json() members = posted_edit.get('group', {}).get('members') if posted_edit['domain'] in posted_edit['signatures']: if flush_members_cache: members_to_flush = [] if isinstance(members, dict): if 'add' in members: for member in members['add']: members_to_flush.append(member) elif 'remove' in members: for member in members['remove']: members_to_flush.append(member) if isinstance(members, list): members_to_flush = members for member in members_to_flush: self.flush_members_cache(member) if await_process: self.__await_process(response.json()['id']) return response.json()
[docs] def post_edit(self, edit): """Post an Edit object, routing it to the correct endpoint based on its contents. Inspects the serialized edit for the presence of ``note``, ``group``, or ``invitation`` keys and POSTs to the corresponding edits endpoint (``/notes/edits``, ``/groups/edits``, or ``/invitations/edits``). :param edit: Edit object to post. Must contain exactly one of ``note``, ``group``, or ``invitation``. :type edit: Edit :return: Dictionary containing the posted edit, including the assigned edit ``id``. :rtype: dict """ edit_json = edit.to_json() if 'note' in edit_json: response = self.session.post(self.note_edits_url, json = edit_json, headers = self.headers) elif 'group' in edit_json: response = self.session.post(self.group_edits_url, json = edit_json, headers = self.headers) elif 'invitation' in edit_json: response = self.session.post(self.invitation_edits_url, json = edit_json, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def get_jobs_status(self): """ **Only for Super User**. Retrieves the jobs status of the queue :return: Jobs status :rtype: dict """ response = self.session.get(self.jobs_status, headers=self.headers) response = self.__handle_response(response) return response.json()
[docs] def request_raw_expertise(self, expertise_request, baseurl=None): """ Calls the Expertise API with a raw expertise request. :param expertise_request: Dictionary containing the expertise request to be sent to the Expertise API :type expertise_request: dict :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2` :type baseurl: str, optional :return: Dictionary containing the response from the Expertise API :rtype: dict """ base_url = baseurl if baseurl else self.baseurl response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers) response = self.__handle_response(response) return response.json()
def request_expertise(self, name, group_id, venue_id, submission_content=None, alternate_match_group = None, alternate_expertise_selection_id = None, expertise_selection_id=None, model=None, baseurl=None, weight=None, top_recent_pubs=None, percentile_selection=None): # Build entityA from group_id entityA = { 'type': 'Group', 'memberOf': group_id } if expertise_selection_id and tools.get_invitation(self, expertise_selection_id): expertise = { 'invitation': expertise_selection_id } entityA['expertise'] = expertise # Build entityB from alternate_match_group or venue_id if alternate_match_group: entityB = { 'type': 'Group', 'memberOf': alternate_match_group } if alternate_expertise_selection_id and tools.get_invitation(self, alternate_expertise_selection_id): expertise = { 'invitation': alternate_expertise_selection_id } entityB['expertise'] = expertise else: entityB = { 'type': 'Note', 'withVenueid': venue_id, 'withContent': submission_content } expertise_request = { 'name': name, 'entityA': entityA, 'entityB': entityB, 'model': { 'name': model } } if weight: expertise_request['dataset'] = { 'weightSpecification': weight } if top_recent_pubs: expertise_request['dataset'] = { 'topRecentPubs': top_recent_pubs } if percentile_selection: expertise_request['model']['percentileSelect'] = percentile_selection base_url = baseurl if baseurl else self.baseurl response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers) response = self.__handle_response(response) return response.json() def request_single_paper_expertise(self, name, group_id, paper_id, expertise_selection_id=None, model=None, baseurl=None): # Build entityA from group_id entityA = { 'type': 'Group', 'memberOf': group_id } if expertise_selection_id and tools.get_invitation(self, expertise_selection_id): expertise = { 'invitation': expertise_selection_id } entityA['expertise'] = expertise # Build entityB from paper_id entityB = { 'type': 'Note', 'id': paper_id } expertise_request = { 'name': name, 'entityA': entityA, 'entityB': entityB, 'model': { 'name': model } } base_url = baseurl if baseurl else self.baseurl if base_url.startswith('http://localhost'): return {} print('compute expertise', {'name': name, 'match_group': group_id , 'paper_id': paper_id, 'model': model}) response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers) print('response json', response.json()) response = self.__handle_response(response) return response.json()
[docs] def request_paper_similarity(self, name, venue_id=None, alternate_venue_id=None, invitation=None, alternate_invitation=None, submissions=None, alternate_submissions=None,model='specter2+scincl', sparse_value=400, baseurl=None): """ Call to the Expertise API to compute paper-to-paper similarity scores. This can be between 2 different venues or between submissions of the same venue. :param name: name of the job :type name: str :param venue_id: paper venue id for entity A, e.g. venue_id/Submission for active papers :type venue_id: str, optional :param alternate_venue_id: paper venue id for entity B, e.g. venue_id/Submission for active papers :type alternate_venue_id: str, optional :param invitation: invitation to retrieve papers for entity A, e.g. venue_id/-/Submission :type invitation: str, optional :param alternate_invitation: invitation to retrieve papers for entity B, e.g. venue_id/-/Submission :type alternate_invitation: str, optional :param submissions: list of submission notes for entity A :type submissions: list :param alternate_submissions: list of submission notes for entity B :type alternate_submissions: list :param model: model used to compute scores, e.g. "specter2+scincl" :type model: str, optional :param sparse_value: number of top scores to retain per paper. Default and max is 400. :type sparse_value: int, optional :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2` :type baseurl: str, optional :return: Dictionary containing the job id :rtype: dict """ # Check entity A params if sum(map(bool, [venue_id, invitation, submissions])) != 1: raise OpenReviewException('Provide exactly one of the following: venue_id, invitation, submissions') # Check entity B params if sum(map(bool, [alternate_venue_id, alternate_invitation, alternate_submissions])) != 1: raise OpenReviewException('Provide exactly one of the following: alternate_venue_id, alternate_invitation, alternate_submissions') if sparse_value > 400: raise OpenReviewException('Sparse value should be no greater than 400') entityA = { 'type': "Note" } entityB = { 'type': "Note" } # Build entity A if venue_id: entityA['withVenueid'] = venue_id elif invitation: entityA['invitation'] = invitation elif submissions: formatted_submissions = [ { 'id': submission.id, 'title': submission.content.get('title', {}).get('value', ''), 'abstract': submission.content.get('abstract', {}).get('value', '') } for submission in submissions ] entityA['submissions'] = formatted_submissions # Build entity B if alternate_venue_id: entityB['withVenueid'] = alternate_venue_id elif alternate_invitation: entityB['invitation'] = alternate_invitation elif alternate_submissions: formatted_submissions = [ { 'id': submission.id, 'title': submission.content.get('title', {}).get('value', ''), 'abstract': submission.content.get('abstract', {}).get('value', '') } for submission in alternate_submissions ] entityB['submissions'] = formatted_submissions expertise_request = { "name": name, "entityA": entityA, "entityB": entityB, "model": { "name": model, 'useTitle': True, 'useAbstract': True, 'skipSpecter': False, 'scoreComputation': 'max', 'sparseValue': sparse_value } } base_url = baseurl if baseurl else self.baseurl response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def request_paper_subset_expertise(self, name, submissions, group_id, expertise_selection_id=None, model='specter2+scincl', weight=None, baseurl=None): """ Call to the Expertise API to compute scores for a subset of papers to a group. :param name: name of the job :type name: str :param submissions: list of submission notes :type submissions: list :param group_id: id of group to compute scores against :type group_id: str :param expertise_selection_id: id of expertise selection invitation for group :type expertise_selection_id: str, optional :param model: model used to compute scores, e.g. "specter2+scincl" :type model: str, optional :param weight: list of dictionaries that specify weights for publications :type weight: list[dict], optional :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2` :type baseurl: str, optional :return: Dictionary containing the job id :rtype: dict """ # Build entityA from group_id entityA = { 'type': 'Group', 'memberOf': group_id } if expertise_selection_id and tools.get_invitation(self, expertise_selection_id): expertise = { 'invitation': expertise_selection_id } entityA['expertise'] = expertise # Build entityB using submissions formatted_submissions = [ { 'id': submission.id, 'title': submission.content.get('title', {}).get('value', ''), 'abstract': submission.content.get('abstract', {}).get('value', '') } for submission in submissions ] entityB = { 'type': "Note", 'submissions': formatted_submissions } model_config = { 'name': model, 'normalizeScores': False } expertise_request = { "name": name, "entityA": entityA, "entityB": entityB, "model": model_config } if weight: expertise_request['dataset'] = { 'weightSpecification': weight } base_url = baseurl if baseurl else self.baseurl response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers) response = self.__handle_response(response) return response.json()
[docs] def request_user_subset_expertise(self, name, members, expertise_selection_id=None, venue_id=None, invitation=None, model='specter2+scincl', weight=None, baseurl=None): """ Call to the Expertise API to compute scores for a subset of users to papers. :param name: name of the job :type name: str :param members: list of profile IDs for which to compute scores :type members: list[str] :param expertise_selection_id: id of expertise selection invitation for members :type expertise_selection_id: str, optional :param venue_id: paper venue id used to retrieve papers, e.g. venue_id/Submission for active papers :type venue_id: str, optional :param invitation: invitation used to retrieve papers, e.g. venue_id/-/Submission :type invitation: str, optional :param model: model used to compute scores, e.g. "specter2+scincl" :type model: str, optional :param weight: list of dictionaries that specify weights for publications :type weight: list[dict], optional :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2` :type baseurl: str, optional :return: Dictionary containing the job id :rtype: dict """ # Check entity B params if bool(venue_id) == bool(invitation): raise OpenReviewException('Provide exactly one of the following: venue_id, invitation') # Build entityA from members entityA = { 'type': "Group", 'reviewerIds': members } if expertise_selection_id and tools.get_invitation(self, expertise_selection_id): expertise = { 'invitation': expertise_selection_id } entityA['expertise'] = expertise # Build entityB entityB = { 'type': "Note" } if venue_id: entityB['withVenueid'] = venue_id elif invitation: entityB['invitation'] = invitation model_config = { 'name': model, 'normalizeScores': False } expertise_request = { "name": name, "entityA": entityA, "entityB": entityB, "model": model_config } if weight: expertise_request['dataset'] = { 'weightSpecification': weight } base_url = baseurl if baseurl else self.baseurl response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers) response = self.__handle_response(response) return response.json()
def get_expertise_status(self, job_id=None, group_id=None, paper_id=None, baseurl=None): print('get expertise status', baseurl, job_id, group_id, paper_id) base_url = baseurl if baseurl else self.baseurl if base_url.startswith('http://localhost'): print('get expertise status localhost, return Completed') if job_id: return { 'status': 'Completed', 'jobId': job_id } return { 'results': [{ 'status': 'Completed', 'jobId': None }]} params = {} if job_id: params['jobId'] = job_id if group_id: params['entityA.memberOf'] = group_id if paper_id: params['entityB.id'] = paper_id response = self.session.get(base_url + '/expertise/status', params = params, headers = self.headers) response = self.__handle_response(response) response_json = response.json() print('get expertise status', response_json) return response_json def get_expertise_jobs(self, status=None, baseurl=None): print('get expertise jobs', baseurl, status) base_url = baseurl if baseurl else self.baseurl if base_url.startswith('http://localhost'): print('get expertise jobs localhost, return []') return { 'results': [] } params = {} if status: params['status'] = status response = self.session.get(base_url + '/expertise/status/all', params = params, headers = self.headers) response = self.__handle_response(response) response_json = response.json() print('get expertise jobs', response_json) return response_json def get_expertise_metadata(self, job_id, baseurl=None): print('get expertise metadata', baseurl, job_id) base_url = baseurl if baseurl else self.baseurl if base_url.startswith('http://localhost'): print('get expertise metadata localhost, return {}') return {} response = self.session.get(base_url + '/expertise/metadata', params = {'jobId': job_id}, headers = self.headers) response = self.__handle_response(response) response_json = response.json() print('get expertise metadata', response_json) return response_json def get_expertise_results(self, job_id, baseurl=None, wait_for_complete=False, format='json'): print('get expertise results', baseurl, job_id) base_url = baseurl if baseurl else self.baseurl call_max = 500 if base_url.startswith('http://localhost'): print('return expertise results localhost, return []') return iter([]) if format == 'csv' else { 'results': [] } if wait_for_complete: call_count = 0 status_response = self.get_expertise_status(job_id, baseurl=base_url) status = status_response.get('status') status_text = status if isinstance(status, str) else '' while 'Completed' != status_text and 'Error' not in status_text and call_count < call_max: time.sleep(60) status_response = self.get_expertise_status(job_id, baseurl=base_url) status = status_response.get('status') status_text = status if isinstance(status, str) else '' call_count += 1 if 'Completed' == status_text: return self.get_expertise_results(job_id, baseurl=base_url, format=format) if 'Error' in status_text: raise OpenReviewException('There was an error computing scores, description: ' + status_response.get('description')) if call_count == call_max: raise OpenReviewException('Time out computing scores, description: ' + status_response.get('description')) raise OpenReviewException('Unknown error, description: ' + status_response.get('description')) else: if format == 'csv': response = self.session.get(base_url + '/expertise/results', params = {'jobId': job_id, 'format': 'csv'}, headers = self.headers, stream = True) response = self.__handle_response(response) print('return expertise results', baseurl, job_id) def _iter_csv_results(response): try: yield from csv.DictReader(response.iter_lines(decode_unicode=True)) finally: response.close() return _iter_csv_results(response) response = self.session.get(base_url + '/expertise/results', params = {'jobId': job_id}, headers = self.headers) response = self.__handle_response(response) print('return expertise results', baseurl, job_id) return response.json()
[docs] class Edit(object): """ :param id: Edit id :type id: str :param readers: List of readers in the Edit, each reader is a Group id :type readers: list[str], optional :param writers: List of writers in the Edit, each writer is a Group id :type writers: list[str], optional :param signatures: List of signatures in the Edit, each signature is a Group id :type signatures: list[str], optional :param note: Template of the Note that will be created :type note: dict, optional :param invitation: Template of the Invitation that will be created :type invitation: dict, optional :param nonreaders: List of nonreaders in the Edit, each nonreader is a Group id :type nonreaders: list[str], optional :param cdate: Creation date :type cdate: int, optional :param ddate: Deletion date :type ddate: int, optional :param tcdate: True creation date :type tcdate: int, optional :param tmdate: Modification date :type tmdate: int, optional """ def __init__(self, id = None, domain = None, invitations = None, readers = None, writers = None, signatures = None, content = None, note = None, group = None, invitation = None, nonreaders = None, cdate = None, tcdate = None, tmdate = None, ddate = None, tauthor = None): self.id = id self.domain = domain self.invitations = invitations self.cdate = cdate self.tcdate = tcdate self.tmdate = tmdate self.ddate = ddate self.readers = readers self.nonreaders = nonreaders self.writers = writers self.signatures = signatures self.content = content self.note = note self.group = group self.invitation = invitation self.tauthor = tauthor def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Edit(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self))
[docs] def to_json(self): """ Converts Edit instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Edit instance :rtype: dict """ body = {} if (self.id): body['id'] = self.id if self.invitations: body['invitations'] = self.invitations if (self.readers): body['readers'] = self.readers if (self.nonreaders): body['nonreaders'] = self.nonreaders if (self.writers): body['writers'] = self.writers if (self.signatures): body['signatures'] = self.signatures if (self.content): body['content'] = self.content if (self.note): body['note'] = self.note.to_json() if (self.group): body['group'] = self.group.to_json() if isinstance(self.invitation, Invitation): body['invitation'] = self.invitation.to_json() if isinstance(self.invitation, str): body['invitation'] = self.invitation if (self.ddate): body['ddate'] = self.ddate return body
[docs] @classmethod def from_json(Edit,e): """ Creates an Edit object from a dictionary that contains keys values equivalent to the name of the instance variables of the Edit class :param i: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Edit class :type i: dict :return: Edit whose instance variables contain the values from the dictionary :rtype: Edit """ edit = Edit(e.get('id'), domain = e.get('domain'), invitations = e.get('invitations'), cdate = e.get('cdate'), tcdate = e.get('tcdate'), tmdate = e.get('tmdate'), ddate = e.get('ddate'), readers = e.get('readers'), nonreaders = e.get('nonreaders'), writers = e.get('writers'), signatures = e.get('signatures'), content = e.get('content'), note = Note.from_json(e['note']) if 'note' in e else None, group = Group.from_json(e['group']) if 'group' in e else None, invitation = e.get('invitation'), tauthor = e.get('tauthor') ) if isinstance(edit.invitation, dict): edit.invitation = Invitation.from_json(edit.invitation) return edit
[docs] class Note(object): """ TODO: write docs """ def __init__(self, invitations=None, parent_invitations=None, readers=None, writers=None, signatures=None, content=None, id=None, external_id=None, external_ids=None, number=None, cdate=None, pdate=None, odate=None, mdate=None, tcdate=None, tmdate=None, ddate=None, forum=None, replyto=None, nonreaders=None, domain=None, details = None, license=None): self.id = id self.external_id = external_id self.external_ids = external_ids self.number = number self.cdate = cdate self.pdate = pdate self.odate = odate self.mdate = mdate self.tcdate = tcdate self.tmdate = tmdate self.ddate = ddate self.content = content self.forum = forum self.replyto = replyto self.readers = readers self.nonreaders = nonreaders self.signatures = signatures self.writers = writers self.number = number self.details = details self.invitations = invitations self.parent_invitations = parent_invitations self.domain = domain self.license = license def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Note(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self)) @property def authors(self): """ Returns the authors as a canonical list of ``{'fullname', 'username'}`` dicts, regardless of whether the underlying content stores them as a list of objects (current schema) or as parallel ``authors``/``authorids`` arrays (legacy schema). """ if not self.content: return [] authors_value = self.content.get('authors', {}).get('value') or [] if authors_value and isinstance(authors_value[0], dict): return [{'fullname': a.get('fullname', ''), 'username': a.get('username', '')} for a in authors_value] authorids_value = self.content.get('authorids', {}).get('value') or [] return [ { 'fullname': authors_value[i] if i < len(authors_value) else '', 'username': authorids_value[i] if i < len(authorids_value) else '' } for i in range(max(len(authors_value), len(authorids_value))) ]
[docs] def to_json(self): """ Converts Note instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Note instance :rtype: dict """ body = { } if self.id: body['id'] = self.id if self.external_id: body['externalId'] = self.external_id if self.forum: body['forum'] = self.forum if self.replyto: body['replyto'] = self.replyto if self.content: body['content'] = self.content if self.invitations: body['invitations'] = self.invitations if self.parent_invitations: body['parentInvitations'] = self.parent_invitations if self.cdate: body['cdate'] = self.cdate if self.pdate: body['pdate'] = self.pdate if self.odate: body['odate'] = self.odate if self.mdate: body['mdate'] = self.mdate if self.ddate: body['ddate'] = self.ddate if self.nonreaders is not None: body['nonreaders'] = self.nonreaders if self.signatures: body['signatures'] = self.signatures if self.writers: body['writers'] = self.writers if self.readers: body['readers'] = self.readers if self.license: body['license'] = self.license return body
[docs] @classmethod def from_json(Note,n): """ Creates a Note object from a dictionary that contains keys values equivalent to the name of the instance variables of the Note class :param n: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Note class :type n: dict :return: Note whose instance variables contain the values from the dictionary :rtype: Note """ note = Note( id = n.get('id'), external_ids = n.get('externalIds'), number = n.get('number'), cdate = n.get('cdate'), mdate = n.get('mdate'), pdate = n.get('pdate'), odate = n.get('odate'), tcdate = n.get('tcdate'), tmdate =n.get('tmdate'), ddate=n.get('ddate'), content=n.get('content'), forum=n.get('forum'), invitations=n.get('invitations'), parent_invitations=n.get('parentInvitations'), replyto=n.get('replyto'), readers=n.get('readers'), nonreaders=n.get('nonreaders'), signatures=n.get('signatures'), writers=n.get('writers'), details=n.get('details'), domain=n.get('domain'), license=n.get('license') ) return note
[docs] class Invitation(object): """ """ def __init__(self, id = None, invitations = None, parent_invitations = None, domain = None, readers = None, writers = None, invitees = None, signatures = None, edit = None, edge = None, tag = None, message = None, type = 'Note', noninvitees = None, nonreaders = None, web = None, process = None, preprocess = None, date_processes = None, post_processes = None, duedate = None, expdate = None, cdate = None, ddate = None, tcdate = None, tmdate = None, minReplies = None, maxReplies = None, bulk = None, content = None, reply_forum_views = [], responseArchiveDate = None, details = None, description = None, instructions = None, guestPosting = None, secret = None, humanVerificationRequired = None): self.id = id self.invitations = invitations self.parent_invitations = parent_invitations self.domain = domain self.cdate = cdate self.ddate = ddate self.duedate = duedate self.expdate = expdate self.readers = readers self.nonreaders = nonreaders self.writers = writers self.invitees = invitees self.noninvitees = noninvitees self.signatures = signatures self.minReplies = minReplies self.maxReplies = maxReplies self.edit = edit self.edge = edge self.tag = tag self.message = message self.type = type self.tcdate = tcdate self.tmdate = tmdate self.bulk = bulk self.details = details self.reply_forum_views = reply_forum_views self.responseArchiveDate = responseArchiveDate self.web = web self.process = process self.preprocess = preprocess self.date_processes = date_processes self.post_processes = post_processes self.content = content self.description = description self.instructions = instructions self.guestPosting = guestPosting self.secret = secret self.humanVerificationRequired = humanVerificationRequired def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Invitation(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self)) def is_active(self): now = tools.datetime_millis(datetime.datetime.now()) cdate = self.cdate if self.cdate else now edate = self.expdate if self.expdate else now return cdate <= now and now <= edate def get_content_value(self, field_name, default_value=None): if self.content: return self.content.get(field_name, {}).get('value', default_value) return default_value def pretty_id(self): tokens = self.id.split('/')[-2:] filtered_tokens = [] for token in tokens: token = token.replace('_', ' ').strip() if token.startswith('~'): token = tools.pretty_id(token) if token != '-': filtered_tokens.append(token) return (' ').join(filtered_tokens)
[docs] def to_json(self): """ Converts Invitation instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Invitation instance :rtype: dict """ body = {} if self.id: body['id'] = self.id if self.parent_invitations: body['parentInvitations'] = self.parent_invitations if self.cdate: body['cdate'] = self.cdate if self.ddate: body['ddate'] = self.ddate if self.duedate: body['duedate'] = self.duedate if self.expdate: body['expdate'] = self.expdate if self.readers: body['readers'] = self.readers if self.nonreaders: body['nonreaders'] = self.nonreaders if self.writers: body['writers'] = self.writers if self.invitees: body['invitees'] = self.invitees if self.noninvitees: body['noninvitees'] = self.noninvitees if self.signatures: body['signatures'] = self.signatures if self.reply_forum_views: body['replyForumViews'] = self.reply_forum_views if self.content: body['content'] = self.content if self.responseArchiveDate: body['responseArchiveDate'] = self.responseArchiveDate if self.description: body['description'] = self.description if self.instructions: body['instructions'] = self.instructions if self.minReplies: body['minReplies']=self.minReplies if self.maxReplies: body['maxReplies']=self.maxReplies if self.web: body['web']=self.web if self.process: body['process']=self.process if self.preprocess: body['preprocess']=self.preprocess if self.date_processes: body['dateprocesses']=self.date_processes if self.post_processes: body['postprocesses']=self.post_processes if self.edit is not None: if self.type == 'Note': body['edit']=self.edit if self.type == 'Edge': body['edge']=self.edit if self.type == 'Tag': body['tag']=self.edit if self.edge: body['edge']=self.edge if self.tag: body['tag']=self.tag if self.message: body['message']=self.message if self.bulk is not None: body['bulk']=self.bulk if self.guestPosting is not None: body['guestPosting']=self.guestPosting if self.secret is not None: body['secret']=self.secret if self.humanVerificationRequired is not None: body['humanVerificationRequired']=self.humanVerificationRequired return body
[docs] @classmethod def from_json(Invitation,i): """ Creates an Invitation object from a dictionary that contains keys values equivalent to the name of the instance variables of the Invitation class :param i: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Invitation class :type i: dict :return: Invitation whose instance variables contain the values from the dictionary :rtype: Invitation """ invitation = Invitation(i['id'], invitations = i.get('invitations'), parent_invitations = i.get('parentInvitations'), domain = i.get('domain'), cdate = i.get('cdate'), ddate = i.get('ddate'), tcdate = i.get('tcdate'), tmdate = i.get('tmdate'), duedate = i.get('duedate'), expdate = i.get('expdate'), readers = i.get('readers'), nonreaders = i.get('nonreaders'), writers = i.get('writers'), invitees = i.get('invitees'), noninvitees = i.get('noninvitees'), signatures = i.get('signatures'), minReplies = i.get('minReplies'), edit = i.get('edit'), maxReplies = i.get('maxReplies'), details = i.get('details'), reply_forum_views = i.get('replyForumViews'), responseArchiveDate = i.get('responseArchiveDate'), description = i.get('description'), instructions = i.get('instructions'), bulk = i.get('bulk') ) if 'content' in i: invitation.content = i['content'] if 'web' in i: invitation.web = i['web'] if 'process' in i: invitation.process = i['process'] if 'transform' in i: invitation.transform = i['transform'] if 'preprocess' in i: invitation.preprocess = i['preprocess'] if 'dateprocesses' in i: invitation.date_processes = i['dateprocesses'] if 'postprocesses' in i: invitation.post_processes = i['postprocesses'] if 'edge' in i: invitation.edit = i['edge'] invitation.type = 'Edge' if 'tag' in i: invitation.edit = i['tag'] invitation.type = 'Tag' if 'message' in i: invitation.message = i['message'] invitation.type = 'Message' if 'guestPosting' in i: invitation.guestPosting = i['guestPosting'] if 'secret' in i: invitation.secret = i['secret'] if 'humanVerificationRequired' in i: invitation.humanVerificationRequired = i['humanVerificationRequired'] return invitation
class Edge(object): def __init__(self, head, tail, invitation, domain=None, readers=None, writers=None, signatures=None, id=None, weight=None, label=None, cdate=None, ddate=None, nonreaders=None, tcdate=None, tmdate=None, tddate=None, tauthor=None): self.id = id self.invitation = invitation self.domain = domain self.head = head self.tail = tail self.weight = weight self.label = label self.cdate = cdate self.ddate = ddate self.readers = readers self.nonreaders = nonreaders self.writers = writers self.signatures = signatures self.tcdate = tcdate self.tmdate = tmdate self.tddate = tddate self.tauthor = tauthor def to_json(self): ''' Returns serialized json string for a given object ''' body = { 'invitation': self.invitation, 'head': self.head, 'tail': self.tail } if self.id: body['id'] = self.id if self.ddate: body['ddate'] = self.ddate if self.readers is not None: body['readers'] = self.readers if self.writers is not None: body['writers'] = self.writers if self.nonreaders is not None: body['nonreaders'] = self.nonreaders if self.signatures is not None: body['signatures'] = self.signatures if self.weight is not None: body['weight'] = self.weight if self.label is not None: body['label'] = self.label if self.cdate is not None: body['cdate'] = self.cdate return body @classmethod def from_json(Edge, e): ''' Returns a deserialized object from a json string :arg t: The json string consisting of a serialized object of type "Edge" ''' edge = Edge( id = e.get('id'), domain = e.get('domain'), cdate = e.get('cdate'), tcdate = e.get('tcdate'), tmdate = e.get('tmdate'), ddate = e.get('ddate'), tddate = e.get('tddate'), invitation = e.get('invitation'), readers = e.get('readers'), nonreaders = e.get('nonreaders'), writers = e.get('writers'), signatures = e.get('signatures'), head = e.get('head'), tail = e.get('tail'), weight = e.get('weight'), label = e.get('label'), tauthor=e.get('tauthor') ) return edge def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Edge(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self))
[docs] class Group(object): """ When a user is created, it is automatically assigned to certain groups that give him different privileges. A username is also a group, therefore, groups can be members of other groups. :param id: id of the Group :type id: str :param readers: List of readers in the Group, each reader is a Group id :type readers: list[str] :param writers: List of writers in the Group, each writer is a Group id :type writers: list[str] :param signatories: List of signatories in the Group, each writer is a Group id :type signatories: list[str] :param signatures: List of signatures in the Group, each signature is a Group id :type signatures: list[str] :param cdate: Creation date of the Group :type cdate: int, optional :param ddate: Deletion date of the Group :type ddate: int, optional :param tcdate: true creation date of the Group :type tcdate: int, optional :param tmdate: true modification date of the Group :type tmdate: int, optional :param members: List of members in the Group, each member is a Group id :type members: list[str], optional :param nonreaders: List of nonreaders in the Group, each nonreader is a Group id :type nonreaders: list[str], optional :param web: Path to a file that contains the webfield :type web: optional :param web_string: String containing the webfield for this Group :type web_string: str, optional :param details: :type details: optional """ def __init__(self, id=None, content=None, readers=None, writers=None, signatories=None, signatures=None, invitation=None, invitations=None, parent_invitations=None, cdate = None, ddate = None, tcdate=None, tmdate=None, members = None, nonreaders = None, impersonators=None, web = None, anonids= None, deanonymizers=None, host=None, domain=None, parent = None, details = None, description = None): # post attributes self.id=id self.invitation=invitation self.invitations = invitations self.parent_invitations = parent_invitations self.content = content self.cdate = cdate self.ddate = ddate self.tcdate = tcdate self.tmdate = tmdate self.writers = writers self.members = members self.readers = readers self.nonreaders = nonreaders self.signatures = signatures self.signatories = signatories self.anonids = anonids self.web=web self.impersonators = impersonators self.host = host self.domain = domain self.parent = parent self.description = description self.anonids = anonids self.deanonymizers = deanonymizers self.details = details self.anon_members = [] def get_content_value(self, field_name, default_value=None): if self.content: return self.content.get(field_name, {}).get('value', default_value) return default_value def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Group(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self))
[docs] def to_json(self): """ Converts Group instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Group instance :rtype: dict """ body = {} if self.id is not None: body['id'] = self.id if self.content is not None: body['content'] = self.content if self.signatures is not None: body['signatures'] = self.signatures if self.writers is not None: body['writers'] = self.writers if self.members is not None: body['members'] = self.members if self.readers is not None: body['readers'] = self.readers if self.invitation is not None: body['invitation'] = self.invitation if self.parent_invitations is not None: body['parentInvitations'] = self.parent_invitations if self.cdate is not None: body['cdate'] = self.cdate if self.ddate is not None: body['ddate'] = self.ddate if self.host is not None: body['host'] = self.host if self.impersonators is not None: body['impersonators'] = self.impersonators if self.anonids is not None: body['anonids'] = self.anonids if self.deanonymizers is not None: body['deanonymizers'] = self.deanonymizers if self.web is not None: body['web'] = self.web if self.nonreaders is not None: body['nonreaders'] = self.nonreaders if self.signatories is not None: body['signatories'] = self.signatories if self.description is not None: body['description'] = self.description return body
[docs] @classmethod def from_json(Group,g): """ Creates a Group object from a dictionary that contains keys values equivalent to the name of the instance variables of the Group class :param g: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Group class :type g: dict :return: Group whose instance variables contain the values from the dictionary :rtype: Group """ group = Group(g['id'], content=g.get('content'), invitation=g.get('invitation'), invitations=g.get('invitations'), parent_invitations=g.get('parentInvitations'), cdate = g.get('cdate'), ddate = g.get('ddate'), tcdate = g.get('tcdate'), tmdate = g.get('tmdate'), writers = g.get('writers'), members = g.get('members'), readers = g.get('readers'), nonreaders = g.get('nonreaders'), signatories = g.get('signatories'), signatures = g.get('signatures'), anonids=g.get('anonids'), deanonymizers=g.get('deanonymizers'), impersonators=g.get('impersonators'), host=g.get('host'), web=g.get('web'), domain=g.get('domain'), parent=g.get('parent'), details = g.get('details'), description = g.get('description')) return group
[docs] def add_member(self, member): """ Adds a member to the group. This is done only on the object not in OpenReview. Another method like :meth:`~openreview.Group.post` is needed for the change to show in OpenReview :param member: Member to add to the group :type member: str :return: Group with the new member added :rtype: Group """ if type(member) is Group: self.members.append(member.id) else: self.members.append(str(member)) return self
[docs] def remove_member(self, member): """ Removes a member from the group. This is done only on the object not in OpenReview. Another method like :meth:`~openreview.Group.post` is needed for the change to show in OpenReview :param member: Member to remove from the group :type member: str :return: Group after the member was removed :rtype: Group """ if type(member) is Group: try: self.members.remove(member.id) except(ValueError): pass else: try: self.members.remove(str(member)) except(ValueError): pass return self
[docs] def add_webfield(self, web): """ Adds a webfield to the group :param web: Path to the file that contains the webfield :type web: str """ with open(web) as f: self.web = f.read()
[docs] def post(self, client): """ Posts a group to OpenReview :param client: Client that will post the Group :type client: Client """ client.post_group(self)
def transform_to_anon_ids(self, elements): if self.anonids: for index, element in enumerate(elements): if element in self.members and self.anon_members: elements[index] = self.anon_members[self.members.index(element)] else: elements[index] = element return elements
class Tag(object): """ :param tag: Content of the tag :type tag: str :param invitation: Invitation id :type invitation: str :param readers: List of readers in the Invitation, each reader is a Group id :type readers: list[str] :param signature: Signature in the Invitation, signature is a Group id :type signature: str :param id: Tag id :type id: str, optional :param cdate: Creation date :type cdate: int, optional :param tcdate: True creation date :type tcdate: int, optional :param ddate: Deletion date :type ddate: int, optional :param forum: Forum id :type forum: str, optional :param nonreaders: List of nonreaders in the Invitation, each nonreader is a Group id :type nonreaders: list[str], optional """ def __init__(self, invitation, signature=None, tag=None, readers=None, writers=None, id=None, parent_invitations=None, cdate=None, tcdate=None, tmdate=None, ddate=None, forum=None, nonreaders=None, profile=None, weight=None, label=None, note=None): self.id = id self.cdate = cdate self.tcdate = tcdate self.tmdate = tmdate self.ddate = ddate self.tag = tag self.parent_invitations = parent_invitations self.forum = forum self.invitation = invitation self.readers = readers self.writers = writers self.nonreaders = [] if nonreaders is None else nonreaders self.signature = signature self.profile = profile self.weight = weight self.label = label self.note = note def to_json(self): """ Converts Tag instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary. :return: Dictionary containing all the parameters of a Tag instance :rtype: dict """ body = {} if self.id: body['id'] = self.id if self.ddate: body['ddate'] = self.ddate if self.tag: body['tag'] = self.tag if self.parent_invitations: body['parentInvitations'] = self.parent_invitations if self.forum: body['forum'] = self.forum if self.invitation: body['invitation'] = self.invitation if self.readers: body['readers'] = self.readers if self.writers: body['writers'] = self.writers if self.nonreaders: body['nonreaders'] = self.nonreaders if self.signature: body['signature'] = self.signature if self.profile: body['profile'] = self.profile if self.weight is not None: body['weight'] = self.weight if self.label: body['label'] = self.label if self.note: body['note'] = self.note if self.cdate: body['cdate'] = self.cdate return body @classmethod def from_json(Tag, t): """ Creates a Tag object from a dictionary that contains keys values equivalent to the name of the instance variables of the Tag class :param n: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Tag class :type n: dict :return: Tag whose instance variables contain the values from the dictionary :rtype: Tag """ tag = Tag( id = t.get('id'), cdate = t.get('cdate'), tcdate = t.get('tcdate'), tmdate = t.get('tmdate'), ddate = t.get('ddate'), tag = t.get('tag'), parent_invitations = t.get('parentInvitations'), forum = t.get('forum'), invitation = t.get('invitation'), readers = t.get('readers'), writers = t.get('writers'), nonreaders = t.get('nonreaders'), signature = t.get('signature'), profile = t.get('profile'), weight = t.get('weight'), label = t.get('label'), note = t.get('note') ) return tag def __repr__(self): content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()]) return 'Tag(' + content + ')' def __str__(self): pp = pprint.PrettyPrinter() return pp.pformat(vars(self))