#!/usr/bin/python
from __future__ import absolute_import, division, print_function, unicode_literals
import datetime
import sys
# Types the module treats as "string". On interpreters with major version
# below 3 the list also holds ``unicode``; otherwise it only holds ``str``.
string_types = [str, unicode] if sys.version_info[0] < 3 else [str]
from importlib.metadata import version as get_package_version, PackageNotFoundError
from .. import tools
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import pprint
import os
import re
import time
import jwt
import json
import csv
from ..openreview import Profile
from ..openreview import OpenReviewException
from ..openreview import MfaRequiredException
from .. import tools
from .. import mfa
class LogRetry(Retry):
    """``Retry`` policy that prints a line describing every retry attempt.

    The retry bookkeeping itself is delegated unchanged to the parent class;
    this subclass only adds the printed trace. No ``__init__`` override is
    needed because construction arguments are passed straight to ``Retry``.
    """

    @staticmethod
    def _describe_response(response):
        """Return a printable summary of ``response`` for the retry trace.

        :param response: Response that triggered the retry, or ``None``
        :return: Parsed JSON body, raw body bytes, the reason phrase, or ``'no response'``
        """
        if not response:
            return 'no response'
        if 'application/json' in response.headers.get('Content-Type', ''):
            try:
                return json.loads(response.data.decode('utf-8'))
            except (UnicodeDecodeError, ValueError):
                # A body labelled as JSON but empty or malformed must not
                # abort the retry; describe it with the raw data or reason.
                pass
        if response.data:
            return response.data
        return response.reason

    def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None):
        """Print the retry details, then let ``Retry.increment`` do the actual work.

        :return: Whatever the parent ``increment`` returns
        """
        # Log retry information before calling the parent class method
        response_string = self._describe_response(response)
        print(f"Retrying request: {method} {url}, response: {response_string}, error: {error}")
        # Call the parent class method to perform the actual retry increment
        return super().increment(method=method, url=url, response=response, error=error, _pool=_pool, _stacktrace=_stacktrace)
[docs]
class OpenReviewClient(object):
"""
:param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2`
:type baseurl: str, optional
:param username: OpenReview username. If none is provided, it defaults to the environment variable `OPENREVIEW_USERNAME`
:type username: str, optional
:param password: OpenReview password. If none is provided, it defaults to the environment variable `OPENREVIEW_PASSWORD`
:type password: str, optional
:param token: Session token. This token can be provided instead of the username and password if the user had already logged in
:type token: str, optional
:param tokenExpiresIn: Time in seconds before the token expires. If none is set the value will be set automatically to one hour. The max value that it can be set to is 1 week.
:type tokenExpiresIn: number, optional
"""
def __init__(self, baseurl = None, username = None, password = None, token= None, tokenExpiresIn=None):
    """Set up endpoint URLs and the retrying HTTP session, then authenticate if possible.

    :param baseurl: API host; falls back to ``OPENREVIEW_API_BASEURL_V2`` and then ``http://localhost:3001``
    :type baseurl: str, optional
    :param username: Login id; falls back to ``OPENREVIEW_USERNAME``
    :type username: str, optional
    :param password: Password; falls back to ``OPENREVIEW_PASSWORD``
    :type password: str, optional
    :param token: Existing session token, with or without a ``Bearer `` prefix. When given, no login is performed
    :type token: str, optional
    :param tokenExpiresIn: Passed to :meth:`login_user` as ``expiresIn``
    :type tokenExpiresIn: number, optional
    :raises OpenReviewException: if ``baseurl`` contains one of ``tools.V1_REMOTE_URLS``
    """
    self.baseurl = baseurl if baseurl is not None else os.environ.get('OPENREVIEW_API_BASEURL_V2', 'http://localhost:3001')
    if any(url in self.baseurl for url in tools.V1_REMOTE_URLS):
        correct_baseurl = tools.get_base_urls(self)[1]
        raise OpenReviewException(f'Please use "{correct_baseurl}" as the baseurl for the OpenReview API or use the old client openreview.Client')

    self.groups_url = self.baseurl + '/groups'
    self.login_url = self.baseurl + '/login'
    self.register_url = self.baseurl + '/register'
    self.alternate_confirm_url = self.baseurl + '/user/confirm'
    self.invitations_url = self.baseurl + '/invitations'
    self.mail_url = self.baseurl + '/mail'
    self.notes_url = self.baseurl + '/notes'
    self.tags_url = self.baseurl + '/tags'
    self.bulk_tags_url = self.baseurl + '/tags/bulk'
    self.tags_rename = self.baseurl + '/tags/rename'
    self.edges_url = self.baseurl + '/edges'
    self.bulk_edges_url = self.baseurl + '/edges/bulk'
    self.edges_count_url = self.baseurl + '/edges/count'
    self.edges_rename = self.baseurl + '/edges/rename'
    self.edges_archive = self.baseurl + '/edges/archive'
    self.profiles_url = self.baseurl + '/profiles'
    self.profiles_search_url = self.baseurl + '/profiles/search'
    self.profiles_merge_url = self.baseurl + '/profiles/merge'
    self.profiles_rename = self.baseurl + '/profiles/rename'
    self.relation_readers_url = self.baseurl + '/settings/relationReaders'
    self.profiles_moderate = self.baseurl + '/profile/moderate'
    self.reference_url = self.baseurl + '/references'
    self.tilde_url = self.baseurl + '/tildeusername'
    self.pdf_url = self.baseurl + '/pdf'
    self.pdf_revisions_url = self.baseurl + '/references/pdf'
    self.messages_url = self.baseurl + '/messages'
    self.messages_requests_url = self.baseurl + '/messages/requests'
    self.messages_direct_url = self.baseurl + '/messages/direct'
    self.process_logs_url = self.baseurl + '/logs/process'
    self.institutions_url = self.baseurl + '/settings/institutions'
    self.jobs_status = self.baseurl + '/jobs/status'
    self.venues_url = self.baseurl + '/venues'
    self.note_edits_url = self.baseurl + '/notes/edits'
    self.invitation_edits_url = self.baseurl + '/invitations/edits'
    self.group_edits_url = self.baseurl + '/groups/edits'
    self.activatelink_url = self.baseurl + '/activatelink'
    self.domains_rename = self.baseurl + '/domains/rename'
    self.domains_restriction = self.baseurl + '/domains/restriction'
    self.groups_members_cache_url = self.baseurl + '/groups/members/cache'
    self.mfa_challenge_url = self.baseurl + '/mfa/challenge'
    self.mfa_verify_url = self.baseurl + '/mfa/verify'

    # Build User-Agent string: openreview-py/{package_version} (Python/{python_version})
    try:
        package_version = get_package_version('openreview-py')
    except PackageNotFoundError:
        package_version = 'unknown'
    python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    self.user_agent = f"openreview-py/{package_version} (Python/{python_version})"

    self.limit = 1000
    self.token = token.replace('Bearer ', '') if token else None
    self.profile = None
    # Always define ``user`` so the attribute exists even when the token
    # cannot be decoded or no login takes place.
    self.user = None
    self.headers = {
        'User-Agent': self.user_agent,
        'Accept': 'application/json'
    }

    # Retry up to three times on the listed 5xx statuses, for both schemes.
    retry_strategy = LogRetry(total=3, backoff_factor=1, status_forcelist=[ 500, 502, 503, 504 ], respect_retry_after_header=True)
    self.session = requests.Session()
    adapter = HTTPAdapter(max_retries=retry_strategy)
    self.session.mount('https://', adapter)
    self.session.mount('http://', adapter)

    if self.token:
        self.headers['Authorization'] = 'Bearer ' + self.token
        try:
            # The signature is deliberately not verified: the payload is only
            # read to discover which profile the token belongs to.
            payload = jwt.decode(self.token, options={"verify_signature": False})
            self.user = payload.get('user', payload)
            user_id = self.user.get('profile', {}).get('id') or self.user.get('id')
            self.profile = self.get_profile(user_id) if user_id else None
        except Exception:
            # Best effort: an undecodable token or a failed profile lookup
            # leaves the client without a profile. KeyboardInterrupt and
            # SystemExit are intentionally no longer swallowed here.
            self.profile = None
    else:
        if not username:
            username = os.environ.get('OPENREVIEW_USERNAME')
        if not password:
            password = os.environ.get('OPENREVIEW_PASSWORD')
        if username or password:
            self.login_user(username, password, expiresIn=tokenExpiresIn)
## PRIVATE FUNCTIONS
def __handle_authorization(self, response):
    """Store the credentials carried by a decoded login-style response on the client.

    Updates ``self.token``, ``self.profile``, the ``Authorization`` header and
    ``self.user`` from ``response``.

    :param response: Decoded JSON holding ``token`` and ``user`` (with ``user.profile.id``)
    :type response: dict
    :return: The same ``response`` object
    :rtype: dict
    """
    self.token = str(response['token'])
    profile_id = response['user']['profile']['id']
    self.profile = Profile(id=profile_id)
    self.headers['Authorization'] = 'Bearer ' + self.token
    # The user record is taken from the response body, not decoded from the token.
    self.user = response['user']
    return response
def __handle_response(self,response):
    """Return ``response`` unchanged, or convert an HTTP error status into ``OpenReviewException``.

    :param response: Response object returned by the session
    :return: The same ``response`` when its status is not an error
    :raises OpenReviewException: carrying the decoded JSON error body, or a
        ``{'name': 'Error', 'message': ...}`` dict built from the body text or
        the reason phrase when no usable JSON body is available
    """
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        has_json_body = 'application/json' in response.headers.get('Content-Type', '')
        if has_json_body:
            try:
                error = response.json()
            except ValueError:
                # The body is labelled JSON but cannot be parsed; report it
                # as plain text instead of leaking a decode error.
                has_json_body = False
        if not has_json_body:
            error = {
                'name': 'Error',
                'message': response.text if response.text else response.reason
            }
        # Chain the original HTTP error so the status line stays in tracebacks.
        raise OpenReviewException(error) from http_error
    return response
def __request_mfa_challenge(self, mfa_pending_token, method):
    """Ask the server to start an MFA challenge (e.g., send an email OTP).

    :param mfa_pending_token: Pending-login token sent as ``mfaPendingToken``
    :param method: MFA method name sent as ``method``
    :return: Decoded JSON body of the response
    :raises OpenReviewException: on an HTTP error status
    """
    challenge = self.session.post(
        self.mfa_challenge_url,
        json={'mfaPendingToken': mfa_pending_token, 'method': method},
        headers=self.headers,
    )
    return self.__handle_response(challenge).json()
def __verify_mfa(self, mfa_pending_token, method, code):
    """Submit an MFA code to the verification endpoint.

    :param mfa_pending_token: Pending-login token sent as ``mfaPendingToken``
    :param method: MFA method name sent as ``method``
    :param code: Code sent as ``code``
    :return: Decoded JSON body of the response
    :raises OpenReviewException: on an HTTP error status
    """
    verification = self.session.post(
        self.mfa_verify_url,
        json={'mfaPendingToken': mfa_pending_token, 'method': method, 'code': code},
        headers=self.headers,
    )
    return self.__handle_response(verification).json()
def __resolve_mfa(self, mfa_pending_token, mfa_methods, preferred_method):
    """Complete a pending MFA login through the interactive helpers in ``mfa``.

    :param mfa_pending_token: Pending-login token returned by the login endpoint
    :param mfa_methods: MFA method names offered by the server
    :param preferred_method: Preferred method reported by the server, if any
    :return: Result of the passkey flow or of the code verification request
    :raises OpenReviewException: if none of the offered methods is ``totp``, ``emailOtp`` or ``passkey``
    :raises MfaRequiredException: if the session is not interactive, or no method or code is obtained
    """
    recognised = [offered for offered in mfa_methods if offered in ('totp', 'emailOtp', 'passkey')]
    if not recognised:
        raise OpenReviewException({
            'name': 'MfaError',
            'message': f'No supported MFA methods. Server offered: {", ".join(mfa_methods)}'
        })

    def mfa_still_required():
        # Every non-interactive or aborted path raises this same exception.
        return MfaRequiredException(mfa_pending_token, mfa_methods, preferred_method)

    if not mfa._is_interactive():
        raise mfa_still_required()

    # NOTE(review): the chooser receives the full offered list, not the
    # filtered one - presumably it filters on its own; confirm in ``mfa``.
    method = mfa._default_mfa_method_chooser(mfa_methods, preferred_method)
    if not method:
        raise mfa_still_required()

    if method == 'passkey':
        return self.__resolve_passkey(mfa_pending_token)

    if method == 'emailOtp':
        # The challenge request is issued before the user is asked for the code.
        self.__request_mfa_challenge(mfa_pending_token, 'emailOtp')
        print('A verification code has been sent to your email.')

    code = mfa._default_mfa_code_prompt(method)
    if not code:
        raise mfa_still_required()
    return self.__verify_mfa(mfa_pending_token, method, code)
def __resolve_passkey(self, mfa_pending_token):
    """Run the passkey browser flow and return its result.

    :param mfa_pending_token: Pending-login token handed to the browser flow
    :return: The flow result when it is truthy and contains a truthy ``token``
    :raises OpenReviewException: when the flow yields no result or no token
    """
    outcome = mfa._passkey_browser_flow(self, mfa_pending_token)
    if not outcome or not outcome.get('token'):
        raise OpenReviewException({
            'name': 'MfaError',
            'message': 'Passkey authentication failed or timed out.'
        })
    return outcome
def __await_process(self, edit_id):
    """Poll the process logs of an edit until the process finishes.

    Returns immediately when no process log exists for ``edit_id``. Otherwise
    checks the first log entry up to 1200 times, sleeping 0.5 seconds between
    checks (about 10 minutes).

    :param edit_id: Id passed to :meth:`get_process_logs`
    :raises OpenReviewException: with the log text when the status is ``error``,
        or with ``Process timed out`` when the polls are exhausted
    """
    logs = self.get_process_logs(id=edit_id)
    if not logs:
        return  # no process function found

    polls_left = 1200  # 1200 x 0.5s = 10 minutes
    while polls_left > 0:
        latest = logs[0]
        status = latest['status']
        if status == 'ok':
            return
        if status == 'error':
            raise OpenReviewException(latest.get('log', 'No log available'))
        time.sleep(0.5)
        logs = self.get_process_logs(id=edit_id)
        polls_left -= 1

    raise OpenReviewException("Process timed out")
def get_invitation_date_process_job(self, job_id):
    """Fetch a job from the ``pyDateProcessQueueMQ`` queue.

    :param job_id: Job id; every ``/`` is sent as ``%2F`` in the URL path
    :type job_id: str
    :return: Decoded JSON body of the response
    :raises OpenReviewException: on an HTTP error status
    """
    escaped_job_id = job_id.replace('/', '%2F')
    job_url = self.baseurl + '/jobs/queues/pyDateProcessQueueMQ/' + escaped_job_id
    reply = self.session.get(job_url, params={}, headers=self.headers)
    return self.__handle_response(reply).json()
def reschedule_date_process_jobs(self, invitation_id):
    """POST a single invitation id to the ``/invitations/dateprocesses`` endpoint.

    :param invitation_id: Invitation id, sent as the only element of ``ids``
    :return: Decoded JSON body of the response
    :raises OpenReviewException: on an HTTP error status
    """
    endpoint = self.baseurl + '/invitations/dateprocesses'
    body = {'ids': [invitation_id]}
    reply = self.session.post(endpoint, json=body, headers=self.headers)
    return self.__handle_response(reply).json()
## PUBLIC FUNCTIONS
[docs]
def impersonate(self, group_id):
    """Obtain a token scoped to ``group_id`` and switch this client over to it.

    The token, profile, user and ``Authorization`` header stored on the
    client are replaced in place with the ones returned by the server.

    :param group_id: ID of the group to impersonate (e.g., a venue ID such as ``ICML.cc/2024/Conference``).
    :type group_id: str
    :return: Decoded response containing the new token and user information.
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    reply = self.session.post(
        self.baseurl + '/impersonate',
        json={'groupId': group_id},
        headers=self.headers,
    )
    credentials = self.__handle_response(reply).json()
    self.__handle_authorization(credentials)
    return credentials
[docs]
def login_user(self,username=None, password=None, expiresIn=None):
    """
    Logs in a registered user. If the server reports that MFA is pending,
    the MFA step is resolved through an interactive terminal prompt before
    the credentials are stored on the client.

    :param username: OpenReview username
    :type username: str, optional
    :param password: OpenReview password
    :type password: str, optional
    :param expiresIn: Value sent to the server as ``expiresIn``
    :type expiresIn: number, optional

    :return: Dictionary containing user information and the authentication token
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    credentials = {'id': username, 'password': password, 'expiresIn': expiresIn}
    login_reply = self.session.post(self.login_url, headers=self.headers, json=credentials)
    session_info = self.__handle_response(login_reply).json()

    if session_info.get('mfaPending'):
        pending_token = session_info['mfaPendingToken']
        offered_methods = session_info['mfaMethods']
        preferred = session_info.get('preferredMethod')
        # The MFA result replaces the first response as the login payload.
        session_info = self.__resolve_mfa(pending_token, offered_methods, preferred)

    self.__handle_authorization(session_info)
    return session_info
[docs]
def register_user(self, email = None, fullname = None, password = None):
    """
    Registers a new user

    :param email: email that will be used as id to log in after the user is registered
    :type email: str, optional
    :param fullname: Full name of the user
    :type fullname: str, optional
    :param password: Password used to log into OpenReview
    :type password: str, optional

    :return: Decoded JSON response describing the new user
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    registration = dict(email=email, fullname=fullname, password=password)
    reply = self.session.post(self.register_url, json=registration, headers=self.headers)
    return self.__handle_response(reply).json()
[docs]
def activate_user(self, token, content):
    """
    Activates a newly registered user and stores the returned credentials on the client

    :param token: Activation token. If running in localhost, use email as token
    :type token: str
    :param content: Content of the profile to activate
    :type content: dict

    :return: Dictionary containing user information and the authentication token
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status

    Example:

    >>> res = client.activate_user('new@user.com', {
        'names': [
                {
                    'first': 'New',
                    'last': 'User',
                    'username': '~New_User1'
                }
            ],
        'emails': ['new@user.com'],
        'preferredEmail': 'new@user.com'
        })
    """
    activation_url = self.baseurl + '/activate/' + token
    reply = self.session.put(activation_url, json={'content': content}, headers=self.headers)
    activated = self.__handle_response(reply).json()
    self.__handle_authorization(activated)
    return activated
[docs]
def confirm_alternate_email(self, profile_id, alternate_email, activation_token=None):
    """
    Confirms an alternate email address

    :param profile_id: id of the profile, sent as ``username``
    :type profile_id: str
    :param alternate_email: email address to confirm, sent as ``alternate``
    :type alternate_email: str
    :param activation_token: when truthy, appended to the endpoint URL as an extra path segment
    :type activation_token: str, optional

    :return: Decoded JSON body of the response
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    if activation_token:
        token_suffix = f'/{activation_token}'
    else:
        token_suffix = ''
    body = {'username': profile_id, 'alternate': alternate_email}
    reply = self.session.post(self.alternate_confirm_url + token_suffix, json=body, headers=self.headers)
    return self.__handle_response(reply).json()
