Source code for GRID_LRT.token

# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 Alexandar P. Mechev
# All rights reserved.
#
# This software is licensed as described in the file LICENSE.md, which
# you should have received as part of this distribution.

"""
.. module:: GRID_LRT.Token
   :platform: Unix
   :synopsis: Set of tools for manually and automatically creating tokens

.. moduleauthor:: Alexandar Mechev <LOFAR@apmechev.com>
.. note:: Will be renamed GRID_LRT.token to conform to python standards

>>> #Example creation of a token of token_type 'test'
>>> from GRID_LRT.auth.get_picas_credentials import picas_cred
>>> pc=picas_cred() #Gets picas_credentials
>>>
>>> from cloudant.client import CouchDB         
>>> from GRID_LRT.token import caToken #Token that implements the cloudant interface
>>> from GRID_LRT.token import TokenList
>>> client = CouchDB(pc.user,pc.password, url='https://picas-lofar.grid.surfsara.nl:6984',connect=True)
>>> db = client[pc.database]
>>> 'token_id' in db # Checks if database includes the token
>>> db['token_id'] #Pulls the token 
>>> tl = TokenList(database=db, token_type='token_type') #makes an empty list
>>> tl.add_view(TokenView('temp',"doc.type == \"{}\" ".format(tl.token_type))) # adds a view to the token
>>> for token in tl.list_view_tokens('temp'):
        tl.append(caToken(token_type=tl.token_type, token_id= token['_id'], database=db))
        # adds every token in view to the local list (only their ids)
>>> tl.fetch() #Fetch actual data for token in list
>>> t1 = caToken(database=db, token_type='restructure_test', token_id=token_id) #make a token (locally)
>>> t1.build(TokenJsonBuilder('/path/to/token/data.json'))
>>> t1.save() #upload to the database
>>> t1.add_attachment(attachment_name='attachment_name_in_db.txt',filename='/path/to/attachment/file') #Adds attachment to token
"""

from __future__ import print_function
import sys
import os
import shutil
import itertools
import time
import tarfile
import yaml
import json
#from retrying import retry
from abc import ABCMeta, abstractmethod
import pdb

import mimetypes
import GRID_LRT
from couchdb.design import ViewDefinition
import couchdb
from cloudant import couchdb as cloudant_couchdb
from cloudant.design_document import DesignDocument
from cloudant import couchdb_admin_party
#from cloudant.client import CouchDB
from cloudant import couchdb as CaCouchClient
from cloudant.document import Document
from cloudant.error import CloudantArgumentError
from requests.exceptions import HTTPError

__version__ = GRID_LRT.__version__
__author__ = GRID_LRT.__author__
__license__ = GRID_LRT.__license__
__email__ = GRID_LRT.__email__
__copyright__ = GRID_LRT.__copyright__
__credits__ = GRID_LRT.__credits__
__maintainer__ = GRID_LRT.__maintainer__
__status__ = GRID_LRT.__status__

def get_all_design_docs(pcreds=None, srv="https://picas-lofar.grid.surfsara.nl:6984"):
    """Returns a list of design documents for the pcreds.databse on server=srv
    If pcreds are none, then we're adminparty and db is test_db"""
    kwargs={'connect':True, 'url':srv}
    if pcreds:
        user, passwd, dbn = pcreds.user, pcreds.password, pcreds.database
        connect_client = CaCouchClient
        kwargs['user'] = user
        kwargs['passwd'] = passwd
    else:
        user, passwd, dbn = None, None, "test_db"
        connect_client = couchdb_admin_party
    with connect_client(**kwargs) as client:
        database = client[dbn]
        ad = [doc for doc in database['_all_docs']['rows'] if '_design' in doc['id']]
    return [i['id'] for i in ad]


def reset_all_tokens(token_type, picas_creds, server="https://picas-lofar.grid.surfsara.nl:6984"):
    """ Resets all Tokens with the pc authorization
    """
    thandler = TokenHandler(t_type=token_type, srv=server,
                            uname=picas_creds.user, pwd=picas_creds.password,
                            dbn=picas_creds.database)
    thandler.load_views()
    for view in list(thandler.views):
        if view != 'overview_total':
            thandler.reset_tokens(view)


def purge_tokens(token_type, picas_creds, server="https://picas-lofar.grid.surfsara.nl:6984"):
    """Automated function to purge tokens authorizing with Picas_creds"""
    thandler = TokenHandler(t_type=token_type, srv=server,
                            uname=picas_creds.user, pwd=picas_creds.password,
                            dbn=picas_creds.database)
    thandler.load_views()
    thandler.purge_tokens()