[docs]
def activate_email_with_token(self, email, token, activation_token=None):
    """
    Activates an email address

    :param email: email address to activate
    :type email: str
    :param token: token to activate the email
    :type token: str
    :param activation_token: when truthy, appended to the endpoint URL as an extra path segment
    :type activation_token: str, optional

    :return: Decoded JSON body of the response
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    if activation_token:
        token_suffix = f'/{activation_token}'
    else:
        token_suffix = ''
    body = {'email': email, 'token': token}
    reply = self.session.put(self.activatelink_url + token_suffix, json=body, headers=self.headers)
    return self.__handle_response(reply).json()
[docs]
def post_note_edit_as_guest(self, token, edit):
    """Post a note edit authenticated by a guest token.

    The request carries the token in an ``X-Guest-Token`` header and does
    not include the client's ``Authorization`` header.

    :param token: Guest authentication token (e.g., provided via an invitation link).
    :type token: str
    :param edit: Dictionary representing the note edit to post.
    :type edit: dict
    :return: Decoded JSON body of the response.
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    # Built from scratch rather than copied from self.headers, so a stored
    # session token is never sent with a guest request.
    guest_headers = {
        'User-Agent': self.user_agent,
        'Accept': 'application/json',
        'X-Guest-Token': token
    }
    reply = self.session.post(self.note_edits_url, json=edit, headers=guest_headers)
    return self.__handle_response(reply).json()
[docs]
def flush_members_cache(self, group_id=None):
    """
    Flushes the members cache for a group

    :param group_id: id of the group to flush the cache for; a falsy value makes the call a no-op
    :type group_id: str, optional

    :return: Decoded JSON body of the response, or ``None`` when ``group_id`` is falsy
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    if not group_id:
        return None

    # Slashes inside the id are sent as %2F so the id stays one path segment.
    escaped_id = group_id.replace('/', '%2F')
    reply = self.session.delete(self.groups_members_cache_url + '/' + escaped_id, params={}, headers=self.headers)
    return self.__handle_response(reply).json()
def get_activatable(self, token = None):
    """Fetch the ``activatable`` record for ``token`` and adopt its credentials.

    :param token: Token appended to the ``/activatable/`` URL; it must be a
        string, since the default ``None`` cannot be concatenated to the URL
    :type token: str
    :return: The client's token after the credentials were stored
    :rtype: str
    :raises OpenReviewException: on an HTTP error status
    """
    activatable_url = self.baseurl + '/activatable/' + token
    reply = self.__handle_response(self.session.get(activatable_url, params={}, headers=self.headers))
    self.__handle_authorization(reply.json()['activatable'])
    return self.token
[docs]
def get_institutions(self, id=None, domain=None):
    """
    Query the institutions endpoint by id and/or domain

    :param id: id of the Institution; sent only when truthy
    :type id: str
    :param domain: domain of the Institution; sent only when truthy
    :type domain: str

    :return: Decoded JSON body of the response
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status

    Example:

    >>> institution = client.get_institutions(domain='umass.edu')
    """
    filters = {key: value for key, value in (('id', id), ('domain', domain)) if value}
    reply = self.session.get(self.institutions_url, params=tools.format_params(filters), headers=self.headers)
    return self.__handle_response(reply).json()
[docs]
def get_group(self, id, details=None):
    """
    Get a single Group by id if available

    When the group's ``anonids`` attribute is truthy, members that are ids of
    anonymous groups are replaced by the first member of that anonymous
    group, and the replaced ids are stored in ``group.anon_members``.

    :param id: id of the group
    :type id: str
    :param details: value sent as the ``details`` query parameter
    :type details: optional

    :return: The first group returned by the server
    :rtype: Group
    :raises OpenReviewException: on an HTTP error status

    Example:

    >>> group = client.get_group('your-email@domain.com')
    """
    reply = self.session.get(self.groups_url, params={'id': id, 'details': details}, headers=self.headers)
    group = Group.from_json(self.__handle_response(reply).json()['groups'][0])
    if not group.anonids:
        return group

    # Anonymous groups share the group's id without a trailing 's', plus '_'.
    base_id = group.id[:-1] if group.id.endswith('s') else group.id
    anon_prefix = base_id + '_'
    first_member_by_anon_id = {
        anon_group.id: anon_group.members[0]
        for anon_group in self.get_groups(prefix=anon_prefix)
        if anon_group.members
    }

    original_members = group.members
    group.anon_members = [member for member in original_members if member in first_member_by_anon_id]
    group.members = [first_member_by_anon_id.get(member, member) for member in original_members]
    return group
[docs]
def get_invitation(self, id):
    """
    Get a single invitation by id if available

    :param id: id of the invitation
    :type id: str

    :return: The first invitation returned by the server
    :rtype: Invitation
    :raises OpenReviewException: on an HTTP error status
    """
    reply = self.session.get(self.invitations_url, params={'id': id}, headers=self.headers)
    first_match = self.__handle_response(reply).json()['invitations'][0]
    return Invitation.from_json(first_match)
[docs]
def get_note(self, id, details=None):
    """
    Get a single Note by id if available

    :param id: id of the note
    :type id: str
    :param details: value sent as the ``details`` query parameter
    :type details: optional

    :return: The first note returned by the server
    :rtype: Note
    :raises OpenReviewException: on an HTTP error status
    """
    query = {'id': id, 'details': details}
    reply = self.session.get(self.notes_url, params=query, headers=self.headers)
    first_match = self.__handle_response(reply).json()['notes'][0]
    return Note.from_json(first_match)
[docs]
def get_tag(self, id):
    """
    Get a single Tag by id if available

    :param id: id of the Tag
    :type id: str

    :return: The first tag returned by the server
    :rtype: Tag
    :raises OpenReviewException: on an HTTP error status
    """
    reply = self.session.get(self.tags_url, params={'id': id}, headers=self.headers)
    first_match = self.__handle_response(reply).json()['tags'][0]
    return Tag.from_json(first_match)
[docs]
def get_edge(self, id, trash=False):
    """
    Get a single Edge by id if available

    :param id: id of the Edge
    :type id: str
    :param trash: sent as ``'true'`` only when equal to ``True``, otherwise ``'false'``
    :type trash: bool, optional

    :return: Edge object with its information
    :rtype: Edge
    :raises OpenReviewException: on an HTTP error status, or with ``Edge not found`` when no edge is returned
    """
    trash_flag = 'true' if trash == True else 'false'
    reply = self.session.get(self.edges_url, params={'id': id, 'trash': trash_flag}, headers=self.headers)
    matches = self.__handle_response(reply).json()['edges']
    if not matches:
        raise OpenReviewException('Edge not found')
    return Edge.from_json(matches[0])
[docs]
def get_profile(self, email_or_id = None):
    """
    Get a single Profile by id or e-mail, if available

    A value starting with ``~`` followed by at least one character is sent
    as ``id``; any other truthy value is lower-cased and sent as ``email``.
    A falsy value sends no filter at all.

    :param email_or_id: e-mail or id of the profile
    :type email_or_id: str, optional

    :return: The first profile returned by the server
    :rtype: Profile
    :raises OpenReviewException: on an HTTP error status, or when no profile is returned
    """
    query = {}
    if email_or_id:
        if re.match('~.+', email_or_id):
            query['id'] = email_or_id
        else:
            query['email'] = email_or_id.lower()

    reply = self.session.get(self.profiles_url, params=tools.format_params(query), headers=self.headers)
    found = self.__handle_response(reply).json()['profiles']
    if not found:
        raise OpenReviewException(['Profile Not Found'])
    return Profile.from_json(found[0])
[docs]
def get_profiles(self, id=None, trash=None, with_blocked=None, state=None, offset=None, limit=None, sort=None):
    """
    Get a list of Profiles

    :param id: Profile id filter; sent when not ``None``
    :type id: str, optional
    :param trash: Sends ``trash=True`` only when equal to ``True``
    :type trash: bool, optional
    :param with_blocked: Sends ``withBlocked=True`` only when equal to ``True``
    :type with_blocked: bool, optional
    :param state: Filter profiles by state (e.g. 'Needs Moderation', 'Active', 'Rejected')
    :type state: str, optional
    :param offset: Indicates the position to start retrieving Profiles
    :type offset: int, optional
    :param limit: Maximum amount of Profiles that this method will return
    :type limit: int, optional
    :param sort: Value sent as the ``sort`` query parameter when not ``None``
    :type sort: str, optional

    :return: List of Profile objects
    :rtype: list[Profile]
    :raises OpenReviewException: on an HTTP error status
    """
    # (query key, value to send, whether to include it), in request order.
    candidates = (
        ('id', id, id is not None),
        ('trash', True, trash == True),
        ('withBlocked', True, with_blocked == True),
        ('state', state, state is not None),
        ('offset', offset, offset is not None),
        ('limit', limit, limit is not None),
        ('sort', sort, sort is not None),
    )
    query = {key: value for key, value, wanted in candidates if wanted}

    reply = self.session.get(self.profiles_url, params=tools.format_params(query), headers=self.headers)
    return [Profile.from_json(entry) for entry in self.__handle_response(reply).json()['profiles']]
[docs]
def search_profiles(self, confirmedEmails = None, emails = None, ids = None, term = None, first = None, middle = None, last = None, fullname=None, relation=None, use_ES = False):
    """
    Search profiles. Only the first truthy criterion is used, checked in this
    order: ``term``, ``fullname``, ``emails``, ``confirmedEmails``, ``ids``,
    ``first``/``middle``/``last``, ``relation``.

    :param confirmedEmails: List of confirmed emails registered in OpenReview (lower-cased before sending)
    :type confirmedEmails: list, optional
    :param emails: List of emails registered in OpenReview (lower-cased before sending)
    :type emails: list, optional
    :param ids: List of OpenReview username ids
    :type ids: list, optional
    :param term: Substring in the username or e-mail to be searched
    :type term: str, optional
    :param first: First name of user
    :type first: str, optional
    :param middle: Middle name of user
    :type middle: str, optional
    :param last: Last name of user
    :type last: str, optional
    :param fullname: Full name of user
    :type fullname: str, optional
    :param relation: Value sent as the ``relation`` query parameter
    :type relation: str, optional
    :param use_ES: Sent as ``es=true`` or ``es=false`` for the term, fullname and name searches
    :type use_ES: bool, optional

    :return: List of profiles. For ``emails`` a dictionary of { email: [profiles] } is returned. For ``confirmedEmails`` a dictionary of { confirmedEmail: profile } is returned. An empty list when no criterion is given
    :rtype: list[Profile]
    :raises OpenReviewException: on an HTTP error status
    """
    es_flag = 'true' if use_ES else 'false'

    def chunked(values, size=1000):
        # Yield consecutive lists of at most ``size`` items.
        chunk = []
        for value in values:
            chunk.append(value)
            if len(chunk) == size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

    def fetch_profiles(url, query):
        # Single GET returning the parsed profiles.
        reply = self.session.get(url, params=query, headers=self.headers)
        return [Profile.from_json(entry) for entry in self.__handle_response(reply).json()['profiles']]

    def post_in_batches(field, values):
        # One POST per batch; the raw profile dicts are concatenated.
        collected = []
        for chunk in chunked(values):
            reply = self.session.post(self.profiles_search_url, json={field: chunk}, headers=self.headers)
            collected.extend(self.__handle_response(reply).json()['profiles'])
        return collected

    if term:
        return fetch_profiles(self.profiles_search_url, {'term': term, 'es': es_flag})

    if fullname:
        return fetch_profiles(self.profiles_search_url, {'fullname': fullname, 'es': es_flag})

    if emails:
        lowered = [address.lower() for address in emails]
        by_email = {}
        for raw in post_in_batches('emails', lowered):
            by_email.setdefault(raw['email'], []).append(Profile.from_json(raw))
        return by_email

    if confirmedEmails:
        lowered = [address.lower() for address in confirmedEmails]
        by_email = {}
        for raw in post_in_batches('confirmedEmails', lowered):
            confirmed = raw.get('confirmedEmails', raw['content'].get('emailsConfirmed', []))
            # A later profile with the same confirmed email overwrites an earlier one.
            for address in confirmed:
                by_email[address] = Profile.from_json(raw)
        return by_email

    if ids:
        return [Profile.from_json(raw) for raw in post_in_batches('ids', ids)]

    if first or middle or last:
        return fetch_profiles(self.profiles_url, {'first': first, 'middle': middle, 'last': last, 'es': es_flag})

    if relation:
        return fetch_profiles(self.profiles_url, {'relation': relation })

    return []
[docs]
def get_pdf(self, id, is_reference=False):
    """
    Gets the binary content of a pdf using the provided note/reference id

    Use the note id when trying to get the latest pdf version and reference id
    when trying to get a previous version of the pdf

    :param id: Note id or Reference id of the pdf
    :type id: str
    :param is_reference: Indicates that the passed id is a reference id instead of a note id
    :type is_reference: bool, optional

    :return: The binary content of a pdf
    :rtype: bytes
    :raises OpenReviewException: on an HTTP error status

    Example:

    >>> f = get_pdf(id='Place Note-ID here')
    >>> with open('output.pdf','wb') as op: op.write(f)
    """
    # Work on a copy so the client's shared headers are not modified.
    pdf_headers = dict(self.headers)
    pdf_headers['content-type'] = 'application/pdf'

    if is_reference:
        endpoint = self.pdf_revisions_url
    else:
        endpoint = self.pdf_url

    reply = self.session.get(endpoint, params=tools.format_params({'id': id}), headers=pdf_headers)
    return self.__handle_response(reply).content
[docs]
def get_attachment(self, field_name, id=None, ids=None, group_id=None, invitation_id=None):
    """
    Gets the binary content of an attachment

    The first truthy selector wins, in the order ``id``, ``ids``,
    ``group_id``, ``invitation_id``.

    :param field_name: name of the field associated with the attachment file
    :type field_name: str
    :param id: Note id or Reference id of the pdf
    :type id: str
    :param ids: List of Note ids or Reference ids. The max number of ids is 50
    :type ids: list[str]
    :param group_id: Id of group where attachment is stored
    :type group_id: str
    :param invitation_id: Id of invitation where attachment is stored
    :type invitation_id: str

    :return: The binary content of the attachment
    :rtype: bytes
    :raises OpenReviewException: when no selector is given, or on an HTTP error status

    Example:

    >>> f = get_attachment(id='Place Note-ID here', field_name='pdf')
    >>> with open('output.pdf','wb') as op: op.write(f)
    """
    if not (id or ids or group_id or invitation_id):
        raise OpenReviewException('Provide exactly one of the following: id, ids, group_id, invitation_id')

    query = {'name': field_name}
    if id:
        endpoint = self.baseurl
        query['id'] = id
    elif ids:
        endpoint = self.baseurl
        query['ids'] = ','.join(ids)
    elif group_id:
        endpoint = self.groups_url
        query['id'] = group_id
    else:
        # Only invitation_id can be truthy at this point.
        endpoint = self.invitations_url
        query['id'] = invitation_id

    reply = self.session.get(endpoint + '/attachment', params=tools.format_params(query), headers=self.headers)
    return self.__handle_response(reply).content
[docs]
def get_venues(self, id=None, ids=None, invitations=None):
    """Get the venues matching the given filters.

    :param id: A Venue ID; sent when not ``None``.
    :type id: str, optional
    :param ids: A list of Venue IDs; sent comma-joined when not ``None``.
    :type ids: list[str], optional
    :param invitations: A list of Invitation IDs; sent comma-joined when not ``None``.
    :type invitations: list[str], optional
    :return: The ``venues`` entry of the decoded response.
    :rtype: list[dict]
    :raises OpenReviewException: on an HTTP error status
    """
    filters = {}
    if id is not None:
        filters['id'] = id
    for key, values in (('ids', ids), ('invitations', invitations)):
        if values is not None:
            filters[key] = ','.join(values)

    reply = self.session.get(self.venues_url, params=tools.format_params(filters), headers=self.headers)
    return self.__handle_response(reply).json()['venues']
[docs]
def rename_venue(self, old_venue_id, new_venue_id, request_form=None, additional_renames=None):
    """
    Requests a rename of a venue through the domains rename endpoint

    :param old_venue_id: Current venue id, sent as ``oldDomain``
    :param new_venue_id: New venue id, sent as ``newDomain``
    :param request_form: Value sent as ``requestForm``
    :param additional_renames: When truthy, sent as ``additionalRenames``

    :return: Status of the request. The process can be tracked in the queue.
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    payload = dict(oldDomain=old_venue_id, newDomain=new_venue_id, requestForm=request_form)
    if additional_renames:
        payload['additionalRenames'] = additional_renames

    reply = self.session.post(self.domains_rename, json=payload, headers=self.headers)
    return self.__handle_response(reply).json()
[docs]
def put_attachment(self, file_path, invitation, name):
    """Upload a file attachment to the OpenReview server.

    :param file_path: Path to the local file to upload.
    :type file_path: str
    :param invitation: Invitation ID, sent as the ``invitationId`` form field.
    :type invitation: str
    :param name: Name of the content field, sent as the ``name`` form field (e.g., ``pdf``, ``supplementary_material``).
    :type name: str
    :return: The ``url`` entry of the decoded response.
    :rtype: str
    :raises OpenReviewException: on an HTTP error status
    """
    upload_headers = self.headers.copy()
    upload_url = self.baseurl + '/attachment'

    # The file stays open only for the duration of the upload request.
    with open(file_path, 'rb') as attachment:
        multipart_fields = (
            ('invitationId', (None, invitation)),
            ('name', (None, name)),
            ('file', (file_path, attachment))
        )
        reply = self.session.put(upload_url, files=multipart_fields, headers=upload_headers)

    return self.__handle_response(reply).json()['url']
[docs]
def post_profile(self, profile):
    """
    Updates a Profile

    :param profile: Profile object; its ``to_json()`` result is sent as the request body
    :type profile: Profile

    :return: The Profile built from the server response
    :rtype: Profile
    :raises OpenReviewException: on an HTTP error status
    """
    reply = self.session.post(self.profiles_url, json=profile.to_json(), headers=self.headers)
    saved = self.__handle_response(reply).json()
    return Profile.from_json(saved)
[docs]
def rename_domain(self, old_domain, new_domain, request_form, additional_renames=None):
    """
    Updates the domain for an entire venue

    :param old_domain: Current domain, sent as ``oldDomain``
    :param new_domain: New domain, sent as ``newDomain``
    :param request_form: Value sent as ``requestForm``
    :param additional_renames: When truthy, sent as ``additionalRenames``

    :return: Status of the request. The process can be tracked in the queue.
    :rtype: dict
    :raises OpenReviewException: on an HTTP error status
    """
    payload = dict(oldDomain=old_domain, newDomain=new_domain, requestForm=request_form)
    if additional_renames:
        payload['additionalRenames'] = additional_renames

    reply = self.session.post(self.domains_rename, json=payload, headers=self.headers)
    return self.__handle_response(reply).json()
[docs]
def rename_profile(self, current_id, new_id):
    """Rename a Profile by changing its tilde ID.

    :param current_id: Current profile ID (e.g., ``~Old_Name1``), sent as ``currentId``.
    :type current_id: str
    :param new_id: New profile ID (e.g., ``~New_Name1``), sent as ``newId``.
    :type new_id: str
    :return: The Profile built from the server response.
    :rtype: Profile
    :raises OpenReviewException: on an HTTP error status
    """
    rename_request = {'currentId': current_id, 'newId': new_id}
    reply = self.session.post(self.profiles_rename, json=rename_request, headers=self.headers)
    renamed = self.__handle_response(reply).json()
    return Profile.from_json(renamed)
[docs]
def merge_profiles(self, profileTo, profileFrom):
    """
    Merges two Profiles

    Both arguments are placed unchanged into the JSON request body, so they
    must be JSON-serializable - presumably profile ids; TODO confirm.

    :param profileTo: Profile to merge to, sent as ``to``
    :param profileFrom: Profile to merge from, sent as ``from`` (this profile will be deleted)

    :return: The Profile built from the server response
    :rtype: Profile
    :raises OpenReviewException: on an HTTP error status
    """
    merge_request = {'to': profileTo, 'from': profileFrom}
    reply = self.session.post(self.profiles_merge_url, json=merge_request, headers=self.headers)
    merged = self.__handle_response(reply).json()
    return Profile.from_json(merged)
[docs]
def moderate_profile(self, profile_id, decision, reason=None):
    """
    Moderates a Profile

    :param profile_id: Profile id to moderate
    :type profile_id: str
    :param decision: Moderation decision (accept, reject, block, unblock, delete, restore, limit)
    :type decision: str
    :param reason: Reason for the decision. When rejecting, this text is emailed to the user. Left out of the request when None.
    :type reason: str, optional

    :return: Profile built from the JSON body of the response
    :rtype: Profile
    """
    moderation = {'id': profile_id, 'decision': decision}
    # An empty-string reason is still sent; only None means "no reason given".
    body = moderation if reason is None else {**moderation, 'reason': reason}

    raw_response = self.session.post(self.profiles_moderate, json=body, headers=self.headers)
    checked_response = self.__handle_response(raw_response)
    return Profile.from_json(checked_response.json())
[docs]
def update_relation_readers(self, update):
    """
    Updates the relation readers available in the profile. This is an admin method.

    :param update: Dictionary that accepts the keys append or remove with a list of group ids. It is sent unchanged as the JSON body of a PATCH request.
    :type update: dict

    :return: The new list of relation readers (the parsed JSON body of the response)
    :rtype: list
    """
    raw_response = self.session.patch(self.relation_readers_url, json=update, headers=self.headers)
    return self.__handle_response(raw_response).json()
[docs]
def get_groups(self, id=None, invitation=None, prefix=None, member=None, members=None, signatory=None, web=None, limit=None, offset=None, after=None, stream=None, sort=None, with_count=None, domain=None, parent=None):
    """
    Gets list of Group objects based on the filters provided. The Groups that will be returned match all the criteria passed in the parameters.

    Only arguments that are not None are sent as query parameters.

    :param id: id of the Group
    :type id: str, optional
    :param invitation: Value sent as the ``invitation`` query parameter
    :type invitation: str, optional
    :param prefix: Prefix that matches several Group ids
    :type prefix: str, optional
    :param member: Groups that that are transitive members of the member value
    :type member: str, optional
    :param members: Groups that contain the value members in the members field
    :type members: str, optional
    :param signatory: Groups that contain this signatory
    :type signatory: str, optional
    :param web: Groups that contain a web field value
    :type web: bool, optional
    :param limit: Maximum amount of Groups that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Groups will be returned
    :type limit: int, optional
    :param offset: Indicates the position to start retrieving Groups. For example, if there are 10 Groups and you want to obtain the last 3, then the offset would need to be 7.
    :type offset: int, optional
    :param after: Group id to start getting the list of groups from.
    :type after: str, optional
    :param stream: Value sent as the ``stream`` query parameter
    :type stream: bool, optional
    :param sort: Value sent as the ``sort`` query parameter
    :type sort: str, optional
    :param with_count: Sent as the ``count`` query parameter. When truthy and ``offset`` is not set, the method returns a tuple ``(groups, count)``.
    :type with_count: bool, optional
    :param domain: Value sent as the ``domain`` query parameter
    :type domain: str, optional
    :param parent: id of the parent Group, sent as the ``parent`` query parameter
    :type parent: str, optional

    :return: List of Groups, or a tuple ``(list[Group], count)`` when ``with_count`` is truthy and ``offset`` is None
    :rtype: list[Group]
    """
    # Query-parameter name paired with its argument; 'count' is the wire name of with_count.
    candidate_filters = (
        ('id', id),
        ('invitation', invitation),
        ('prefix', prefix),
        ('member', member),
        ('members', members),
        ('signatory', signatory),
        ('sort', sort),
        ('web', web),
        ('limit', limit),
        ('offset', offset),
        ('after', after),
        ('stream', stream),
        ('count', with_count),
        ('domain', domain),
        # get_all_groups forwards ``parent``; accepting it here keeps that call from raising TypeError.
        ('parent', parent),
    )
    params = {name: value for name, value in candidate_filters if value is not None}

    response = self.session.get(self.groups_url, params=tools.format_params(params), headers = self.headers)
    response = self.__handle_response(response)
    groups = [Group.from_json(g) for g in response.json()['groups']]

    # The total is only returned for un-offset queries.
    if with_count and params.get('offset') is None:
        return groups, response.json()['count']

    return groups
[docs]
def get_all_groups(self, id=None, invitation=None, parent=None, prefix=None, member=None, members=None, domain=None, signatory=None, web=None, sort=None, with_count=None):
    """
    Gets all the Group objects that match the filters provided, by calling :meth:`get_groups` with ``stream=True``.

    Every argument that is not None is forwarded to :meth:`get_groups` as a keyword argument of the same name.

    :param id: id of the Group
    :type id: str, optional
    :param invitation: Forwarded as the ``invitation`` filter
    :type invitation: str, optional
    :param parent: id of the parent Group
    :type parent: str, optional
    :param prefix: Prefix that matches several Group ids
    :type prefix: str, optional
    :param member: Groups that that are transitive members of the member value
    :type member: str, optional
    :param members: Groups that contain the value members in the members field
    :type members: str, optional
    :param domain: Forwarded as the ``domain`` filter
    :type domain: str, optional
    :param signatory: Groups that contain this signatory
    :type signatory: str, optional
    :param web: Groups that contain a web field value
    :type web: bool, optional
    :param sort: Forwarded as the ``sort`` argument
    :type sort: str, optional
    :param with_count: Forwarded as the ``with_count`` argument
    :type with_count: bool, optional

    :return: Whatever :meth:`get_groups` returns for the forwarded arguments
    :rtype: list[Group]
    """
    forwarded = (
        ('id', id),
        ('invitation', invitation),
        ('parent', parent),
        ('prefix', prefix),
        ('member', member),
        ('members', members),
        ('signatory', signatory),
        ('domain', domain),
        ('web', web),
        ('sort', sort),
        ('with_count', with_count),
    )
    params = {'stream': True}
    params.update((name, value) for name, value in forwarded if value is not None)
    return self.get_groups(**params)
[docs]
def get_invitations(self,
    id = None,
    ids = None,
    invitee = None,
    replytoNote = None,
    replyForum = None,
    signature = None,
    note = None,
    prefix = None,
    tags = None,
    limit = None,
    offset = None,
    after = None,
    minduedate = None,
    duedate = None,
    pastdue = None,
    replyto = None,
    details = None,
    expired = None,
    sort = None,
    type = None,
    with_count=None,
    invitation = None,
    trash = None,
    stream = None,
    domain = None
):
    """
    Gets list of Invitation objects based on the filters provided. The Invitations that will be returned match all the criteria passed in the parameters.

    Only arguments that are not None are sent as query parameters.

    :param id: id of the Invitation
    :type id: str, optional
    :param ids: Comma separated Invitation IDs. If provided, returns invitations whose "id" value is any of the passed Invitation IDs.
    :type ids: str, optional
    :param invitee: Invitations that contain this invitee
    :type invitee: str, optional
    :param replytoNote: Invitations that contain this replytoNote
    :type replytoNote: str, optional
    :param replyForum: Invitations that contain this replyForum
    :type replyForum: str, optional
    :param signature: Invitations that contain this signature
    :type signature: optional
    :param note: Invitations that contain this note
    :type note: str, optional
    :param prefix: Invitation ids that match this prefix
    :type prefix: str, optional
    :param tags: Invitations that contain these tags
    :type tags: Tag, optional
    :param limit: Maximum amount of Invitations that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Invitations will be returned
    :type limit: int, optional
    :param offset: Indicates the position to start retrieving Invitations. For example, if there are 10 Invitations and you want to obtain the last 3, then the offset would need to be 7.
    :type offset: int, optional
    :param after: Invitation id to start getting the list of invitations from.
    :type after: str, optional
    :param minduedate: Invitations that have at least this value as due date
    :type minduedate: int, optional
    :param duedate: Invitations that contain this due date
    :type duedate: int, optional
    :param pastdue: Invitations that are past due
    :type pastdue: bool, optional
    :param replyto: Invitations that contain this replyto
    :type replyto: optional
    :param details: Value sent as the ``details`` query parameter (valid values are not documented here)
    :type details: dict, optional
    :param expired: If true, retrieves the Invitations that have expired, otherwise, the ones that have not expired
    :type expired: bool, optional
    :param sort: Value sent as the ``sort`` query parameter
    :type sort: str, optional
    :param type: Value sent as the ``type`` query parameter
    :type type: str, optional
    :param with_count: Sent as the ``count`` query parameter. When truthy and ``offset`` is not set, the method returns a tuple ``(invitations, count)``.
    :type with_count: bool, optional
    :param invitation: Value sent as the ``invitation`` query parameter
    :type invitation: str, optional
    :param trash: If true, retrieves the Invitations that have been trashed, otherwise, the ones that have not been trashed
    :type trash: bool, optional
    :param stream: Value sent as the ``stream`` query parameter
    :type stream: bool, optional
    :param domain: Value sent as the ``domain`` query parameter
    :type domain: str, optional

    :return: List of Invitations, or a tuple ``(list[Invitation], count)`` when ``with_count`` is truthy and ``offset`` is None
    :rtype: list[Invitation]
    """
    # Query-parameter name paired with its argument; 'count' is the wire name of with_count.
    candidate_filters = (
        ('id', id),
        ('ids', ids),
        ('invitee', invitee),
        ('replytoNote', replytoNote),
        ('replyForum', replyForum),
        ('signature', signature),
        ('note', note),
        ('prefix', prefix),
        ('tags', tags),
        ('minduedate', minduedate),
        ('replyto', replyto),
        ('duedate', duedate),
        ('pastdue', pastdue),
        ('details', details),
        ('limit', limit),
        ('offset', offset),
        ('after', after),
        ('sort', sort),
        ('expired', expired),
        ('type', type),
        ('invitation', invitation),
        ('count', with_count),
        ('trash', trash),
        ('domain', domain),
        ('stream', stream),
    )
    params = {name: value for name, value in candidate_filters if value is not None}

    raw_response = self.session.get(self.invitations_url, params=tools.format_params(params), headers=self.headers)
    checked_response = self.__handle_response(raw_response)
    invitations = [Invitation.from_json(item) for item in checked_response.json()['invitations']]

    # 'offset' is only present in params when the caller supplied one.
    if not with_count or 'offset' in params:
        return invitations
    return invitations, checked_response.json()['count']
[docs]
def get_all_invitations(self,
    id = None,
    ids = None,
    invitee = None,
    replytoNote = None,
    replyForum = None,
    signature = None,
    note = None,
    prefix = None,
    tags = None,
    minduedate = None,
    duedate = None,
    pastdue = None,
    replyto = None,
    expired = None,
    sort = None,
    type = None,
    invitation = None,
    trash = None,
    domain = None
):
    """
    Gets all the Invitation objects that match the filters provided, by calling :meth:`get_invitations` with ``stream=True``.

    Every argument that is not None is forwarded to :meth:`get_invitations` as a keyword argument of the same name.

    :param id: id of the Invitation
    :type id: str, optional
    :param ids: Comma separated Invitation IDs. If provided, returns invitations whose "id" value is any of the passed Invitation IDs.
    :type ids: str, optional
    :param invitee: Invitations that contain this invitee
    :type invitee: str, optional
    :param replytoNote: Invitations that contain this replytoNote
    :type replytoNote: str, optional
    :param replyForum: Invitations that contain this replyForum
    :type replyForum: str, optional
    :param signature: Invitations that contain this signature
    :type signature: optional
    :param note: Invitations that contain this note
    :type note: str, optional
    :param prefix: Invitation ids that match this prefix
    :type prefix: str, optional
    :param tags: Invitations that contain these tags
    :type tags: Tag, optional
    :param minduedate: Invitations that have at least this value as due date
    :type minduedate: int, optional
    :param duedate: Invitations that contain this due date
    :type duedate: int, optional
    :param pastdue: Invitations that are past due
    :type pastdue: bool, optional
    :param replyto: Invitations that contain this replyto
    :type replyto: optional
    :param expired: If true, retrieves the Invitations that have expired, otherwise, the ones that have not expired
    :type expired: bool, optional
    :param sort: Forwarded as the ``sort`` argument
    :type sort: str, optional
    :param type: Forwarded as the ``type`` argument
    :type type: str, optional
    :param invitation: Forwarded as the ``invitation`` argument
    :type invitation: str, optional
    :param trash: Forwarded as the ``trash`` argument
    :type trash: bool, optional
    :param domain: Forwarded as the ``domain`` argument
    :type domain: str, optional

    :return: List of Invitations
    :rtype: list[Invitation]
    """
    forwarded = (
        ('id', id),
        ('ids', ids),
        ('invitee', invitee),
        ('replytoNote', replytoNote),
        ('replyForum', replyForum),
        ('signature', signature),
        ('note', note),
        ('prefix', prefix),
        ('tags', tags),
        ('minduedate', minduedate),
        ('duedate', duedate),
        ('pastdue', pastdue),
        ('replyto', replyto),
        ('expired', expired),
        ('sort', sort),
        ('type', type),
        ('invitation', invitation),
        ('trash', trash),
        ('domain', domain),
    )
    params = {'stream': True}
    params.update((name, value) for name, value in forwarded if value is not None)
    return self.get_invitations(**params)
[docs]
def get_invitation_edit(self, id):
    """
    Get a single invitation edit by id if available

    :param id: id of the edit
    :type id: str

    :return: Edit built from the first entry of the ``edits`` list in the response
    :rtype: Edit
    """
    query = {'id': id}
    raw_response = self.session.get(self.invitation_edits_url, params=query, headers=self.headers)
    matching_edits = self.__handle_response(raw_response).json()['edits']
    return Edit.from_json(matching_edits[0])
[docs]
def get_invitation_edits(self, invitation_id = None, invitation = None, with_count=None, sort=None):
    """
    Gets a list of edits for an invitation. The edits that will be returned match all the criteria passed in the parameters.

    :param invitation_id: Sent as the ``invitation.id`` query parameter when truthy
    :type invitation_id: str, optional
    :param invitation: Sent as the ``invitation`` query parameter when truthy
    :type invitation: str, optional
    :param with_count: Sent as the ``count`` query parameter when not None. When truthy, the method returns a tuple ``(edits, count)``.
    :type with_count: bool, optional
    :param sort: Sent as the ``sort`` query parameter when truthy
    :type sort: str, optional

    :return: List of edits, or a tuple ``(list[Edit], count)`` when ``with_count`` is truthy
    :rtype: list[Edit]
    """
    # These filters are skipped when falsy (None or empty), not only when None.
    truthy_filters = (
        ('invitation.id', invitation_id),
        ('invitation', invitation),
        ('sort', sort),
    )
    params = {name: value for name, value in truthy_filters if value}
    if with_count is not None:
        params['count'] = with_count

    raw_response = self.session.get(self.invitation_edits_url, params=tools.format_params(params), headers = self.headers)
    checked_response = self.__handle_response(raw_response)
    edits = [Edit.from_json(item) for item in checked_response.json()['edits']]

    # No offset is ever sent by this method, so the count only depends on with_count.
    if with_count:
        return edits, checked_response.json()['count']
    return edits
[docs]
def get_notes(self, id = None,
        external_id = None,
        paperhash = None,
        forum = None,
        invitation = None,
        parent_invitations = None,
        replyto = None,
        tauthor = None,
        signature = None,
        transitive_members = None,
        signatures = None,
        writer = None,
        trash = None,
        number = None,
        content = None,
        limit = None,
        offset = None,
        after = None,
        mintcdate = None,
        domain = None,
        paper_hash = None,
        details = None,
        sort = None,
        with_count=None,
        stream=None,
        select = None
        ):
    """
    Gets list of Note objects based on the filters provided. The Notes that will be returned match all the criteria passed in the parameters.

    Only arguments that are not None are sent as query parameters.

    :param id: a Note ID. If provided, returns Notes whose ID matches the given ID.
    :type id: str, optional
    :param external_id: Value sent as the ``externalId`` query parameter.
    :type external_id: str, optional
    :param paperhash: A "paperhash" for a note. If provided, returns Notes whose paperhash matches this argument.
        (A paperhash is a human-interpretable string built from the Note's title and list of authors to uniquely
        identify the Note)
    :type paperhash: str, optional
    :param forum: A Note ID. If provided, returns Notes whose forum matches the given ID.
    :type forum: str, optional
    :param invitation: An Invitation ID. If provided, returns Notes whose "invitation" field is this Invitation ID.
    :type invitation: str, optional
    :param parent_invitations: An Invitation ID, sent as the ``parentInvitations`` query parameter.
    :type parent_invitations: str, optional
    :param replyto: A Note ID. If provided, returns Notes whose replyto field matches the given ID.
    :type replyto: str, optional
    :param tauthor: A Group ID. If provided, returns Notes whose tauthor field ("true author") matches the given ID, or is a transitive member of the Group represented by the given ID.
    :type tauthor: str, optional
    :param signature: A Group ID. If provided, returns Notes whose signatures field contains the given Group ID.
    :type signature: str, optional
    :param transitive_members: If true, returns Notes whose tauthor field is a transitive member of the Group represented by the given Group ID. Sent as ``transitiveMembers``.
    :type transitive_members: bool, optional
    :param signatures: Group IDs. If provided, returns Notes whose signatures field contains the given Group IDs.
    :type signatures: list[str], optional
    :param writer: A Group ID. If provided, returns Notes whose writers field contains the given Group ID.
    :type writer: str, optional
    :param trash: If True, includes Notes that have been deleted (i.e. the ddate field is less than the
        current date). Any other value is not sent.
    :type trash: bool, optional
    :param number: If present, includes Notes whose number field equals the given integer.
    :type number: int, optional
    :param content: If present, includes Notes whose each key is present in the content field and it is equals the given value. Each key ``k`` is sent as the query parameter ``content.k``.
    :type content: dict, optional
    :param limit: Maximum amount of Notes that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Notes will be returned
    :type limit: int, optional
    :param offset: Indicates the position to start retrieving Notes. For example, if there are 10 Notes and you want to obtain the last 3, then the offset would need to be 7.
    :type offset: int, optional
    :param after: Note id to start getting the list of notes from.
    :type after: str, optional
    :param mintcdate: Represents an Epoch time timestamp, in milliseconds. If provided, returns Notes
        whose "true creation date" (tcdate) is at least equal to the value of mintcdate.
    :type mintcdate: int, optional
    :param domain: If provided, returns Notes whose domain field matches the given domain.
    :type domain: str, optional
    :param paper_hash: Alternative spelling of ``paperhash``; when both are given this value is the one sent.
    :type paper_hash: str, optional
    :param details: Value sent as the ``details`` query parameter (valid values are not documented here)
    :type details: optional
    :param sort: Sorts the output by field depending on the string passed. Possible values: number, cdate, ddate, tcdate, tmdate, replyCount (Invitation id needed in the invitation field).
    :type sort: str, optional
    :param with_count: Sent as the ``count`` query parameter. When truthy and ``offset`` is not set, the method returns a tuple ``(notes, count)``.
    :type with_count: bool, optional
    :param stream: Value sent as the ``stream`` query parameter.
    :type stream: bool, optional
    :param select: Value sent as the ``select`` query parameter.
    :type select: str, optional

    :return: List of Notes, or a tuple ``(list[Note], count)`` when ``with_count`` is truthy and ``offset`` is None
    :rtype: list[Note]
    """
    params = {}

    def add_present(pairs):
        # Copy into params every (query name, value) pair whose value was supplied.
        for name, value in pairs:
            if value is not None:
                params[name] = value

    add_present((
        ('id', id),
        ('externalId', external_id),
        ('paperhash', paperhash),
        ('forum', forum),
        ('invitation', invitation),
        ('parentInvitations', parent_invitations),
        ('replyto', replyto),
        ('tauthor', tauthor),
        ('signature', signature),
        ('transitiveMembers', transitive_members),
        ('signatures', signatures),
        ('writer', writer),
    ))

    # Only an explicit True is forwarded; False/None leave the server default in place.
    if trash == True:
        params['trash'] = True

    add_present((('number', number),))

    # Content filters are flattened into dotted query parameters.
    if content is not None:
        for k in content:
            params['content.' + k] = content[k]

    add_present((
        ('limit', limit),
        ('offset', offset),
        ('mintcdate', mintcdate),
        ('domain', domain),
        # paper_hash deliberately comes after paperhash so it wins when both are set.
        ('paperhash', paper_hash),
        ('details', details),
        ('after', after),
        ('sort', sort),
        ('count', with_count),
        ('stream', stream),
        # get_all_notes forwards ``select``; accepting it here keeps that call from raising TypeError.
        ('select', select),
    ))

    response = self.session.get(self.notes_url, params=tools.format_params(params), headers = self.headers)
    response = self.__handle_response(response)
    notes = [Note.from_json(n) for n in response.json()['notes']]

    # The total is only returned for un-offset queries.
    if with_count and params.get('offset') is None:
        return notes, response.json()['count']

    return notes