[docs]class Token(dict):
[docs] def __init__(self, token_type, token_id=None, **kwargs): self.__setitem__('type', token_type) if not token_id: self.__setitem__('_id',token_type) else: self.__setitem__('_id',token_id) self.__setitem__('lock', 0) self.__setitem__('done', 0)
[docs] def synchronize(self, db, prefer_local=False, upload=False): """Synchronizes the token with the database. """ remote_token = db[self['_id']] for k in set(list(remote_token.keys())+list(self.keys())): if prefer_local: remote_token[k] = self.get(k) else: self[k] = remote_token.get(k) if upload: self.upload(db)
[docs] def build(self,token_builder): data = token_builder.data self.update(data)
[docs] def add_attachment(self): raise NotImplementedError
[docs] def reset(self): self.__setitem__('lock', 0) self.__setitem__('done', 0) scrub_count = self.get('scrub_count', 0) self.__setitem__('scrub_count', scrub_count + 1) self.__setitem__('hostname', '') self.__setitem__('output', '')
class caToken(Token, Document): def __init__(self, database, token_type, **kwargs): Token.__init__(self, token_type=token_type, **kwargs) Document.__init__(self,database=database, **kwargs) self._document_id = self['_id'] def add_attachment(self, filename, attachment_name): file_type = mimetypes.guess_type(filename)[0] self.put_attachment(attachment_name, file_type, open(filename,'r')) def get_all_attachments(self): """Gets all attachments from the remote token and saves them to a file with name ID-Attachment""" self.fetch() for attachment in self.get('__attachments'): if not self['_attachments'][attachment].get('content-type'): data=self.get_attachment(attachment, attachment_type='text/plain') else: data=self.get_attachment(attachment) with open(self['_id']+"-"+attachment,'wb') as att_f: att_f.write(data) class TokenBuilder: __metaclass__ = ABCMeta """Creates a token""" @abstractmethod def _build(self): pass @property def data(self): return self._data class TokenDictBuilder(TokenBuilder): def __init__(self, config_dict): self._build(config_dict) def _build(self,config_dict): self._build_from_dict(config_dict) def _build_from_dict(self, config_dict): self._data={} if "PicasApiVersion" in config_dict and config_dict["PicasApiVersion"]<0.5: raise RuntimeError("Unsupported PiCaS API version {0}", config_dict["PicasApiVersion"]) self._data['config.json']={} _config = config_dict _variables={} if 'Token' in _config: if 'variables' in _config['Token']: _variables.update(_config['Token']['variables']) _config['variables'] = _config['Token']['variables'] del _config['Token']['variables'] self._data.update(_config['Token']) if 'Job' in _config and 'variables' in _config['Job']: self._data.update(_config['Job']['variables']) if 'variables' in _config: self._data['config.json']['variables']=_config['variables'] if 'container' in _config: self._data['config.json']['container']=_config['container'] if 'sandbox' in _config: self._data['config.json']['sandbox']=_config['sandbox'] class TokenJsonBuilder(TokenDictBuilder): """Reads a json config file and builds the Token fields using the data in this file.""" def __init__(self, config_file): self._build(config_file) def _build(self, config_file): _config = json.load(open(config_file)) self._data={} self._build_from_dict(_config) class TokenList(list): """A list of token that a Token can be appended to Includes upload and download functions to upload all the local documents and to download the remote ones NOTE: Implement this in a composite pattern, to allow sublists where each sublist is the result of a view""" def __init__(self, token_type=None, database=None): self._token_ids = [] self._design_doc = None self.__set_database(database) self.__set_token_type(token_type) def __set_token_type(self, token_type): self.token_type = token_type if token_type and self._database!=None: self.__set_design_doc() def __set_database(self, database): self._database = database def __set_design_doc(self): design_doc_name = '_design/{0}'.format(self.token_type) self._design_doc = DesignDocument(self._database, design_doc_name) if design_doc_name not in self._database: self._design_doc.save() self._design_doc.fetch() def add_attachment(self, filename, attachment_name): for token in self: try: token.add_attachment(filename, attachment_name) except HTTPError as e: print(e) if '404' in str(e): token.save() token.add_attachment(filename, attachment_name) def append(self, item): if isinstance(item, Token): if not self._database: self.__set_database(item._database) #Does this have a getter? if not self.token_type: self.__set_token_type(item['type']) elif item['type'] != self.token_type: raise TypeError("Appending token of wrong token type, {0} != {1}".format( item['type'], self.token_type)) if item['_id'] not in self._token_ids: self._token_ids.append(item['_id']) else: raise RuntimeError("token with id {0} already exists! You need unique '_id' fields ".format( item['_id'])) super(TokenList, self).append(item) elif isinstance(item, TokenList): for _id in item._token_ids: self._token_ids.append(_id) super(TokenList, self).append(item) else: raise TypeError("Cannot append item {0} as it's not a Token".format(item)) def upload_all(self): """Uploads all tokens in the list. It will crash if the token already exists This also cannot be called if the tokens were created inside a context manager that has been exited. Best option is to create a persistent 'cloudant.client.CouchDB' object. """ for token in self: token.save() def save(self): self.upload_all() def delete_all(self): for token in self: token.delete() def reset(self): for token in self: token.reset() def delete(self): self.delete_all(self) def add_view(self, view): map_code = view.get_codes(self.token_type)[0] reduce_code = view.get_codes(self.token_type)[1] if self._design_doc: try: self._design_doc.add_view(view.name, map_code, reduce_code) except CloudantArgumentError: self._design_doc.delete_view(view.name) self._design_doc.add_view(view.name, map_code, reduce_code) self._design_doc.save() def fetch(self): for token in self: token.fetch() def list_view_tokens(self, view_name): view = self._design_doc.get_view(view_name) if not view: return self view_list = TokenList(token_type=self.token_type, database=self._database) for i in view.result: tok=caToken(database=self._database, token_type=self.token_type,token_id=i['id']) view_list.append(tok) view_list.fetch() return view_list def get_views(self): if self._design_doc: self._design_doc.fetch() return self._design_doc.list_views() def delete_views(self): for view in self.get_views(): self._design_doc.delete_view(view) self._design_doc.save() def add_token_views(self): """Adds the todo, locked, error, done and overview_view views that are standard for a PiCaS Token.""" self.add_view(TokenView("todo", 'doc.lock == 0 && doc.done == 0 ')) self.add_view(TokenView("locked", 'doc.lock > 0 && doc.done == 0 ', ('doc._id', 'doc.status'))) self.add_view(TokenView("done", 'doc.status == "done"' )) self.add_view(TokenView("error", 'doc.status == "error" ', ('doc._id', 'doc.status'))) self.add_view(TokenReduceView('overview_view')) class TokenView(object): def __init__(self, name, condition, emit_values=('doc._id', 'doc._id')): self.name = name self.condition = condition self.emit_values = emit_values def _get_map_code(self, token_type): general_view_code = """function(doc) {{ if(doc.type == "{0}") {{ if({1}) {{ emit({2}, {3}); }} }} }} """.format(token_type, self.condition, self.emit_values[0], self.emit_values[1]) return general_view_code def _get_reduce_code(self): return None def get_codes(self, token_type): return self._get_map_code(token_type), self._get_reduce_code() class TokenReduceView(TokenView): def __init__(self, name, **kwargs): super(TokenReduceView, self).__init__(name, kwargs) def _get_map_code(self, token_type): overview_map_code = '''function(doc) {{ if(doc.type == "{0}" ) {{ if (doc.lock == 0 && doc.done == 0){{ emit('todo', 1); }} if(doc.lock > 0 && doc.status == 'downloading' ) {{ emit('downloading', 1); }} if(doc.lock > 0 && doc.done > 0 && doc.output == 0 ) {{ emit('done', 1); }} if(doc.lock > 0 && doc.output != 0 && doc.output != "" ) {{ emit('error', 1); }} if(doc.lock > 0 && doc.status == 'launched' ) {{ emit('waiting', 1); }} if(doc.lock > 0 && doc.done==0 && doc.status!='downloading' ) {{ emit('running', 1); }} }} }} '''.format(token_type) return overview_map_code def _get_reduce_code(self): overview_reduce_code = '''function (key, values, rereduce) { return sum(values); } ''' return overview_reduce_code
[docs]class TokenHandler(object): """self.database.get("_design/"" The TokenHandler class uses couchdb to create, modify and delete tokens and views, to attach files, or download attachments and to easily modify fields in tokens. It's initiated with the token_type, server, username, password and name of database. """
[docs] def __init__(self, t_type="token", srv="https://picas-lofar.grid.surfsara.nl:6984", uname="", pwd="", dbn=""): """ >>> #Example creation of a token of token_type 'test' >>> from GRID_LRT.auth.get_picas_credentials import picas_cred >>> pc=picas_cred() #Gets picas_credentials >>> >>> th=token.TokenHandler( t_type="test", srv="https://picas-lofar.grid.surfsara.nl:6984", uname=pc.user, pwd=pc.password, dbn=pc.database ) #creates object to 'handle' Tokens >>> th.add_overview_view() >>> th.add_status_views() #Adds 'todo', 'done', 'locked' and 'error' views >>> th.load_views() >>> th.views.keys() >>> th.reset_tokens(view_name='error') # resets all tokens in 'error' view >>> th.set_view_to_status(view_name='done','processed') """ print("TokenHandler is no longer supported!")
@staticmethod def _append_id(keys, app=""): """ Helper function that appends a string to the token ID""" keys["_id"] += app
[docs] def del_view(self, view_name="test_view"): pass
[docs] def remove_error(self): pass
[docs] def add_attachment(self, token, filehandle, filename="test"): pass
[docs] def list_attachments(self, token): pass
[docs] def get_attachment(self, token, filename, savename=None): pass
[docs] def list_tokens_from_view(self, view_name): pass
[docs] def archive_tokens_from_view(self, viewname, delete_on_save=False): pass
[docs] def archive_tokens(self, delete_on_save=False, compress=True): pass
[docs] def archive_a_token(self, tokenid, delete=False): pass
[docs] def clear_all_views(self): pass
[docs] def purge_tokens(self): pass
[docs] def set_view_to_status(self, view_name, status): pass
[docs]class TokenSet(object): """ The TokenSet object can automatically create a group of tokens from a yaml configuration file and a dictionary. It keeps track internally of the set of tokens and allows users to batch attach files to the entire TokenSet or alter fields of all tokens in the set. """
[docs] def __init__(self, th=None, tok_config=None): """ The TokenSet object is created with a TokenHandler Object, which is responsible for the interface to the CouchDB views and Documents. This also ensures that only one job type is contained in a TokenSet. Args: :param th: The TokenHandler associated with the job tokens :type th: GRID_LRT.Token.TokenHandler :param tok_config: Location of the token yaml file on the host FileSystem :type tok_config: str :raises: AttributeError, KeyError """ self.thandler = th self.__tokens = [] if not tok_config: self.token_keys = {} else: with open(tok_config, 'r') as optfile: self.token_keys = json.load(optfile)['Token']
[docs] def create_dict_tokens(self, iterable={}, id_prefix='SB', id_append="L000000", key_name='STARTSB', file_upload=None): """ A function that accepts a dictionary and creates a set of tokens equal to the number of entries (keys) of the dictionary. The values of the dict are a list of strings that may be attached to each token if the 'file_upload' argument exists. Args: :param iterable: The dictionary which determines how many tokens will be created. The values are attached to each token :type iterable: dict :param id_append: Option to append the OBSID to each Token :type id_append: str :param key_name: The Token field which will hold the value of the dictionary's keys for each Token :type key_name: str :param file_upload: The name of the file which to upload to the tokens (typically srm.txt) :type file_upload: str """ # TODO: Check if key_name is inside token_keys! for key in iterable: keys = dict(itertools.chain(self.token_keys.items(), { key_name: str("%03d" % int(key))}.items())) # _=keys.pop('_attachments') pipeline = "" if 'PIPELINE_STEP' in keys: pipeline = "_"+keys['PIPELINE_STEP'] token = self.thandler.create_token( keys, append=id_append+pipeline+"_"+id_prefix+str("%03d" % int(key))) if file_upload: with open('temp_abn', 'w') as tmp_abn_file: for i in iterable[key]: tmp_abn_file.write("%s\n" % i) with open('temp_abn', 'r') as tmp_abn_file: self.thandler.add_attachment(token, tmp_abn_file, file_upload) os.remove('temp_abn') self.__tokens.append(token)
[docs] def add_attach_to_list(self, attachment, tok_list=None, name=None): '''Adds an attachment to all the tokens in the TokenSet, or to another list of tokens if explicitly specified. ''' if not name: name = attachment if not tok_list: tok_list = self.__tokens for token in tok_list: self.thandler.add_attachment(token, open( attachment, 'r'), os.path.basename(name))
@property def tokens(self): self.update_local_tokens() return self.__tokens
[docs] def update_local_tokens(self): self.__tokens = [] self.thandler.load_views() for view in self.thandler.views.keys(): if view != 'overview_total': for token in self.thandler.list_tokens_from_view(view): self.__tokens.append(token['id'])
[docs] def add_keys_to_list(self, key, val, tok_list=None): if not tok_list: tok_list = self.__tokens to_update = [] for token in tok_list: document = self.thandler.database[token] document[key] = str(val) to_update.append(document) self.thandler.database.update(to_update)