[docs]
def get_all_notes(self, id = None,
        paperhash = None,
        forum = None,
        invitation = None,
        parent_invitations = None,
        replyto = None,
        signature = None,
        transitive_members = None,
        signatures = None,
        writer = None,
        trash = None,
        number = None,
        content = None,
        mintcdate = None,
        details = None,
        select = None,
        sort = None,
        domain=None
        ):
    """
    Gets all the Note objects that match the filters provided. The Notes that will be returned match all the criteria passed in the parameters.

    When ``details`` is None the notes are fetched with a single :meth:`get_notes` call using ``stream=True``,
    and a ``sort`` on number, tcdate, tmdate, cdate or mdate is applied locally instead of being sent.
    When ``details`` is given the notes are collected through ``tools.efficient_iterget``.

    :param id: a Note ID. If provided, returns Notes whose ID matches the given ID.
    :type id: str, optional
    :param paperhash: A "paperhash" for a note. If provided, returns Notes whose paperhash matches this argument.
        (A paperhash is a human-interpretable string built from the Note's title and list of authors to uniquely
        identify the Note)
    :type paperhash: str, optional
    :param forum: A Note ID. If provided, returns Notes whose forum matches the given ID.
    :type forum: str, optional
    :param invitation: An Invitation ID. If provided, returns Notes whose "invitation" field is this Invitation ID.
    :type invitation: str, optional
    :param parent_invitations: An Invitation ID. If provided, returns Notes whose parentInvitations field contains the given Invitation ID.
    :type parent_invitations: str, optional
    :param replyto: A Note ID. If provided, returns Notes whose replyto field matches the given ID.
    :type replyto: str, optional
    :param signature: A Group ID. If provided, returns Notes whose signatures field contains the given Group ID.
    :type signature: str, optional
    :param transitive_members: If true, returns Notes whose tauthor field is a transitive member of the Group represented by the given Group ID.
    :type transitive_members: bool, optional
    :param signatures: Group IDs. If provided, returns Notes whose signatures field contains the given Group IDs.
    :type signatures: list[str], optional
    :param writer: A Group ID. If provided, returns Notes whose writers field contains the given Group ID.
    :type writer: str, optional
    :param trash: If True, includes Notes that have been deleted (i.e. the ddate field is less than the
        current date)
    :type trash: bool, optional
    :param number: If present, includes Notes whose number field equals the given integer.
    :type number: int, optional
    :param content: If present, includes Notes whose each key is present in the content field and it is equals the given value.
    :type content: dict, optional
    :param mintcdate: Represents an Epoch time timestamp, in milliseconds. If provided, returns Notes
        whose "true creation date" (tcdate) is at least equal to the value of mintcdate.
    :type mintcdate: int, optional
    :param details: Forwarded as the ``details`` argument (valid values are not documented here)
    :type details: optional
    :param select: Forwarded as the ``select`` argument when truthy
    :type select: str, optional
    :param sort: Field to sort by, optionally followed by ``:asc`` or ``:desc`` (e.g. ``number:asc``). Without a direction, ``desc`` is used for local sorting.
    :type sort: str, optional
    :param domain: Forwarded as the ``domain`` argument
    :type domain: str, optional

    :return: List of Notes
    :rtype: list[Note]
    """
    params = {}

    def forward(pairs):
        # Keep only the keyword arguments the caller actually supplied.
        for name, value in pairs:
            if value is not None:
                params[name] = value

    forward((
        ('id', id),
        ('paperhash', paperhash),
        ('forum', forum),
        ('invitation', invitation),
        ('parent_invitations', parent_invitations),
        ('replyto', replyto),
        ('signature', signature),
        ('signatures', signatures),
        ('transitive_members', transitive_members),
        ('writer', writer),
    ))
    if trash == True:
        params['trash'] = True
    forward((
        ('number', number),
        ('content', content),
        ('mintcdate', mintcdate),
        ('details', details),
    ))
    # select is skipped when falsy (None or empty), unlike the other filters.
    if select:
        params['select'] = select
    forward((
        ('sort', sort),
        ('domain', domain),
    ))

    # With details requested, the notes are gathered through the iterget helper.
    if details is not None:
        return list(tools.efficient_iterget(self.get_notes, desc='Getting V2 Notes', **params))

    params['stream'] = True

    # Fields that are sorted locally rather than by the API.
    locally_sortable = ('number', 'tcdate', 'tmdate', 'cdate', 'mdate')
    local_key = None
    descending = False
    if sort is not None:
        # Accept format like "number:asc", "tcdate:desc", etc.
        field, direction = sort.split(':', 1) if ':' in sort else (sort, 'desc')
        if field in locally_sortable:
            local_key = lambda note: getattr(note, field)
            descending = direction == 'desc'
            params['sort'] = None  # Remove for API call, sort locally

    results = self.get_notes(**params)
    if local_key is None:
        return results
    return sorted(results, key=local_key, reverse=descending)
[docs]
def get_note_edit(self, id, trash=None):
    """
    Get a single note edit by id if available

    :param id: id of the edit
    :type id: str
    :param trash: When equal to True the ``trash`` query parameter is sent as ``'true'``, otherwise as ``'false'``
    :type trash: bool, optional

    :return: Edit built from the first entry of the ``edits`` list in the response
    :rtype: Edit
    """
    if trash == True:
        trash_flag = 'true'
    else:
        trash_flag = 'false'
    query = {'id': id, 'trash': trash_flag}

    raw_response = self.session.get(self.note_edits_url, params=query, headers=self.headers)
    matching_edits = self.__handle_response(raw_response).json()['edits']
    return Edit.from_json(matching_edits[0])
[docs]
def get_note_edits(self, note_id = None, invitation = None, with_count=None, sort=None, trash=None, limit=None):
    """Get a list of Edit objects for a Note matching the filters provided.

    Returns edits that match all the criteria passed in the parameters. When
    ``with_count`` is True, returns a tuple of ``(edits, count)``.

    :param note_id: ID of the Note whose edits to retrieve, sent as ``note.id`` when truthy.
    :type note_id: str, optional
    :param invitation: Invitation ID to filter edits by, sent when truthy.
    :type invitation: str, optional
    :param with_count: If True, also returns the total count of matching edits. Sent as ``count`` when not None.
    :type with_count: bool, optional
    :param sort: Field to sort results by (e.g., ``tcdate``, ``tmdate``), sent when truthy.
    :type sort: str, optional
    :param trash: If True, includes soft-deleted edits in the results. Sent only when truthy.
    :type trash: bool, optional
    :param limit: Maximum number of edits to return.
    :type limit: int, optional

    :return: List of Edit objects, or a tuple ``(list[Edit], int)`` when ``with_count`` is True.
    :rtype: list[Edit] | tuple[list[Edit], int]
    """
    # These filters are skipped when falsy (None, empty or False), not only when None.
    truthy_filters = (
        ('note.id', note_id),
        ('invitation', invitation),
        ('sort', sort),
        ('trash', trash),
    )
    params = {name: value for name, value in truthy_filters if value}
    # These two are sent whenever they are supplied, even if falsy (e.g. limit=0).
    for name, value in (('count', with_count), ('limit', limit)):
        if value is not None:
            params[name] = value

    raw_response = self.session.get(self.note_edits_url, params=tools.format_params(params), headers = self.headers)
    checked_response = self.__handle_response(raw_response)
    edits = [Edit.from_json(item) for item in checked_response.json()['edits']]

    # No offset is ever sent by this method, so the count only depends on with_count.
    if with_count:
        return edits, checked_response.json()['count']
    return edits
[docs]
def get_group_edit(self, id):
    """
    Get a single group edit by id if available

    :param id: id of the edit
    :type id: str

    :return: Edit built from the first entry of the ``edits`` list in the response
    :rtype: Edit
    """
    query = {'id': id}
    raw_response = self.session.get(self.group_edits_url, params=query, headers=self.headers)
    matching_edits = self.__handle_response(raw_response).json()['edits']
    return Edit.from_json(matching_edits[0])
[docs]
def get_group_edits(self, group_id = None, invitation = None, with_count = False, sort = None, trash = None):
    """
    Gets a list of edits for a group. The edits that will be returned match all the criteria passed in the parameters.

    :param group_id: Sent as the ``group.id`` query parameter when truthy
    :type group_id: str, optional
    :param invitation: Sent as the ``invitation`` query parameter when truthy
    :type invitation: str, optional
    :param with_count: Sent as the ``count`` query parameter unless None (so the default ``False`` is sent). When truthy, the method returns a tuple ``(edits, count)``.
    :type with_count: bool, optional
    :param sort: Sent as the ``sort`` query parameter when truthy
    :type sort: str, optional
    :param trash: Sent as the ``trash`` query parameter when truthy
    :type trash: bool, optional

    :return: List of edits, or a tuple ``(list[Edit], count)`` when ``with_count`` is truthy
    :rtype: list[Edit]
    """
    # These filters are skipped when falsy (None, empty or False), not only when None.
    truthy_filters = (
        ('group.id', group_id),
        ('invitation', invitation),
        ('sort', sort),
        ('trash', trash),
    )
    params = {name: value for name, value in truthy_filters if value}
    if with_count is not None:
        params['count'] = with_count

    raw_response = self.session.get(self.group_edits_url, params=tools.format_params(params), headers = self.headers)
    checked_response = self.__handle_response(raw_response)
    edits = [Edit.from_json(item) for item in checked_response.json()['edits']]

    # No offset is ever sent by this method, so the count only depends on with_count.
    if with_count:
        return edits, checked_response.json()['count']
    return edits
[docs]
def post_tag(self, tag):
    """
    Posts the tag.

    :param tag: Tag to be posted; it is serialized with ``tag.to_json()``
    :type tag: Tag

    :return Tag: Tag built from the JSON body of the response
    """
    payload = tag.to_json()
    raw_response = self.session.post(self.tags_url, json=payload, headers=self.headers)
    checked_response = self.__handle_response(raw_response)
    return Tag.from_json(checked_response.json())
[docs]
def post_tags(self, tags):
    '''
    Posts the list of Tags in one request to the bulk tags endpoint.

    :param tags: Tags to post; each one is serialized with ``to_json()``
    :type tags: list[Tag]

    :return: One Tag object per entry of the JSON array in the response
    :rtype: list[Tag]
    '''
    payload = [tag.to_json() for tag in tags]
    raw_response = self.session.post(self.bulk_tags_url, json=payload, headers=self.headers)
    checked_response = self.__handle_response(raw_response)
    return [Tag.from_json(entry) for entry in checked_response.json()]
[docs]
def get_edges(self, id = None, invitation = None, head = None, tail = None, label = None, limit = None, offset = None, with_count=None, trash=None, select=None, stream=None, domain=None):
    """Get a list of Edge objects based on the filters provided.

    Returns Edges matching all the criteria passed in the parameters. When
    ``with_count`` is True and ``offset`` is not set, returns a tuple of
    ``(edges, count)``.

    :param id: An Edge ID. If provided, returns the Edge whose ID matches.
    :type id: str, optional
    :param invitation: An Invitation ID. If provided, returns Edges whose ``invitation`` field matches.
    :type invitation: str, optional
    :param head: ID of the Edge head entity (type defined by the edge invitation, e.g., a Note ID or Profile ID).
    :type head: str, optional
    :param tail: ID of the Edge tail entity (type defined by the edge invitation, e.g., a Note ID or Profile ID).
    :type tail: str, optional
    :param label: Label value to filter Edges by.
    :type label: str, optional
    :param limit: Maximum number of Edges to return. Default is determined by the server.
    :type limit: int, optional
    :param offset: Number of Edges to skip (for pagination).
    :type offset: int, optional
    :param with_count: If True, also returns the total count of matching Edges. Sent as ``count``.
    :type with_count: bool, optional
    :param trash: If True, includes soft-deleted Edges in the results.
    :type trash: bool, optional
    :param select: Comma-separated list of fields to include in the response (e.g., ``id,head,tail``).
    :type select: str, optional
    :param stream: If True, returns all matching Edges using server-side streaming (ignores ``limit``/``offset``).
    :type stream: bool, optional
    :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer.
    :type domain: str, optional

    :return: List of Edge objects, or a tuple ``(list[Edge], int)`` when ``with_count`` is True and ``offset`` is None.
    :rtype: list[Edge] | tuple[list[Edge], int]
    """
    # These keys are always placed in params, even when their value is None.
    params = {
        'id': id,
        'invitation': invitation,
        'head': head,
        'tail': tail,
        'label': label,
        'limit': limit,
        'offset': offset,
        'trash': trash,
    }
    # These keys are only added when a value was supplied.
    when_supplied = (
        ('select', select),
        ('stream', stream),
        ('count', with_count),
        ('domain', domain),
    )
    params.update((name, value) for name, value in when_supplied if value is not None)

    raw_response = self.session.get(self.edges_url, params=tools.format_params(params), headers = self.headers)
    checked_response = self.__handle_response(raw_response)
    edges = [Edge.from_json(item) for item in checked_response.json()['edges']]

    if with_count and offset is None:
        return edges, checked_response.json()['count']
    return edges
[docs]
def get_all_edges(self, id = None, invitation = None, head = None, tail = None, label = None, trash=None, select=None, domain=None):
    """Get all Edge objects matching the filters using server-side streaming.

    Convenience wrapper around :meth:`get_edges` with ``stream=True``, which
    retrieves all matching Edges without manual pagination. Every argument is
    forwarded as given, including those left as None.

    :param id: An Edge ID. If provided, returns the Edge whose ID matches.
    :type id: str, optional
    :param invitation: An Invitation ID. If provided, returns Edges whose ``invitation`` field matches.
    :type invitation: str, optional
    :param head: ID of the Edge head entity.
    :type head: str, optional
    :param tail: ID of the Edge tail entity.
    :type tail: str, optional
    :param label: Label value to filter Edges by.
    :type label: str, optional
    :param trash: If True, includes soft-deleted Edges in the results.
    :type trash: bool, optional
    :param select: Comma-separated list of fields to include in the response.
    :type select: str, optional
    :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer.
    :type domain: str, optional

    :return: List of all matching Edge objects.
    :rtype: list[Edge]
    """
    return self.get_edges(
        id=id,
        invitation=invitation,
        head=head,
        tail=tail,
        label=label,
        trash=trash,
        select=select,
        domain=domain,
        stream=True,
    )
[docs]
def get_edges_count(self, id=None, invitation=None, head=None, tail=None, label=None, domain=None):
    """Return the number of Edges that match the given filters.

    When ``domain`` is omitted but ``invitation`` is given, the invitation is
    fetched and its ``domain`` is sent along with the query. That lookup is
    best-effort: if it fails, the count is requested without a domain.

    :param id: An Edge ID. If provided, counts only the Edge whose ID matches.
    :type id: str, optional
    :param invitation: An Invitation ID. If provided, counts Edges whose ``invitation`` field matches.
    :type invitation: str, optional
    :param head: ID of the Edge head entity.
    :type head: str, optional
    :param tail: ID of the Edge tail entity.
    :type tail: str, optional
    :param label: Label value to filter Edges by.
    :type label: str, optional
    :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer.
    :type domain: str, optional
    :return: The ``count`` value of the API response.
    :rtype: int
    """
    params = {
        'id': id,
        'invitation': invitation,
        'head': head,
        'tail': tail,
        'label': label,
    }
    if domain is not None:
        params['domain'] = domain
    elif invitation is not None:
        try:
            params['domain'] = self.get_invitation(invitation).domain
        except Exception:
            # Inferring the domain is only an optimisation, so a failed
            # lookup must not fail the count. ``Exception`` (instead of a
            # bare ``except``) lets KeyboardInterrupt/SystemExit propagate.
            pass
    response = self.session.get(self.edges_count_url, params=tools.format_params(params), headers=self.headers)
    response = self.__handle_response(response)
    return response.json()['count']
[docs]
def get_grouped_edges(self, invitation=None, head=None, tail=None, label=None, groupby='head', select=None, limit=None, offset=None, trash=None, domain=None):
    """Query Edges and return them grouped by one of their fields.

    The result is the ``groupedEdges`` list of the API response. Each entry
    describes one group; with ``groupby='head'`` an entry looks like
    ``{id: {head: paper-1}, values: [{tail: user-1}, {tail: user-2}]}``.
    ``limit`` bounds the number of groups, not the number of Edges inside a
    group.

    :param invitation: An Invitation ID. If provided, returns Edges whose ``invitation`` field matches.
    :type invitation: str, optional
    :param head: ID of the Edge head entity to filter by.
    :type head: str, optional
    :param tail: ID of the Edge tail entity to filter by.
    :type tail: str, optional
    :param label: Label value to filter Edges by.
    :type label: str, optional
    :param groupby: Field to group Edges by (sent as ``groupBy``). Defaults to ``head``.
    :type groupby: str, optional
    :param select: Comma-separated list of fields to include in each group's values.
    :type select: str, optional
    :param limit: Maximum number of groups to return.
    :type limit: int, optional
    :param offset: Number of groups to skip (for pagination).
    :type offset: int, optional
    :param trash: If True, includes soft-deleted Edges in the results.
    :type trash: bool, optional
    :param domain: Venue domain ID; improves query efficiency when the caller is a venue organizer.
    :type domain: str, optional
    :return: List of grouped edge dictionaries, each containing ``id`` and ``values`` keys.
    :rtype: list[dict]
    """
    query = {
        'id': None,
        'invitation': invitation,
        'head': head,
        'tail': tail,
        'label': label,
        'groupBy': groupby,
        'select': select,
        'limit': limit,
        'offset': offset,
        'trash': trash,
        'domain': domain,
    }
    raw_response = self.session.get(self.edges_url, params=tools.format_params(query), headers=self.headers)
    checked_response = self.__handle_response(raw_response)
    return checked_response.json()['groupedEdges']
[docs]
def get_archived_edges(self, invitation):
    """Get the Edges stored under the edges archive endpoint for an invitation.

    Sends a GET request to ``self.edges_archive`` filtered by ``invitation``
    and converts every entry of the response's ``edges`` list into an Edge.

    :param invitation: An Invitation ID. Returns Edges whose ``invitation`` field matches.
    :type invitation: str
    :return: List of Edge objects returned by the archive endpoint.
    :rtype: list[Edge]
    """
    params = {'invitation': invitation}
    # NOTE: a leftover debugging print of the formatted params used to live
    # here; library code should not write to stdout on every call.
    response = self.session.get(self.edges_archive, params=tools.format_params(params), headers=self.headers)
    response = self.__handle_response(response)
    return [Edge.from_json(e) for e in response.json()['edges']]
[docs]
def post_edge(self, edge):
    """Send one Edge to the edges endpoint and return the server's version of it.

    The Edge is serialized with ``edge.to_json()`` and POSTed; the JSON that
    comes back is turned into a new Edge object (including the ``id``
    assigned by the server).

    :param edge: Edge object to post.
    :type edge: Edge
    :return: Edge built from the API response.
    :rtype: Edge
    """
    payload = edge.to_json()
    checked = self.__handle_response(
        self.session.post(self.edges_url, json=payload, headers=self.headers)
    )
    return Edge.from_json(checked.json())
[docs]
def post_edges(self, edges):
    """Post a list of Edges in a single bulk request.

    :param edges: Edge objects to post; each one is serialized with ``to_json()``.
    :type edges: list[Edge]
    :return: Edge objects built from the response array (updated with their ids).
    :rtype: list[Edge]
    """
    payload = []
    for item in edges:
        payload.append(item.to_json())
    checked = self.__handle_response(
        self.session.post(self.bulk_edges_url, json=payload, headers=self.headers)
    )
    posted = []
    for raw_edge in checked.json():
        posted.append(Edge.from_json(raw_edge))
    return posted
[docs]
def rename_edges(self, current_id, new_id):
    """Replace one ID with another across all Edges that reference it.

    Posts ``currentId``/``newId`` to the edges rename endpoint, which updates
    the ``head`` and ``tail`` fields of the Edges containing ``current_id``.

    :param current_id: The current ID to find in Edge head/tail fields (e.g., a profile tilde ID).
    :type current_id: str
    :param new_id: The new ID to replace it with.
    :type new_id: str
    :return: Edge objects built from the response array.
    :rtype: list[Edge]
    """
    rename_request = {'currentId': current_id, 'newId': new_id}
    checked = self.__handle_response(
        self.session.post(self.edges_rename, json=rename_request, headers=self.headers)
    )
    renamed = []
    for raw_edge in checked.json():
        renamed.append(Edge.from_json(raw_edge))
    return renamed
[docs]
def post_venue(self, venue):
    """Post a venue to the venues endpoint.

    :param venue: JSON-serializable venue description, sent as the request body.
    :type venue: dict
    :return: Parsed JSON body of the API response (the posted venue).
    :rtype: dict
    """
    checked = self.__handle_response(
        self.session.post(self.venues_url, json=venue, headers=self.headers)
    )
    return checked.json()
[docs]
def restrict(self, venue_id):
    """Restrict a domain/venue so that non-authorized users cannot access its data.

    Posts ``{'domain': venue_id, 'action': 'restrict'}`` to the domains
    restriction endpoint.

    :param venue_id: the domain/venue ID to restrict
    :type venue_id: str
    :return: the parsed JSON body of the API response
    :rtype: dict
    """
    request_body = {'domain': venue_id, 'action': 'restrict'}
    checked = self.__handle_response(
        self.session.post(self.domains_restriction, json=request_body, headers=self.headers)
    )
    return checked.json()
[docs]
def unrestrict(self, venue_id):
    """Lift the restriction from a domain/venue, restoring normal data access.

    Posts ``{'domain': venue_id, 'action': 'unrestrict'}`` to the domains
    restriction endpoint.

    :param venue_id: the domain/venue ID to unrestrict
    :type venue_id: str
    :return: the parsed JSON body of the API response
    :rtype: dict
    """
    request_body = {'domain': venue_id, 'action': 'unrestrict'}
    checked = self.__handle_response(
        self.session.post(self.domains_restriction, json=request_body, headers=self.headers)
    )
    return checked.json()
[docs]
def delete_edges(self, invitation, id=None, label=None, head=None, tail=None, wait_to_finish=False, soft_delete=False):
    """Delete Edges selected by invitation plus any of the optional filters.

    Optional filters are only included in the request when they are truthy;
    ``wait_to_finish`` and ``soft_delete`` are always sent.

    :param invitation: an invitation ID
    :type invitation: str
    :param id: ID of a specific edge to delete
    :type id: str, optional
    :param label: a matching label ID
    :type label: str, optional
    :param head: id of the edge head (head type defined by the edge invitation)
    :type head: str, optional
    :param tail: id of the edge tail (tail type defined by the edge invitation)
    :type tail: str, optional
    :param wait_to_finish: True if execution should pause until deletion of edges is finished
    :type wait_to_finish: bool, optional
    :param soft_delete: forwarded to the API as ``softDelete``
    :type soft_delete: bool, optional
    :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise
    :rtype: dict
    """
    delete_query = {'invitation': invitation}
    optional_filters = (('label', label), ('head', head), ('tail', tail), ('id', id))
    delete_query.update({name: value for name, value in optional_filters if value})
    delete_query['waitToFinish'] = wait_to_finish
    delete_query['softDelete'] = soft_delete

    checked = self.__handle_response(
        self.session.delete(self.edges_url, json=delete_query, headers=self.headers)
    )
    return checked.json()
[docs]
def delete_note(self, note_id):
    """Delete a Note by its ID.

    :param note_id: ID of Note to be deleted
    :type note_id: str
    :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise
    :rtype: dict
    """
    target = {'id': note_id}
    checked = self.__handle_response(
        self.session.delete(self.notes_url, json=target, headers=self.headers)
    )
    return checked.json()
[docs]
def delete_profile_reference(self, reference_id):
    """Delete the Profile Reference identified by ``reference_id``.

    The request goes to the ``/reference`` path under the profiles URL.

    :param reference_id: ID of the Profile Reference to be deleted.
    :type reference_id: str
    :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise
    :rtype: dict
    """
    reference_url = ''.join((self.profiles_url, '/reference'))
    checked = self.__handle_response(
        self.session.delete(reference_url, json={'id': reference_id}, headers=self.headers)
    )
    return checked.json()
[docs]
def delete_group(self, group_id):
    """Delete a Group by its ID.

    :param group_id: ID of Group to be deleted
    :type group_id: str
    :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise
    :rtype: dict
    """
    target = {'id': group_id}
    checked = self.__handle_response(
        self.session.delete(self.groups_url, json=target, headers=self.headers)
    )
    return checked.json()
[docs]
def delete_institution(self, institution_id):
    """Delete an institution; its ID is appended to the institutions URL.

    :param institution_id: ID of Institution to be deleted
    :type institution_id: str
    :return: a {status = 'ok'} in case of a successful deletion and an OpenReview exception otherwise
    :rtype: dict
    """
    institution_url = '/'.join((self.institutions_url, institution_id))
    checked = self.__handle_response(
        self.session.delete(institution_url, headers=self.headers)
    )
    return checked.json()
[docs]
def post_message(self, subject, recipients, message, invitation=None, signature=None, ignoreRecipients=None, sender=None, replyTo=None, parentGroup=None, use_job=None):
    """Send an e-mail message to the recipients.

    This is a direct alias: every argument is forwarded unchanged to
    :meth:`post_message_request`, whose result is returned.

    :param subject: Subject of the e-mail
    :type subject: str
    :param recipients: Recipients of the e-mail. Valid inputs would be tilde username or emails registered in OpenReview
    :type recipients: list[str]
    :param message: Message in the e-mail
    :type message: str
    :param invitation: Invitation ID of the invitation that allows to send the message
    :type invitation: str
    :param signature: Signature of the user sending the message
    :type signature: str
    :param ignoreRecipients: List of groups ids to be ignored from the recipient list
    :type ignoreRecipients: list[str]
    :param sender: From name and address of the email; :meth:`post_message_request` reads the keys ``fromName`` and ``fromEmail``
    :type sender: dict
    :param replyTo: e-mail address used when recipients reply to this message
    :type replyTo: str
    :param parentGroup: parent group recipients of e-mail belong to
    :type parentGroup: str
    :param use_job: If True, the message will be sent using the job queue
    :type use_job: bool
    :return: Contains the message that was sent to each Group
    :rtype: dict
    """
    options = {
        'invitation': invitation,
        'signature': signature,
        'ignoreRecipients': ignoreRecipients,
        'sender': sender,
        'replyTo': replyTo,
        'parentGroup': parentGroup,
        'use_job': use_job,
    }
    return self.post_message_request(subject, recipients, message, **options)
[docs]
def post_message_request(self, subject, recipients, message, invitation=None, signature=None, ignoreRecipients=None, sender=None, replyTo=None, parentGroup=None, use_job=None):
    """Post a message request, which sends e-mails to the recipients.

    When ``parentGroup`` is given, that group is fetched first and the
    recipients are passed through its ``transform_to_anon_ids`` before being
    sent. Optional fields are only added to the request when truthy, except
    ``use_job``, which is sent whenever it is not None.

    :param subject: Subject of the e-mail
    :type subject: str
    :param recipients: Recipients of the e-mail. Valid inputs would be tilde username or emails registered in OpenReview
    :type recipients: list[str]
    :param message: Message in the e-mail
    :type message: str
    :param invitation: Invitation ID of the invitation that allows to send the message
    :type invitation: str
    :param signature: Signature of the user sending the message
    :type signature: str
    :param ignoreRecipients: List of groups ids to be ignored from the recipient list (sent as ``ignoreGroups``)
    :type ignoreRecipients: list[str]
    :param sender: From name and address of the email; the keys read from this dictionary are ``fromName`` and ``fromEmail``
    :type sender: dict
    :param replyTo: e-mail address used when recipients reply to this message
    :type replyTo: str
    :param parentGroup: parent group recipients of e-mail belong to
    :type parentGroup: str
    :param use_job: If True, the message will be sent using the job queue
    :type use_job: bool
    :return: Contains the message that was sent to each Group
    :rtype: dict
    """
    if parentGroup:
        parent = self.get_group(parentGroup)
        recipients = parent.transform_to_anon_ids(recipients)

    payload = {'groups': recipients, 'subject': subject, 'message': message}

    for field, value in (('invitation', invitation), ('signature', signature), ('ignoreGroups', ignoreRecipients)):
        if value:
            payload[field] = value

    if sender:
        payload['fromName'] = sender.get('fromName')
        payload['fromEmail'] = sender.get('fromEmail')

    for field, value in (('replyTo', replyTo), ('parentGroup', parentGroup)):
        if value:
            payload[field] = value

    if use_job is not None:
        payload['useJob'] = use_job

    checked = self.__handle_response(
        self.session.post(self.messages_requests_url, json=payload, headers=self.headers)
    )
    return checked.json()
[docs]
def get_message_requests(self, id=None, invitation=None):
    """Retrieve message requests filtered by ID and/or invitation.

    If neither filter is provided (or both are falsy) no request is made and
    an empty list is returned.

    :param id: ID of the message request
    :type id: str
    :param invitation: Invitation ID of the invitation that allows to send the message
    :type invitation: str
    :return: The ``requests`` list of the API response, i.e. the message requests used to send the messages
    :rtype: list
    """
    filters = {name: value for name, value in (('id', id), ('invitation', invitation)) if value}
    if not filters:
        return []

    checked = self.__handle_response(
        self.session.get(self.messages_requests_url, params=tools.format_params(filters), headers=self.headers)
    )
    return checked.json()['requests']
[docs]
def post_direct_message(self, subject, recipients, message, sender=None):
    """Post a message to the direct messages endpoint, e-mailing the recipients.

    :param subject: Subject of the e-mail
    :type subject: str
    :param recipients: Recipients of the e-mail. Valid inputs would be tilde username or emails registered in OpenReview
    :type recipients: list[str]
    :param message: Message in the e-mail
    :type message: str
    :param sender: Value sent as the ``from`` field of the request; None is sent when omitted
    :type sender: optional
    :return: Contains the message that was sent to each Group
    :rtype: dict
    """
    payload = {
        'groups': recipients,
        'subject': subject,
        'message': message,
        'from': sender,
    }
    checked = self.__handle_response(
        self.session.post(self.messages_direct_url, json=payload, headers=self.headers)
    )
    return checked.json()
[docs]
def add_members_to_group(self, group, members):
    """Add one or more members to a group through a group edit.

    The edit is posted against ``<group.domain>/-/Edit`` and signed, read and
    written by the group's own signatures. Duplicate members are collapsed
    before posting. The group is re-fetched afterwards and returned.

    :param group: Group (or Group's id) to which the members will be added
    :type group: Group or str
    :param members: Members that will be added to the group. Members should be in a string, unicode or a list format
    :type members: str, list, unicode
    :return: Group with the members added
    :rtype: Group
    :raises OpenReviewException: if ``members`` is neither a string nor a list
    """
    member_type = type(members)
    if member_type in string_types:
        new_members = [members]
    elif member_type == list:
        new_members = members
    else:
        raise OpenReviewException(
            f"add_members_to_group()- members '{members!s}' ({member_type!s}) must be a str, unicode or list, but got {member_type!r} instead"
        )

    # Accept either a Group object or a group id.
    target = group if type(group) not in string_types else self.get_group(group)
    self.post_group_edit(
        invitation=f'{target.domain}/-/Edit',
        signatures=target.signatures,
        group=Group(
            id=target.id,
            members={'add': list(set(new_members))},
        ),
        readers=target.signatures,
        writers=target.signatures,
    )
    return self.get_group(target.id)
[docs]
def remove_members_from_group(self, group, members):
    """Remove one or more members from a group through a group edit.

    The group is always re-fetched by id, the members are passed through
    its ``transform_to_anon_ids`` and the removal is posted against
    ``<group.domain>/-/Edit`` using the group's own signatures.

    :param group: Group (or Group's id) from which the members will be removed
    :type group: Group or str
    :param members: Members that will be removed. Members should be in a string, unicode or a list format
    :type members: str, list, unicode
    :return: Group without the members that were removed
    :rtype: Group
    :raises OpenReviewException: if ``members`` is neither a string nor a list
    """
    def remove_member(group, members):
        members_to_remove = list(set(members))
        group = self.get_group(group if type(group) in string_types else group.id)
        members_to_remove = group.transform_to_anon_ids(members_to_remove)
        self.post_group_edit(
            invitation=f'{group.domain}/-/Edit',
            signatures=group.signatures,
            group=Group(
                id=group.id,
                members={'remove': members_to_remove},
            ),
            readers=group.signatures,
            writers=group.signatures,
        )
        return self.get_group(group.id)

    member_type = type(members)
    if member_type in string_types:
        return remove_member(group, [members])
    if member_type == list:
        return remove_member(group, members)
    # Previously an unsupported type fell through and silently returned None
    # without removing anything; fail loudly like add_members_to_group does.
    raise OpenReviewException("remove_members_from_group()- members '"+str(members)+"' ("+str(member_type)+") must be a str, unicode or list, but got " + repr(member_type) + " instead")
[docs]
def search_notes(self, term, content='all', group='all', source='all', limit=None, offset=None):
    """Search Notes by term through the ``/search`` endpoint under the notes URL.

    Unlike :meth:`~openreview.Client.get_notes`, this method uses Elasticsearch to retrieve the Notes.

    :param term: Term used to look for the Notes
    :type term: str
    :param content: Specifies whether to look in all the content, authors, or keywords. Valid inputs: 'all', 'authors', 'keywords'
    :type content: str, optional
    :param group: Specifies under which Group to look. E.g. 'all', 'ICLR', 'UAI', etc.
    :type group: str, optional
    :param source: Whether to look in papers, replies or all
    :type source: str, optional
    :param limit: Maximum amount of Notes that this method will return. The limit parameter can range between 0 and 1000 inclusive. If a bigger number is provided, only 1000 Notes will be returned
    :type limit: int, optional
    :param offset: Indicates the position to start retrieving Notes. For example, if there are 10 Notes and you want to obtain the last 3, then the offset would need to be 7.
    :type offset: int, optional
    :return: List of notes
    :rtype: list[Note]
    """
    query = {'term': term, 'content': content, 'group': group, 'source': source}
    # Paging options are only sent when explicitly provided.
    for name, value in (('limit', limit), ('offset', offset)):
        if value is not None:
            query[name] = value

    search_url = self.notes_url + '/search'
    checked = self.__handle_response(
        self.session.get(search_url, params=tools.format_params(query), headers=self.headers)
    )
    found = []
    for raw_note in checked.json()['notes']:
        found.append(Note.from_json(raw_note))
    return found
def get_notes_by_ids(self, ids):
    """Fetch Notes by ID by POSTing ``{'ids': ids}`` to the notes ``/search`` endpoint.

    :param ids: Note IDs to look up; sent as the ``ids`` field of the request body
    :type ids: list[str]
    :return: Note objects built from the ``notes`` list of the response
    :rtype: list[Note]
    """
    search_url = self.notes_url + '/search'
    checked = self.__handle_response(
        self.session.post(search_url, json={'ids': ids}, headers=self.headers)
    )
    found = []
    for raw_note in checked.json()['notes']:
        found.append(Note.from_json(raw_note))
    return found
[docs]
def get_tildeusername(self, fullname):
    """Ask the API for the next available tilde username for a full name.

    :param fullname: Full name of the user
    :type fullname: str
    :return: next possible tilde user name corresponding to the specified full name
    :rtype: dict
    """
    query = {'fullname': fullname}
    checked = self.__handle_response(
        self.session.get(self.tilde_url, params=query, headers=self.headers)
    )
    return checked.json()
[docs]
def get_messages(self, to=None, subject=None, status=None, offset=None, limit=None):
    """Retrieve sent messages, filtered by recipient, subject and/or status.

    **Only for Super User**.

    :param to: Tilde user names or emails
    :type to: list[str], optional
    :param subject: Subject of the e-mail
    :type subject: str, optional
    :param status: Comma separated list of status values corresponding to the message: delivered, bounce, dropped, etc
    :type status: str, optional
    :param offset: Sent as the ``offset`` query parameter
    :type offset: int, optional
    :param limit: Sent as the ``limit`` query parameter
    :type limit: int, optional
    :return: The ``messages`` entry of the API response
    :rtype: dict
    """
    query = {
        'to': to,
        'subject': subject,
        'status': status,
        'offset': offset,
        'limit': limit,
    }
    checked = self.__handle_response(
        self.session.get(self.messages_url, params=query, headers=self.headers)
    )
    return checked.json()['messages']
[docs]
def get_process_logs(self, id=None, invitation=None, status=None, min_sdate=None):
    """Fetch execution logs of process functions.

    **Only for Super User.** Returns log entries for process functions
    triggered by edits or invitation date processes.

    :param id: Edit ID (the ``id`` returned by a ``post_*_edit`` call) that triggered the process function.
    :type id: str, optional
    :param invitation: Invitation ID to filter logs by the invitation whose process function produced them.
    :type invitation: str, optional
    :param status: Filter by execution status (e.g., ``ok``, ``error``, ``running``).
    :type status: str, optional
    :param min_sdate: Minimum start date in epoch milliseconds (sent as ``minsdate``). Returns logs started on or after this time.
    :type min_sdate: int, optional
    :return: The ``logs`` list of the API response.
    :rtype: list[dict]
    """
    query = {
        'id': id,
        'invitation': invitation,
        'status': status,
        'minsdate': min_sdate,
    }
    checked = self.__handle_response(
        self.session.get(self.process_logs_url, params=query, headers=self.headers)
    )
    return checked.json()['logs']
[docs]
def post_institution(self, institution):
    """Create or update an institution.

    Requires Super User permission. The institution is added if its id is
    not found in the database; otherwise the existing one is updated.

    :param institution: institution to be posted
    :type institution: dict
    :return: The posted institution
    :rtype: dict
    """
    checked = self.__handle_response(
        self.session.post(self.institutions_url, json=institution, headers=self.headers)
    )
    return checked.json()
[docs]
def post_invitation_edit(self, invitations, readers=None, writers=None, signatures=None, invitation=None, content=None, replacement=None, domain=None, await_process=False):
    """Post an edit that creates or updates an Invitation.

    Only arguments that are not None are included in the edit. The edit is
    validated against the parent invitation given in ``invitations``.

    :param invitations: Parent invitation ID that authorizes this edit (e.g., ``venue/-/Edit``).
    :type invitations: str
    :param readers: List of group IDs that can read this edit.
    :type readers: list[str], optional
    :param writers: List of group IDs that can modify this edit.
    :type writers: list[str], optional
    :param signatures: List of group IDs signing this edit.
    :type signatures: list[str], optional
    :param invitation: Invitation object containing the fields to create or update; serialized with ``to_json()``.
    :type invitation: Invitation, optional
    :param content: Additional content fields for the edit itself.
    :type content: dict, optional
    :param replacement: If True, the edit fully replaces the existing Invitation rather than merging fields.
    :type replacement: bool, optional
    :param domain: Domain (venue ID) that this edit belongs to.
    :type domain: str, optional
    :param await_process: If True, waits on the process of the posted edit (by its ``id``) before returning.
    :type await_process: bool, optional
    :return: Dictionary containing the posted edit, including the assigned edit ``id``.
    :rtype: dict
    """
    plain_fields = (
        ('invitations', invitations),
        ('readers', readers),
        ('writers', writers),
        ('signatures', signatures),
        ('content', content),
        ('replacement', replacement),
    )
    edit_json = {field: value for field, value in plain_fields if value is not None}
    if invitation is not None:
        edit_json['invitation'] = invitation.to_json()
    if domain is not None:
        edit_json['domain'] = domain

    response = self.__handle_response(
        self.session.post(self.invitation_edits_url, json=edit_json, headers=self.headers)
    )
    if await_process:
        self.__await_process(response.json()['id'])
    return response.json()
[docs]
def post_note_edit(self, invitation, signatures, note=None, readers=None, writers=None, nonreaders=None, content=None, await_process=False):
    """Post an edit that creates or updates a Note.

    The edit always carries ``invitation`` and a ``note`` entry (an empty
    object when no note is given); the remaining fields are included only
    when they are not None. To update an existing Note, set ``note.id`` to
    the target Note's ID.

    :param invitation: Invitation ID that defines the schema and permissions for this edit (e.g., ``venue/-/Submission``).
    :type invitation: str
    :param signatures: List of group IDs signing this edit.
    :type signatures: list[str]
    :param note: Note object containing the fields to create or update; serialized with ``to_json()``.
    :type note: Note, optional
    :param readers: List of group IDs that can read this edit.
    :type readers: list[str], optional
    :param writers: List of group IDs that can modify this edit.
    :type writers: list[str], optional
    :param nonreaders: List of group IDs excluded from reading this edit.
    :type nonreaders: list[str], optional
    :param content: Additional content fields for the edit itself (not the Note).
    :type content: dict, optional
    :param await_process: If True, waits on the process of the posted edit (by its ``id``) before returning.
    :type await_process: bool, optional
    :return: Dictionary containing the posted edit, including the assigned edit ``id`` and the ``note`` with its ``id``.
    :rtype: dict
    """
    note_json = {}
    if note:
        note_json = note.to_json()
    edit_json = {'invitation': invitation, 'note': note_json}

    optional_fields = (
        ('signatures', signatures),
        ('readers', readers),
        ('writers', writers),
        ('nonreaders', nonreaders),
        ('content', content),
    )
    for field, value in optional_fields:
        if value is not None:
            edit_json[field] = value

    response = self.__handle_response(
        self.session.post(self.note_edits_url, json=edit_json, headers=self.headers)
    )
    if await_process:
        self.__await_process(response.json()['id'])
    return response.json()
[docs]
def post_group_edit(self, invitation, signatures=None, group=None, readers=None, writers=None, content=None, replacement=None, await_process=False, flush_members_cache=True):
    """Create or update a Group via the edit system.

    Posts an edit that creates a new Group or modifies an existing one. When
    the posted edit is signed by its own domain and ``flush_members_cache``
    is true, the members cache is flushed for every member the edit touches:
    all entries of ``members['add']`` and ``members['remove']`` when members
    is a dict, or every entry when members is a plain list.

    :param invitation: Invitation ID that defines the schema and permissions for this edit.
    :type invitation: str
    :param signatures: List of group IDs signing this edit.
    :type signatures: list[str], optional
    :param group: Group object containing the fields to create or update; serialized with ``to_json()``.
    :type group: Group, optional
    :param readers: List of group IDs that can read this edit.
    :type readers: list[str], optional
    :param writers: List of group IDs that can modify this edit.
    :type writers: list[str], optional
    :param content: Additional content fields for the edit itself.
    :type content: dict, optional
    :param replacement: If True, the edit fully replaces the existing Group rather than merging fields.
    :type replacement: bool, optional
    :param await_process: If True, waits on the process of the posted edit (by its ``id``) before returning.
    :type await_process: bool, optional
    :param flush_members_cache: If True (default), flushes the members cache for affected members when the domain signs the edit.
    :type flush_members_cache: bool, optional
    :return: Dictionary containing the posted edit, including the assigned edit ``id``.
    :rtype: dict
    """
    edit_json = {'invitation': invitation}
    if group is not None:
        edit_json['group'] = group.to_json()
    optional_fields = (
        ('signatures', signatures),
        ('readers', readers),
        ('writers', writers),
        ('content', content),
        ('replacement', replacement),
    )
    for field, value in optional_fields:
        if value is not None:
            edit_json[field] = value

    response = self.session.post(self.group_edits_url, json=edit_json, headers=self.headers)
    response = self.__handle_response(response)
    # Parse the body once and reuse it for the cache flush, the process wait
    # and the return value.
    posted_edit = response.json()

    # The edit is already stored at this point, so a response without
    # ``domain``/``signatures`` must not raise: treat it as "not signed by
    # the domain" and skip the flush.
    signed_by_domain = posted_edit.get('domain') in (posted_edit.get('signatures') or [])
    if flush_members_cache and signed_by_domain:
        members = (posted_edit.get('group') or {}).get('members')
        if isinstance(members, dict):
            # An edit may add and remove members at once; both sets are affected.
            members_to_flush = [
                member
                for operation in ('add', 'remove')
                for member in (members.get(operation) or [])
            ]
        elif isinstance(members, list):
            members_to_flush = members
        else:
            members_to_flush = []
        for member in members_to_flush:
            self.flush_members_cache(member)

    if await_process:
        self.__await_process(posted_edit['id'])
    return posted_edit
[docs]
def post_edit(self, edit):
    """Post an Edit object, routing it to the correct endpoint based on its contents.

    Inspects the serialized edit for the presence of a ``note``, ``group``
    or ``invitation`` key (checked in that order) and POSTs it to the
    matching edits endpoint.

    :param edit: Edit object to post. Must contain one of ``note``, ``group``, or ``invitation``.
    :type edit: Edit
    :return: Dictionary containing the posted edit, including the assigned edit ``id``.
    :rtype: dict
    :raises OpenReviewException: if the serialized edit has none of the three keys
    """
    edit_json = edit.to_json()
    if 'note' in edit_json:
        edits_url = self.note_edits_url
    elif 'group' in edit_json:
        edits_url = self.group_edits_url
    elif 'invitation' in edit_json:
        edits_url = self.invitation_edits_url
    else:
        # Without this branch the code fell through to an UnboundLocalError
        # on ``response``; fail with a meaningful error before any request.
        raise OpenReviewException("post_edit()- edit must contain one of 'note', 'group' or 'invitation'")
    response = self.session.post(edits_url, json=edit_json, headers=self.headers)
    response = self.__handle_response(response)
    return response.json()
[docs]
def get_jobs_status(self):
    """Retrieve the status of the jobs in the queue.

    **Only for Super User**.

    :return: Jobs status
    :rtype: dict
    """
    checked = self.__handle_response(
        self.session.get(self.jobs_status, headers=self.headers)
    )
    return checked.json()
[docs]
def request_raw_expertise(self, expertise_request, baseurl=None):
    """Send a caller-built expertise request to the Expertise API as-is.

    The request is POSTed to ``<base url>/expertise``, where the base url is
    ``baseurl`` when given and the client's own ``baseurl`` otherwise.

    :param expertise_request: Dictionary containing the expertise request to be sent to the Expertise API
    :type expertise_request: dict
    :param baseurl: URL to the host, example: https://api.openreview.net. If none is provided, the client's ``baseurl`` is used
    :type baseurl: str, optional
    :return: Dictionary containing the response from the Expertise API
    :rtype: dict
    """
    host = baseurl or self.baseurl
    checked = self.__handle_response(
        self.session.post(host + '/expertise', json=expertise_request, headers=self.headers)
    )
    return checked.json()
def request_expertise(self,
        name,
        group_id,
        venue_id,
        submission_content=None,
        alternate_match_group=None,
        alternate_expertise_selection_id=None,
        expertise_selection_id=None,
        model=None,
        baseurl=None,
        weight=None,
        top_recent_pubs=None,
        percentile_selection=None):
    """Build an expertise request and POST it to ``<base url>/expertise``.

    ``entityA`` is always a Group entity whose members come from
    ``group_id``. ``entityB`` is a Group entity built from
    ``alternate_match_group`` when that is given; otherwise it is a Note
    entity selected by ``venue_id`` and ``submission_content``.

    :param name: Name of the request (sent as ``name``)
    :type name: str
    :param group_id: Group ID used as ``entityA.memberOf``
    :type group_id: str
    :param venue_id: Value sent as ``entityB.withVenueid`` when no alternate match group is given
    :type venue_id: str
    :param submission_content: Value sent as ``entityB.withContent`` when no alternate match group is given
    :type submission_content: dict, optional
    :param alternate_match_group: Group ID used as ``entityB.memberOf``; switches entityB to a Group entity
    :type alternate_match_group: str, optional
    :param alternate_expertise_selection_id: Invitation ID attached as entityB's expertise invitation, only if ``tools.get_invitation`` finds it
    :type alternate_expertise_selection_id: str, optional
    :param expertise_selection_id: Invitation ID attached as entityA's expertise invitation, only if ``tools.get_invitation`` finds it
    :type expertise_selection_id: str, optional
    :param model: Value sent as ``model.name``
    :type model: str, optional
    :param baseurl: Host to send the request to. If none is provided, the client's ``baseurl`` is used
    :type baseurl: str, optional
    :param weight: Value sent as ``dataset.weightSpecification`` (only when truthy)
    :type weight: optional
    :param top_recent_pubs: Value sent as ``dataset.topRecentPubs`` (only when truthy)
    :type top_recent_pubs: optional
    :param percentile_selection: Value sent as ``model.percentileSelect`` (only when truthy)
    :type percentile_selection: optional
    :return: Parsed JSON body of the Expertise API response
    :rtype: dict
    """
    # Build entityA from group_id
    entityA = {
        'type': 'Group',
        'memberOf': group_id
    }
    if expertise_selection_id and tools.get_invitation(self, expertise_selection_id):
        entityA['expertise'] = {'invitation': expertise_selection_id}

    # Build entityB from alternate_match_group or venue_id
    if alternate_match_group:
        entityB = {
            'type': 'Group',
            'memberOf': alternate_match_group
        }
        if alternate_expertise_selection_id and tools.get_invitation(self, alternate_expertise_selection_id):
            entityB['expertise'] = {'invitation': alternate_expertise_selection_id}
    else:
        entityB = {
            'type': 'Note',
            'withVenueid': venue_id,
            'withContent': submission_content
        }

    expertise_request = {
        'name': name,
        'entityA': entityA,
        'entityB': entityB,
        'model': {
            'name': model
        }
    }

    # Collect all dataset options in ONE dict. Assigning a fresh dict per
    # option made ``top_recent_pubs`` silently discard ``weight`` whenever
    # both were supplied.
    dataset = {}
    if weight:
        dataset['weightSpecification'] = weight
    if top_recent_pubs:
        dataset['topRecentPubs'] = top_recent_pubs
    if dataset:
        expertise_request['dataset'] = dataset

    if percentile_selection:
        expertise_request['model']['percentileSelect'] = percentile_selection

    base_url = baseurl if baseurl else self.baseurl
    response = self.session.post(base_url + '/expertise', json=expertise_request, headers=self.headers)
    response = self.__handle_response(response)
    return response.json()
def request_single_paper_expertise(self, name, group_id, paper_id, expertise_selection_id=None, model=None, baseurl=None):
    """
    Post a job request to the Expertise API (``/expertise``) that matches a group against a single note.

    When the resolved base URL starts with ``http://localhost`` no request is sent and an empty
    dictionary is returned.

    :param name: name of the job
    :type name: str
    :param group_id: id of the group used as entity A (sent as ``memberOf``)
    :type group_id: str
    :param paper_id: id of the note used as entity B
    :type paper_id: str
    :param expertise_selection_id: id of the expertise selection invitation for ``group_id``
    :type expertise_selection_id: str, optional
    :param model: name of the model used to compute scores
    :type model: str, optional
    :param baseurl: URL to the host. If none is provided, ``self.baseurl`` is used
    :type baseurl: str, optional

    :return: Parsed JSON body of the Expertise API response, or ``{}`` for localhost
    :rtype: dict
    """
    # Build entityA from group_id
    entityA = {
        'type': 'Group',
        'memberOf': group_id
    }
    if expertise_selection_id and tools.get_invitation(self, expertise_selection_id):
        entityA['expertise'] = { 'invitation': expertise_selection_id }

    # Build entityB from paper_id
    entityB = {
        'type': 'Note',
        'id': paper_id
    }

    expertise_request = {
        'name': name,
        'entityA': entityA,
        'entityB': entityB,
        'model': {
            'name': model
        }
    }

    base_url = baseurl if baseurl else self.baseurl
    if base_url.startswith('http://localhost'):
        return {}

    print('compute expertise', {'name': name, 'match_group': group_id , 'paper_id': paper_id, 'model': model})
    response = self.session.post(base_url + '/expertise', json = expertise_request, headers = self.headers)
    # Let the response handler inspect the response before the body is parsed, so that a
    # non-JSON error body cannot raise a decode error ahead of the handler
    # (NOTE(review): assumes __handle_response raises on error responses - confirm).
    response = self.__handle_response(response)
    response_json = response.json()
    print('response json', response_json)
    return response_json
[docs]
def request_paper_similarity(self, name, venue_id=None, alternate_venue_id=None, invitation=None, alternate_invitation=None, submissions=None, alternate_submissions=None,model='specter2+scincl', sparse_value=400, baseurl=None):
    """
    Call to the Expertise API to compute paper-to-paper similarity scores, either between two
    different venues or between the submissions of a single venue.

    Each entity must be described by exactly one of its three selectors (venue id, invitation
    or explicit list of submissions).

    :param name: name of the job
    :type name: str
    :param venue_id: paper venue id for entity A, e.g. venue_id/Submission for active papers
    :type venue_id: str, optional
    :param alternate_venue_id: paper venue id for entity B, e.g. venue_id/Submission for active papers
    :type alternate_venue_id: str, optional
    :param invitation: invitation to retrieve papers for entity A, e.g. venue_id/-/Submission
    :type invitation: str, optional
    :param alternate_invitation: invitation to retrieve papers for entity B, e.g. venue_id/-/Submission
    :type alternate_invitation: str, optional
    :param submissions: list of submission notes for entity A
    :type submissions: list
    :param alternate_submissions: list of submission notes for entity B
    :type alternate_submissions: list
    :param model: model used to compute scores, e.g. "specter2+scincl"
    :type model: str, optional
    :param sparse_value: number of top scores to retain per paper. Default and max is 400.
    :type sparse_value: int, optional
    :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2`
    :type baseurl: str, optional

    :return: Dictionary containing the job id
    :rtype: dict

    :raises OpenReviewException: if an entity is not described by exactly one selector, or if ``sparse_value`` exceeds 400
    """
    def _selected(*selectors):
        # Number of selectors that were actually supplied (truthy)
        return sum(1 for selector in selectors if selector)

    def _note_entity(venue, invitation_id, notes):
        # Describe one side of the comparison; the first supplied selector wins
        entity = {'type': "Note"}
        if venue:
            entity['withVenueid'] = venue
        elif invitation_id:
            entity['invitation'] = invitation_id
        elif notes:
            entity['submissions'] = [
                {
                    'id': note.id,
                    'title': note.content.get('title', {}).get('value', ''),
                    'abstract': note.content.get('abstract', {}).get('value', '')
                }
                for note in notes
            ]
        return entity

    # Validate both entities and the sparse value before building anything
    if _selected(venue_id, invitation, submissions) != 1:
        raise OpenReviewException('Provide exactly one of the following: venue_id, invitation, submissions')
    if _selected(alternate_venue_id, alternate_invitation, alternate_submissions) != 1:
        raise OpenReviewException('Provide exactly one of the following: alternate_venue_id, alternate_invitation, alternate_submissions')
    if sparse_value > 400:
        raise OpenReviewException('Sparse value should be no greater than 400')

    first_entity = _note_entity(venue_id, invitation, submissions)
    second_entity = _note_entity(alternate_venue_id, alternate_invitation, alternate_submissions)

    payload = {
        "name": name,
        "entityA": first_entity,
        "entityB": second_entity,
        "model": {
            "name": model,
            'useTitle': True,
            'useAbstract': True,
            'skipSpecter': False,
            'scoreComputation': 'max',
            'sparseValue': sparse_value
        }
    }

    target = baseurl or self.baseurl
    reply = self.session.post(target + '/expertise', json = payload, headers = self.headers)
    return self.__handle_response(reply).json()
[docs]
def request_paper_subset_expertise(self, name, submissions, group_id, expertise_selection_id=None, model='specter2+scincl', weight=None, baseurl=None):
    """
    Call to the Expertise API to compute scores between a group and an explicit subset of papers.

    :param name: name of the job
    :type name: str
    :param submissions: list of submission notes
    :type submissions: list
    :param group_id: id of group to compute scores against
    :type group_id: str
    :param expertise_selection_id: id of expertise selection invitation for group
    :type expertise_selection_id: str, optional
    :param model: model used to compute scores, e.g. "specter2+scincl"
    :type model: str, optional
    :param weight: list of dictionaries that specify weights for publications
    :type weight: list[dict], optional
    :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2`
    :type baseurl: str, optional

    :return: Dictionary containing the job id
    :rtype: dict
    """
    # Entity A: the group, with its expertise selection when the invitation lookup succeeds
    group_entity = {'type': 'Group', 'memberOf': group_id}
    if expertise_selection_id and tools.get_invitation(self, expertise_selection_id):
        group_entity['expertise'] = {'invitation': expertise_selection_id}

    # Entity B: only id, title and abstract of each submission are sent
    paper_entity = {
        'type': "Note",
        'submissions': [
            {
                'id': note.id,
                'title': note.content.get('title', {}).get('value', ''),
                'abstract': note.content.get('abstract', {}).get('value', '')
            }
            for note in submissions
        ]
    }

    payload = {
        "name": name,
        "entityA": group_entity,
        "entityB": paper_entity,
        "model": {'name': model, 'normalizeScores': False}
    }
    if weight:
        payload['dataset'] = {'weightSpecification': weight}

    target = baseurl or self.baseurl
    reply = self.session.post(target + '/expertise', json = payload, headers = self.headers)
    return self.__handle_response(reply).json()
[docs]
def request_user_subset_expertise(self, name, members, expertise_selection_id=None, venue_id=None, invitation=None, model='specter2+scincl', weight=None, baseurl=None):
    """
    Call to the Expertise API to compute scores between an explicit subset of users and a set of papers.

    The papers are selected either by ``venue_id`` or by ``invitation``; exactly one of the two
    must be provided.

    :param name: name of the job
    :type name: str
    :param members: list of profile IDs for which to compute scores
    :type members: list[str]
    :param expertise_selection_id: id of expertise selection invitation for members
    :type expertise_selection_id: str, optional
    :param venue_id: paper venue id used to retrieve papers, e.g. venue_id/Submission for active papers
    :type venue_id: str, optional
    :param invitation: invitation used to retrieve papers, e.g. venue_id/-/Submission
    :type invitation: str, optional
    :param model: model used to compute scores, e.g. "specter2+scincl"
    :type model: str, optional
    :param weight: list of dictionaries that specify weights for publications
    :type weight: list[dict], optional
    :param baseurl: URL to the host, example: https://api.openreview.net (should be replaced by 'host' name). If none is provided, it defaults to the environment variable `OPENREVIEW_API_BASEURL_V2`
    :type baseurl: str, optional

    :return: Dictionary containing the job id
    :rtype: dict

    :raises OpenReviewException: if both or neither of ``venue_id`` and ``invitation`` are provided
    """
    # Exactly one paper selector is allowed: reject "both" as well as "neither"
    has_venue, has_invitation = bool(venue_id), bool(invitation)
    if has_venue == has_invitation:
        raise OpenReviewException('Provide exactly one of the following: venue_id, invitation')

    # Entity A: the explicit list of members
    user_entity = {'type': "Group", 'reviewerIds': members}
    if expertise_selection_id and tools.get_invitation(self, expertise_selection_id):
        user_entity['expertise'] = {'invitation': expertise_selection_id}

    # Entity B: the papers, selected by venue id or by invitation
    paper_entity = {'type': "Note"}
    if has_venue:
        paper_entity['withVenueid'] = venue_id
    else:
        paper_entity['invitation'] = invitation

    payload = {
        "name": name,
        "entityA": user_entity,
        "entityB": paper_entity,
        "model": {'name': model, 'normalizeScores': False}
    }
    if weight:
        payload['dataset'] = {'weightSpecification': weight}

    target = baseurl or self.baseurl
    reply = self.session.post(target + '/expertise', json = payload, headers = self.headers)
    return self.__handle_response(reply).json()
def get_expertise_status(self, job_id=None, group_id=None, paper_id=None, baseurl=None):
    """
    Query ``/expertise/status`` of the Expertise API, filtering by whichever of job id, group id
    and paper id are provided.

    When the resolved base URL starts with ``http://localhost`` no request is sent and a canned
    ``Completed`` status is returned instead.

    :param job_id: id of the job, sent as ``jobId``
    :type job_id: str, optional
    :param group_id: id of the group, sent as ``entityA.memberOf``
    :type group_id: str, optional
    :param paper_id: id of the paper, sent as ``entityB.id``
    :type paper_id: str, optional
    :param baseurl: URL to the host. If none is provided, ``self.baseurl`` is used
    :type baseurl: str, optional

    :return: Parsed JSON body of the status response
    :rtype: dict
    """
    print('get expertise status', baseurl, job_id, group_id, paper_id)
    target = baseurl or self.baseurl

    if target.startswith('http://localhost'):
        print('get expertise status localhost, return Completed')
        if not job_id:
            return { 'results': [{ 'status': 'Completed', 'jobId': None }]}
        return { 'status': 'Completed', 'jobId': job_id }

    # Only the filters that were actually supplied are sent as query parameters
    filters = (('jobId', job_id), ('entityA.memberOf', group_id), ('entityB.id', paper_id))
    query = {key: value for key, value in filters if value}

    reply = self.session.get(target + '/expertise/status', params = query, headers = self.headers)
    status_json = self.__handle_response(reply).json()
    print('get expertise status', status_json)
    return status_json
def get_expertise_jobs(self, status=None, baseurl=None):
    """
    Query ``/expertise/status/all`` of the Expertise API, optionally filtering by status.

    When the resolved base URL starts with ``http://localhost`` no request is sent and an empty
    result list is returned instead.

    :param status: status to filter by, sent as the ``status`` query parameter
    :type status: str, optional
    :param baseurl: URL to the host. If none is provided, ``self.baseurl`` is used
    :type baseurl: str, optional

    :return: Parsed JSON body of the response
    :rtype: dict
    """
    print('get expertise jobs', baseurl, status)
    target = baseurl or self.baseurl

    if target.startswith('http://localhost'):
        print('get expertise jobs localhost, return []')
        return { 'results': [] }

    query = {'status': status} if status else {}
    reply = self.session.get(target + '/expertise/status/all', params = query, headers = self.headers)
    jobs_json = self.__handle_response(reply).json()
    print('get expertise jobs', jobs_json)
    return jobs_json
def get_expertise_metadata(self, job_id, baseurl=None):
    """
    Query ``/expertise/metadata`` of the Expertise API for a job.

    When the resolved base URL starts with ``http://localhost`` no request is sent and an empty
    dictionary is returned instead.

    :param job_id: id of the job, sent as ``jobId``
    :type job_id: str
    :param baseurl: URL to the host. If none is provided, ``self.baseurl`` is used
    :type baseurl: str, optional

    :return: Parsed JSON body of the response
    :rtype: dict
    """
    print('get expertise metadata', baseurl, job_id)
    target = baseurl or self.baseurl

    if target.startswith('http://localhost'):
        print('get expertise metadata localhost, return {}')
        return {}

    query = {'jobId': job_id}
    reply = self.session.get(target + '/expertise/metadata', params = query, headers = self.headers)
    metadata_json = self.__handle_response(reply).json()
    print('get expertise metadata', metadata_json)
    return metadata_json
def get_expertise_results(self, job_id, baseurl=None, wait_for_complete=False, format='json'):
    """
    Retrieve the results of an Expertise API job from ``/expertise/results``.

    When the resolved base URL starts with ``http://localhost`` no request is sent and an empty
    result is returned. With ``wait_for_complete`` the job status is polled once a minute, for
    at most 500 polls, until it is ``Completed`` or contains ``Error``.

    :param job_id: id of the job
    :type job_id: str
    :param baseurl: URL to the host. If none is provided, ``self.baseurl`` is used
    :type baseurl: str, optional
    :param wait_for_complete: if True, poll the job status until the job finishes before fetching the results
    :type wait_for_complete: bool, optional
    :param format: ``'csv'`` to stream the results as rows; any other value returns the parsed JSON body
    :type format: str, optional

    :return: Parsed JSON body of the response, or for ``format='csv'`` an iterator of dictionaries (one per CSV row)
    :rtype: dict or iterator

    :raises OpenReviewException: when waiting and the job reports an error, the polling limit is reached, or the job ends in an unexpected state
    """
    print('get expertise results', baseurl, job_id)
    base_url = baseurl if baseurl else self.baseurl
    call_max = 500

    if base_url.startswith('http://localhost'):
        print('return expertise results localhost, return []')
        return iter([]) if format == 'csv' else { 'results': [] }

    if wait_for_complete:
        call_count = 0
        status_response = self.get_expertise_status(job_id, baseurl=base_url)
        status = status_response.get('status')
        # A missing or non-string status is treated as "not finished yet"
        status_text = status if isinstance(status, str) else ''
        while 'Completed' != status_text and 'Error' not in status_text and call_count < call_max:
            time.sleep(60)
            status_response = self.get_expertise_status(job_id, baseurl=base_url)
            status = status_response.get('status')
            status_text = status if isinstance(status, str) else ''
            call_count += 1

        if 'Completed' == status_text:
            return self.get_expertise_results(job_id, baseurl=base_url, format=format)

        # The description is optional in the status payload; formatting it (instead of
        # concatenating) keeps a missing description from raising TypeError and hiding
        # the real failure.
        description = status_response.get('description')
        if 'Error' in status_text:
            raise OpenReviewException(f'There was an error computing scores, description: {description}')
        if call_count == call_max:
            raise OpenReviewException(f'Time out computing scores, description: {description}')
        raise OpenReviewException(f'Unknown error, description: {description}')
    else:
        if format == 'csv':
            response = self.session.get(base_url + '/expertise/results', params = {'jobId': job_id, 'format': 'csv'}, headers = self.headers, stream = True)
            response = self.__handle_response(response)
            print('return expertise results', baseurl, job_id)

            def _iter_csv_results(response):
                # Rows are parsed lazily from the streamed body; the response is closed
                # once the generator is exhausted or discarded.
                try:
                    yield from csv.DictReader(response.iter_lines(decode_unicode=True))
                finally:
                    response.close()

            return _iter_csv_results(response)

        response = self.session.get(base_url + '/expertise/results', params = {'jobId': job_id}, headers = self.headers)
        response = self.__handle_response(response)
        print('return expertise results', baseurl, job_id)
        return response.json()
[docs]
class Edit(object):
    """
    Client-side representation of an Edit.

    :param id: Edit id
    :type id: str
    :param domain: value stored as ``domain``; read by :meth:`from_json`, not sent by :meth:`to_json`
    :type domain: optional
    :param invitations: value stored as ``invitations``
    :type invitations: optional
    :param readers: List of readers in the Edit, each reader is a Group id
    :type readers: list[str], optional
    :param writers: List of writers in the Edit, each writer is a Group id
    :type writers: list[str], optional
    :param signatures: List of signatures in the Edit, each signature is a Group id
    :type signatures: list[str], optional
    :param content: value stored as ``content``
    :type content: optional
    :param note: Note carried by the Edit; serialized through its own ``to_json``
    :type note: Note, optional
    :param group: Group carried by the Edit; serialized through its own ``to_json``
    :type group: Group, optional
    :param invitation: Invitation carried by the Edit, either an Invitation object or a string
    :type invitation: Invitation or str, optional
    :param nonreaders: List of nonreaders in the Edit, each nonreader is a Group id
    :type nonreaders: list[str], optional
    :param cdate: Creation date
    :type cdate: int, optional
    :param tcdate: True creation date
    :type tcdate: int, optional
    :param tmdate: Modification date
    :type tmdate: int, optional
    :param ddate: Deletion date
    :type ddate: int, optional
    :param tauthor: value stored as ``tauthor``; read by :meth:`from_json`, not sent by :meth:`to_json`
    :type tauthor: optional
    """
    def __init__(self,
        id = None,
        domain = None,
        invitations = None,
        readers = None,
        writers = None,
        signatures = None,
        content = None,
        note = None,
        group = None,
        invitation = None,
        nonreaders = None,
        cdate = None,
        tcdate = None,
        tmdate = None,
        ddate = None,
        tauthor = None):

        # The assignment order below is the order in which __repr__ lists the attributes
        self.id, self.domain, self.invitations = id, domain, invitations
        self.cdate, self.tcdate, self.tmdate, self.ddate = cdate, tcdate, tmdate, ddate
        self.readers, self.nonreaders = readers, nonreaders
        self.writers, self.signatures = writers, signatures
        self.content = content
        self.note, self.group, self.invitation = note, group, invitation
        self.tauthor = tauthor

    def __repr__(self):
        fields = ','.join(f"{attr} = {value!r}" for attr, value in vars(self).items())
        return 'Edit(' + fields + ')'

    def __str__(self):
        return pprint.pformat(vars(self))

    def to_json(self):
        """
        Converts the Edit instance to a dictionary. Only attributes that are set (truthy) are included.

        :return: Dictionary containing the parameters of the Edit instance
        :rtype: dict
        """
        body = {}

        # Plain attributes whose dictionary key equals the attribute name
        for field in ('id', 'invitations', 'readers', 'nonreaders', 'writers', 'signatures', 'content'):
            value = getattr(self, field)
            if value:
                body[field] = value

        # Nested objects are serialized through their own to_json
        if self.note:
            body['note'] = self.note.to_json()
        if self.group:
            body['group'] = self.group.to_json()

        # The invitation may be an Invitation object or a plain string
        if isinstance(self.invitation, Invitation):
            body['invitation'] = self.invitation.to_json()
        if isinstance(self.invitation, str):
            body['invitation'] = self.invitation

        if self.ddate:
            body['ddate'] = self.ddate

        return body

    @classmethod
    def from_json(cls, e):
        """
        Creates an Edit object from a dictionary whose keys correspond to the instance variables of the Edit class

        :param e: Dictionary containing key-value pairs, where the keys are equivalent to the names of the instance variables in the Edit class
        :type e: dict

        :return: Edit whose instance variables contain the values from the dictionary
        :rtype: Edit
        """
        nested_note = Note.from_json(e['note']) if 'note' in e else None
        nested_group = Group.from_json(e['group']) if 'group' in e else None

        edit = cls(e.get('id'),
            domain = e.get('domain'),
            invitations = e.get('invitations'),
            cdate = e.get('cdate'),
            tcdate = e.get('tcdate'),
            tmdate = e.get('tmdate'),
            ddate = e.get('ddate'),
            readers = e.get('readers'),
            nonreaders = e.get('nonreaders'),
            writers = e.get('writers'),
            signatures = e.get('signatures'),
            content = e.get('content'),
            note = nested_note,
            group = nested_group,
            invitation = e.get('invitation'),
            tauthor = e.get('tauthor')
        )

        # A dictionary invitation is promoted to an Invitation object; a string is kept as is
        if isinstance(edit.invitation, dict):
            edit.invitation = Invitation.from_json(edit.invitation)

        return edit
[docs]
class Note(object):
    """
    Client-side representation of a Note.

    Every constructor argument is stored unchanged on the instance under the same name.
    :meth:`to_json` builds the dictionary sent to the API and :meth:`from_json` builds a Note
    from a dictionary returned by it.
    """
    def __init__(self,
        invitations=None,
        parent_invitations=None,
        readers=None,
        writers=None,
        signatures=None,
        content=None,
        id=None,
        external_id=None,
        external_ids=None,
        number=None,
        cdate=None,
        pdate=None,
        odate=None,
        mdate=None,
        tcdate=None,
        tmdate=None,
        ddate=None,
        forum=None,
        replyto=None,
        nonreaders=None,
        domain=None,
        details = None,
        license=None):

        # The assignment order below is the order in which __repr__ lists the attributes
        self.id, self.external_id, self.external_ids = id, external_id, external_ids
        self.number = number
        self.cdate, self.pdate, self.odate, self.mdate = cdate, pdate, odate, mdate
        self.tcdate, self.tmdate, self.ddate = tcdate, tmdate, ddate
        self.content = content
        self.forum, self.replyto = forum, replyto
        self.readers, self.nonreaders = readers, nonreaders
        self.signatures, self.writers = signatures, writers
        self.details = details
        self.invitations, self.parent_invitations = invitations, parent_invitations
        self.domain = domain
        self.license = license

    def __repr__(self):
        fields = ','.join(f"{attr} = {value!r}" for attr, value in vars(self).items())
        return 'Note(' + fields + ')'

    def __str__(self):
        return pprint.pformat(vars(self))

    @property
    def authors(self):
        """
        Returns the authors as a canonical list of ``{'fullname', 'username'}`` dicts,
        regardless of whether the underlying content stores them as a list of objects
        (current schema) or as parallel ``authors``/``authorids`` arrays (legacy schema).
        Missing entries are filled with empty strings.
        """
        if not self.content:
            return []

        names = self.content.get('authors', {}).get('value') or []
        # Current schema: each author is already an object
        if names and isinstance(names[0], dict):
            return [
                {'fullname': author.get('fullname', ''), 'username': author.get('username', '')}
                for author in names
            ]

        # Legacy schema: pad the shorter of the two parallel arrays with empty strings
        usernames = self.content.get('authorids', {}).get('value') or []
        total = max(len(names), len(usernames))
        padded_names = list(names) + [''] * (total - len(names))
        padded_usernames = list(usernames) + [''] * (total - len(usernames))
        return [
            {'fullname': fullname, 'username': username}
            for fullname, username in zip(padded_names, padded_usernames)
        ]

    def to_json(self):
        """
        Converts the Note instance to a dictionary. Attributes are included only when set (truthy),
        except ``nonreaders`` which is included whenever it is not None.

        :return: Dictionary containing the parameters of the Note instance
        :rtype: dict
        """
        body = {}

        leading = (
            ('id', self.id),
            ('externalId', self.external_id),
            ('forum', self.forum),
            ('replyto', self.replyto),
            ('content', self.content),
            ('invitations', self.invitations),
            ('parentInvitations', self.parent_invitations),
            ('cdate', self.cdate),
            ('pdate', self.pdate),
            ('odate', self.odate),
            ('mdate', self.mdate),
            ('ddate', self.ddate),
        )
        for key, value in leading:
            if value:
                body[key] = value

        # An empty nonreaders list is still sent
        if self.nonreaders is not None:
            body['nonreaders'] = self.nonreaders

        trailing = (
            ('signatures', self.signatures),
            ('writers', self.writers),
            ('readers', self.readers),
            ('license', self.license),
        )
        for key, value in trailing:
            if value:
                body[key] = value

        return body

    @classmethod
    def from_json(cls, n):
        """
        Creates a Note object from a dictionary whose keys correspond to the instance variables of the Note class

        :param n: Dictionary containing key-value pairs, where the keys are equivalent to the names of the instance variables in the Note class
        :type n: dict

        :return: Note whose instance variables contain the values from the dictionary
        :rtype: Note
        """
        return cls(
            id = n.get('id'),
            external_ids = n.get('externalIds'),
            number = n.get('number'),
            cdate = n.get('cdate'),
            mdate = n.get('mdate'),
            pdate = n.get('pdate'),
            odate = n.get('odate'),
            tcdate = n.get('tcdate'),
            tmdate = n.get('tmdate'),
            ddate = n.get('ddate'),
            content = n.get('content'),
            forum = n.get('forum'),
            invitations = n.get('invitations'),
            parent_invitations = n.get('parentInvitations'),
            replyto = n.get('replyto'),
            readers = n.get('readers'),
            nonreaders = n.get('nonreaders'),
            signatures = n.get('signatures'),
            writers = n.get('writers'),
            details = n.get('details'),
            domain = n.get('domain'),
            license = n.get('license')
        )
[docs]
class Invitation(object):
    """
    Client-side representation of an Invitation.

    Every constructor argument is stored unchanged on the instance under the same name.
    :meth:`to_json` builds the dictionary sent to the API and :meth:`from_json` builds an
    Invitation from a dictionary returned by it.

    :param id: Invitation id
    :type id: str
    :param edit: template stored as ``edit``; :meth:`to_json` sends it under ``edit``, ``edge`` or ``tag`` depending on ``type``
    :type edit: optional
    :param type: one of ``'Note'``, ``'Edge'``, ``'Tag'`` or ``'Message'``; selects the key used for ``edit`` in :meth:`to_json`
    :type type: str, optional
    :param reply_forum_views: value sent as ``replyForumViews``. When omitted, each instance gets its own empty list
    :type reply_forum_views: list, optional
    """

    # Marks "reply_forum_views was not passed", so that an explicit None is kept as None
    # while an omitted argument gets a fresh list instead of one shared by all instances.
    _UNSET = object()

    def __init__(self,
        id = None,
        invitations = None,
        parent_invitations = None,
        domain = None,
        readers = None,
        writers = None,
        invitees = None,
        signatures = None,
        edit = None,
        edge = None,
        tag = None,
        message = None,
        type = 'Note',
        noninvitees = None,
        nonreaders = None,
        web = None,
        process = None,
        preprocess = None,
        date_processes = None,
        post_processes = None,
        duedate = None,
        expdate = None,
        cdate = None,
        ddate = None,
        tcdate = None,
        tmdate = None,
        minReplies = None,
        maxReplies = None,
        bulk = None,
        content = None,
        reply_forum_views = _UNSET,
        responseArchiveDate = None,
        details = None,
        description = None,
        instructions = None,
        guestPosting = None,
        secret = None,
        humanVerificationRequired = None):

        # The assignment order below is the order in which __repr__ lists the attributes
        self.id = id
        self.invitations = invitations
        self.parent_invitations = parent_invitations
        self.domain = domain
        self.cdate = cdate
        self.ddate = ddate
        self.duedate = duedate
        self.expdate = expdate
        self.readers = readers
        self.nonreaders = nonreaders
        self.writers = writers
        self.invitees = invitees
        self.noninvitees = noninvitees
        self.signatures = signatures
        self.minReplies = minReplies
        self.maxReplies = maxReplies
        self.edit = edit
        self.edge = edge
        self.tag = tag
        self.message = message
        self.type = type
        self.tcdate = tcdate
        self.tmdate = tmdate
        self.bulk = bulk
        self.details = details
        self.reply_forum_views = [] if reply_forum_views is self._UNSET else reply_forum_views
        self.responseArchiveDate = responseArchiveDate
        self.web = web
        self.process = process
        self.preprocess = preprocess
        self.date_processes = date_processes
        self.post_processes = post_processes
        self.content = content
        self.description = description
        self.instructions = instructions
        self.guestPosting = guestPosting
        self.secret = secret
        self.humanVerificationRequired = humanVerificationRequired

    def __repr__(self):
        content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()])
        return 'Invitation(' + content + ')'

    def __str__(self):
        pp = pprint.PrettyPrinter()
        return pp.pformat(vars(self))

    def is_active(self):
        """
        Tell whether the current time lies between ``cdate`` and ``expdate`` (both inclusive).
        A missing ``cdate`` or ``expdate`` does not restrict the interval on that side.

        :return: True if the Invitation is currently active
        :rtype: bool
        """
        now = tools.datetime_millis(datetime.datetime.now())
        cdate = self.cdate if self.cdate else now
        edate = self.expdate if self.expdate else now
        return cdate <= now <= edate

    def get_content_value(self, field_name, default_value=None):
        """
        Read ``content[field_name]['value']``.

        :param field_name: name of the content field
        :type field_name: str
        :param default_value: value returned when the Invitation has no content, or the field has no ``value``
        :type default_value: optional

        :return: the value of the field, or ``default_value``
        """
        if self.content:
            return self.content.get(field_name, {}).get('value', default_value)
        return default_value

    def pretty_id(self):
        """
        Build a readable name from the last two ``/``-separated tokens of the id: underscores
        become spaces, tokens starting with ``~`` go through ``tools.pretty_id`` and ``-`` tokens
        are dropped.

        :return: readable name of the Invitation
        :rtype: str
        """
        tokens = self.id.split('/')[-2:]
        filtered_tokens = []
        for token in tokens:
            token = token.replace('_', ' ').strip()
            if token.startswith('~'):
                token = tools.pretty_id(token)
            if token != '-':
                filtered_tokens.append(token)
        return ' '.join(filtered_tokens)

    def to_json(self):
        """
        Converts the Invitation instance to a dictionary. Most attributes are included only when
        set (truthy); ``edit``, ``bulk``, ``guestPosting``, ``secret`` and
        ``humanVerificationRequired`` are included whenever they are not None.

        :return: Dictionary containing the parameters of the Invitation instance
        :rtype: dict
        """
        body = {}

        # Attributes sent only when truthy, listed in the order of the resulting dictionary
        truthy_fields = (
            ('id', self.id),
            ('parentInvitations', self.parent_invitations),
            ('cdate', self.cdate),
            ('ddate', self.ddate),
            ('duedate', self.duedate),
            ('expdate', self.expdate),
            ('readers', self.readers),
            ('nonreaders', self.nonreaders),
            ('writers', self.writers),
            ('invitees', self.invitees),
            ('noninvitees', self.noninvitees),
            ('signatures', self.signatures),
            ('replyForumViews', self.reply_forum_views),
            ('content', self.content),
            ('responseArchiveDate', self.responseArchiveDate),
            ('description', self.description),
            ('instructions', self.instructions),
            ('minReplies', self.minReplies),
            ('maxReplies', self.maxReplies),
            ('web', self.web),
            ('process', self.process),
            ('preprocess', self.preprocess),
            ('dateprocesses', self.date_processes),
            ('postprocesses', self.post_processes),
        )
        for key, value in truthy_fields:
            if value:
                body[key] = value

        # The edit template is sent under the key that matches the invitation type
        if self.edit is not None:
            if self.type == 'Note':
                body['edit'] = self.edit
            if self.type == 'Edge':
                body['edge'] = self.edit
            if self.type == 'Tag':
                body['tag'] = self.edit

        # Explicit edge/tag/message attributes take precedence over the edit template
        for key, value in (('edge', self.edge), ('tag', self.tag), ('message', self.message)):
            if value:
                body[key] = value

        # Flags are sent whenever they are set, including False
        flags = (
            ('bulk', self.bulk),
            ('guestPosting', self.guestPosting),
            ('secret', self.secret),
            ('humanVerificationRequired', self.humanVerificationRequired),
        )
        for key, value in flags:
            if value is not None:
                body[key] = value

        return body

    @classmethod
    def from_json(cls, i):
        """
        Creates an Invitation object from a dictionary whose keys correspond to the instance variables of the Invitation class

        :param i: Dictionary containing key-value pairs, where the keys are equivalent to the names of the instance variables in the Invitation class. Must contain ``id``
        :type i: dict

        :return: Invitation whose instance variables contain the values from the dictionary
        :rtype: Invitation
        """
        invitation = cls(i['id'],
            invitations = i.get('invitations'),
            parent_invitations = i.get('parentInvitations'),
            domain = i.get('domain'),
            cdate = i.get('cdate'),
            ddate = i.get('ddate'),
            tcdate = i.get('tcdate'),
            tmdate = i.get('tmdate'),
            duedate = i.get('duedate'),
            expdate = i.get('expdate'),
            readers = i.get('readers'),
            nonreaders = i.get('nonreaders'),
            writers = i.get('writers'),
            invitees = i.get('invitees'),
            noninvitees = i.get('noninvitees'),
            signatures = i.get('signatures'),
            minReplies = i.get('minReplies'),
            edit = i.get('edit'),
            maxReplies = i.get('maxReplies'),
            details = i.get('details'),
            reply_forum_views = i.get('replyForumViews'),
            responseArchiveDate = i.get('responseArchiveDate'),
            description = i.get('description'),
            instructions = i.get('instructions'),
            bulk = i.get('bulk')
        )

        # Optional keys copied only when present in the dictionary (``transform`` has no
        # constructor argument, so the attribute exists only when the key does)
        optional_fields = (
            ('content', 'content'),
            ('web', 'web'),
            ('process', 'process'),
            ('transform', 'transform'),
            ('preprocess', 'preprocess'),
            ('dateprocesses', 'date_processes'),
            ('postprocesses', 'post_processes'),
        )
        for key, attribute in optional_fields:
            if key in i:
                setattr(invitation, attribute, i[key])

        # Edge and tag templates are stored in ``edit``; the type records where they came from
        if 'edge' in i:
            invitation.edit = i['edge']
            invitation.type = 'Edge'
        if 'tag' in i:
            invitation.edit = i['tag']
            invitation.type = 'Tag'
        if 'message' in i:
            invitation.message = i['message']
            invitation.type = 'Message'

        for key in ('guestPosting', 'secret', 'humanVerificationRequired'):
            if key in i:
                setattr(invitation, key, i[key])

        return invitation
class Edge(object):
    """
    Client-side representation of an Edge linking ``head`` to ``tail`` under an invitation.

    Every constructor argument is stored unchanged on the instance under the same name.
    """
    def __init__(self, head, tail, invitation, domain=None, readers=None, writers=None, signatures=None, id=None, weight=None, label=None, cdate=None, ddate=None, nonreaders=None, tcdate=None, tmdate=None, tddate=None, tauthor=None):
        # The assignment order below is the order in which __repr__ lists the attributes
        self.id, self.invitation, self.domain = id, invitation, domain
        self.head, self.tail = head, tail
        self.weight, self.label = weight, label
        self.cdate, self.ddate = cdate, ddate
        self.readers, self.nonreaders = readers, nonreaders
        self.writers, self.signatures = writers, signatures
        self.tcdate, self.tmdate, self.tddate = tcdate, tmdate, tddate
        self.tauthor = tauthor

    def __repr__(self):
        fields = ','.join(f"{attr} = {value!r}" for attr, value in vars(self).items())
        return 'Edge(' + fields + ')'

    def __str__(self):
        return pprint.pformat(vars(self))

    def to_json(self):
        '''
        Returns the Edge as a dictionary ready to be serialized.

        ``invitation``, ``head`` and ``tail`` are always included; ``id`` and ``ddate`` only when
        truthy; the remaining attributes whenever they are not None.
        '''
        body = {
            'invitation': self.invitation,
            'head': self.head,
            'tail': self.tail
        }

        for key, value in (('id', self.id), ('ddate', self.ddate)):
            if value:
                body[key] = value

        # Empty lists, a zero weight and an empty label are still sent
        for key in ('readers', 'writers', 'nonreaders', 'signatures', 'weight', 'label', 'cdate'):
            value = getattr(self, key)
            if value is not None:
                body[key] = value

        return body

    @classmethod
    def from_json(cls, e):
        '''
        Returns an Edge built from a dictionary

        :arg e: The dictionary holding the fields of an object of type "Edge"
        '''
        field_names = (
            'id', 'domain', 'cdate', 'tcdate', 'tmdate', 'ddate', 'tddate', 'invitation',
            'readers', 'nonreaders', 'writers', 'signatures', 'head', 'tail', 'weight',
            'label', 'tauthor',
        )
        # Every constructor argument has the same name as its dictionary key
        return cls(**{field: e.get(field) for field in field_names})
[docs]
class Group(object):
"""
When a user is created, it is automatically assigned to certain groups that give him different privileges. A username is also a group, therefore, groups can be members of other groups.
:param id: id of the Group
:type id: str
:param readers: List of readers in the Group, each reader is a Group id
:type readers: list[str]
:param writers: List of writers in the Group, each writer is a Group id
:type writers: list[str]
:param signatories: List of signatories in the Group, each writer is a Group id
:type signatories: list[str]
:param signatures: List of signatures in the Group, each signature is a Group id
:type signatures: list[str]
:param cdate: Creation date of the Group
:type cdate: int, optional
:param ddate: Deletion date of the Group
:type ddate: int, optional
:param tcdate: true creation date of the Group
:type tcdate: int, optional
:param tmdate: true modification date of the Group
:type tmdate: int, optional
:param members: List of members in the Group, each member is a Group id
:type members: list[str], optional
:param nonreaders: List of nonreaders in the Group, each nonreader is a Group id
:type nonreaders: list[str], optional
:param web: Path to a file that contains the webfield
:type web: optional
:param web_string: String containing the webfield for this Group
:type web_string: str, optional
:param details:
:type details: optional
"""
def __init__(self, id=None, content=None, readers=None, writers=None, signatories=None, signatures=None, invitation=None, invitations=None, parent_invitations=None, cdate = None, ddate = None, tcdate=None, tmdate=None, members = None, nonreaders = None, impersonators=None, web = None, anonids= None, deanonymizers=None, host=None, domain=None, parent = None, details = None, description = None):
# post attributes
self.id=id
self.invitation=invitation
self.invitations = invitations
self.parent_invitations = parent_invitations
self.content = content
self.cdate = cdate
self.ddate = ddate
self.tcdate = tcdate
self.tmdate = tmdate
self.writers = writers
self.members = members
self.readers = readers
self.nonreaders = nonreaders
self.signatures = signatures
self.signatories = signatories
self.anonids = anonids
self.web=web
self.impersonators = impersonators
self.host = host
self.domain = domain
self.parent = parent
self.description = description
self.anonids = anonids
self.deanonymizers = deanonymizers
self.details = details
self.anon_members = []
def get_content_value(self, field_name, default_value=None):
if self.content:
return self.content.get(field_name, {}).get('value', default_value)
return default_value
def __repr__(self):
content = ','.join([("%s = %r" % (attr, value)) for attr, value in vars(self).items()])
return 'Group(' + content + ')'
def __str__(self):
pp = pprint.PrettyPrinter()
return pp.pformat(vars(self))
[docs]
def to_json(self):
"""
Converts Group instance to a dictionary. The instance variable names are the keys and their values the values of the dictinary.
:return: Dictionary containing all the parameters of a Group instance
:rtype: dict
"""
body = {}
if self.id is not None:
body['id'] = self.id
if self.content is not None:
body['content'] = self.content
if self.signatures is not None:
body['signatures'] = self.signatures
if self.writers is not None:
body['writers'] = self.writers
if self.members is not None:
body['members'] = self.members
if self.readers is not None:
body['readers'] = self.readers
if self.invitation is not None:
body['invitation'] = self.invitation
if self.parent_invitations is not None:
body['parentInvitations'] = self.parent_invitations
if self.cdate is not None:
body['cdate'] = self.cdate
if self.ddate is not None:
body['ddate'] = self.ddate
if self.host is not None:
body['host'] = self.host
if self.impersonators is not None:
body['impersonators'] = self.impersonators
if self.anonids is not None:
body['anonids'] = self.anonids
if self.deanonymizers is not None:
body['deanonymizers'] = self.deanonymizers
if self.web is not None:
body['web'] = self.web
if self.nonreaders is not None:
body['nonreaders'] = self.nonreaders
if self.signatories is not None:
body['signatories'] = self.signatories
if self.description is not None:
body['description'] = self.description
return body
[docs]
@classmethod
def from_json(Group,g):
"""
Creates a Group object from a dictionary that contains keys values equivalent to the name of the instance variables of the Group class
:param g: Dictionary containing key-value pairs, where the keys values are equivalent to the name of the instance variables in the Group class
:type g: dict
:return: Group whose instance variables contain the values from the dictionary
:rtype: Group
"""
group = Group(g['id'],
content=g.get('content'),
invitation=g.get('invitation'),
invitations=g.get('invitations'),
parent_invitations=g.get('parentInvitations'),
cdate = g.get('cdate'),
ddate = g.get('ddate'),
tcdate = g.get('tcdate'),
tmdate = g.get('tmdate'),
writers = g.get('writers'),
members = g.get('members'),
readers = g.get('readers'),
nonreaders = g.get('nonreaders'),
signatories = g.get('signatories'),
signatures = g.get('signatures'),
anonids=g.get('anonids'),
deanonymizers=g.get('deanonymizers'),
impersonators=g.get('impersonators'),
host=g.get('host'),
web=g.get('web'),
domain=g.get('domain'),
parent=g.get('parent'),
details = g.get('details'),
description = g.get('description'))
return group
[docs]
def add_member(self, member):
"""
Adds a member to the group. This is done only on the object not in OpenReview. Another method like :meth:`~openreview.Group.post` is needed for the change to show in OpenReview
:param member: Member to add to the group
:type member: str
:return: Group with the new member added
:rtype: Group
"""
if type(member) is Group:
self.members.append(member.id)
else:
self.members.append(str(member))
return self
[docs]
def remove_member(self, member):
"""
Removes a member from the group. This is done only on the object not in OpenReview. Another method like :meth:`~openreview.Group.post` is needed for the change to show in OpenReview
:param member: Member to remove from the group
:type member: str
:return: Group after the member was removed
:rtype: Group
"""
if type(member) is Group:
try:
self.members.remove(member.id)
except(ValueError):
pass
else:
try:
self.members.remove(str(member))
except(ValueError):
pass
return self
[docs]
def add_webfield(self, web):
"""
Adds a webfield to the group
:param web: Path to the file that contains the webfield
:type web: str
"""
with open(web) as f:
self.web = f.read()
[docs]
def post(self, client):
"""
Posts a group to OpenReview
:param client: Client that will post the Group
:type client: Client
"""
client.post_group(self)
def transform_to_anon_ids(self, elements):
if self.anonids:
for index, element in enumerate(elements):
if element in self.members and self.anon_members:
elements[index] = self.anon_members[self.members.index(element)]
else:
elements[index] = element
return elements
class Tag(object):
    """
    :param invitation: Invitation id
    :type invitation: str
    :param signature: Signature of the Tag, signature is a Group id
    :type signature: str, optional
    :param tag: Content of the tag
    :type tag: str, optional
    :param readers: List of readers of the Tag, each reader is a Group id
    :type readers: list[str], optional
    :param writers: List of writers of the Tag
    :type writers: list[str], optional
    :param id: Tag id
    :type id: str, optional
    :param parent_invitations: Serialized under the ``parentInvitations`` key
    :type parent_invitations: optional
    :param cdate: Creation date
    :type cdate: int, optional
    :param tcdate: True creation date
    :type tcdate: int, optional
    :param tmdate: True modification date
    :type tmdate: int, optional
    :param ddate: Deletion date
    :type ddate: int, optional
    :param forum: Forum id
    :type forum: str, optional
    :param nonreaders: List of nonreaders of the Tag, each nonreader is a Group id. None is stored as an empty list
    :type nonreaders: list[str], optional
    :param profile: Serialized under the ``profile`` key
    :type profile: optional
    :param weight: Weight of the tag; serialized whenever it is not None, so ``0`` is kept
    :type weight: optional
    :param label: Serialized under the ``label`` key
    :type label: optional
    :param note: Serialized under the ``note`` key
    :type note: optional
    """

    def __init__(self, invitation, signature=None, tag=None, readers=None, writers=None, id=None, parent_invitations=None, cdate=None, tcdate=None, tmdate=None, ddate=None, forum=None, nonreaders=None, profile=None, weight=None, label=None, note=None):
        # NOTE: assignment order is the order __repr__ prints the attributes in.
        self.id = id
        self.cdate = cdate
        self.tcdate = tcdate
        self.tmdate = tmdate
        self.ddate = ddate
        self.tag = tag
        self.parent_invitations = parent_invitations
        self.forum = forum
        self.invitation = invitation
        self.readers = readers
        self.writers = writers
        # A fresh list per instance when the caller passes nothing.
        self.nonreaders = [] if nonreaders is None else nonreaders
        self.signature = signature
        self.profile = profile
        self.weight = weight
        self.label = label
        self.note = note

    def to_json(self):
        """
        Converts Tag instance to a dictionary keyed by the JSON field names.

        Fields with a falsy value (None, empty string/list, 0) are omitted, except ``weight``, which is only omitted when it is None. ``tcdate`` and ``tmdate`` are never exported.

        :return: Dictionary containing the exportable, populated fields of the Tag
        :rtype: dict
        """
        # ``weight`` sits between these two groups so the key order of the
        # resulting dictionary stays stable.
        before_weight = (
            ('id', self.id),
            ('ddate', self.ddate),
            ('tag', self.tag),
            ('parentInvitations', self.parent_invitations),
            ('forum', self.forum),
            ('invitation', self.invitation),
            ('readers', self.readers),
            ('writers', self.writers),
            ('nonreaders', self.nonreaders),
            ('signature', self.signature),
            ('profile', self.profile),
        )
        after_weight = (
            ('label', self.label),
            ('note', self.note),
            ('cdate', self.cdate),
        )

        body = {key: value for key, value in before_weight if value}
        # Explicit None check: a weight of 0 is meaningful and must be kept.
        if self.weight is not None:
            body['weight'] = self.weight
        body.update((key, value) for key, value in after_weight if value)
        return body

    @classmethod
    def from_json(cls, t):
        """
        Creates a Tag object from a dictionary, mapping the JSON keys onto the constructor arguments (``parentInvitations`` becomes ``parent_invitations``; every other key keeps its name). Missing keys default to None.

        :param t: Dictionary describing the tag
        :type t: dict

        :return: Tag whose instance variables contain the values from the dictionary
        :rtype: Tag
        """
        # The first parameter is named ``cls`` (not ``Tag``) so the class
        # name is not shadowed; subclasses still get their own type.
        return cls(
            id=t.get('id'),
            cdate=t.get('cdate'),
            tcdate=t.get('tcdate'),
            tmdate=t.get('tmdate'),
            ddate=t.get('ddate'),
            tag=t.get('tag'),
            parent_invitations=t.get('parentInvitations'),
            forum=t.get('forum'),
            invitation=t.get('invitation'),
            readers=t.get('readers'),
            writers=t.get('writers'),
            nonreaders=t.get('nonreaders'),
            signature=t.get('signature'),
            profile=t.get('profile'),
            weight=t.get('weight'),
            label=t.get('label'),
            note=t.get('note'),
        )

    def __repr__(self):
        """Renders every instance attribute as ``name = repr(value)``, comma-joined inside ``Tag(...)``."""
        rendered = [f"{name} = {value!r}" for name, value in vars(self).items()]
        return f"Tag({','.join(rendered)})"

    def __str__(self):
        """Pretty-prints the dictionary of instance attributes using pprint's default settings."""
        return pprint.pformat(vars(self))