Source code for GRID_LRT.Staging.srmlist

import sys
import re
from collections import deque
from math import ceil
import logging
import subprocess

class srmlist(list):
    """
    The srmlist class is an extension of Python lists that can hold a list of
    srm links to data on GRID Storage (LOFAR Archive, Intermediate Storage,
    etc).

    In addition to the regular list capabilities, it also has internal checks
    for the location and the OBSID of the data. When a new item is appended,
    these checks are done automatically. Checking OBSID is an optional
    argument set to True by default.

    Attributes:
        lta_location: site key detected for the appended links ('sara',
            'juelich', 'poznan'), '' if unrecognised, None before the first
            append.
        obsid: first 'L<digits>' token seen in an appended link, or None.
        checkobsid: when True, links with a missing or different OBSID are
            rejected with AttributeError.
    """

    # NOTE(review): every host/URL literal in this class was blank in the
    # reviewed copy of the file (stripped during extraction). The values below
    # are the public LOFAR LTA endpoints for each site -- confirm them against
    # the upstream GRID_LRT repository before relying on them.

    # (substring found in a link, site key). All markers are checked in order
    # and the last match wins, like the original if-chain.
    _LOCATION_MARKERS = (
        ('grid.sara.nl', 'sara'),
        ('fz-juelich.de', 'juelich'),
        ('psnc.pl', 'poznan'),
    )

    # Per-site prefixes used by srm_replace / gsi_replace / http_replace.
    _SITE_PREFIXES = {
        'sara': {
            'srm': 'srm://srm.grid.sara.nl:8443',
            'gsiftp': 'gsiftp://gridftp.grid.sara.nl:2811',
            'http': ('https://lofar-download.grid.sara.nl/lofigrid/'
                     'SRMFifoGet.py?surl=srm://'),
        },
        'juelich': {
            'srm': 'srm://lofar-srm.fz-juelich.de:8443',
            'gsiftp': 'gsiftp://lofar-gridftp.fz-juelich.de:2811',
            'http': ('https://lofar-download.fz-juelich.de/webserver-lofar/'
                     'SRMFifoGet.py?surl=srm://'),
        },
        'poznan': {
            'srm': 'srm://lta-head.lofar.psnc.pl:8443',
            'gsiftp': 'gsiftp://gridftp.lofar.psnc.pl:2811',
            'http': ('https://lta-download.lofar.psnc.pl/lofigrid/'
                     'SRMFifoGet.py?surl=srm://'),
        },
    }

    # OBSID token: capital 'L' followed by digits (first occurrence is used).
    _OBSID_RE = re.compile(r'L[0-9][0-9]*')

    def __init__(self, checkOBSID=True, link=None):
        super(srmlist, self).__init__()
        self.lta_location = None
        self.obsid = None
        self.checkobsid = checkOBSID
        if link:
            self.append(link)

    def check_location(self, item):
        """Return the site key of a link string or of an srmlist.

        For an srmlist the key of its last element is returned. Returns ''
        for other types or when no site marker matches.
        """
        if isinstance(item, str):
            return self.check_str_location(item)
        location = ""
        if isinstance(item, srmlist):
            for link in item:
                location = self.check_str_location(link)
        return location

    def check_str_location(self, item):
        """Return the site key whose marker occurs in ``item`` ('' if none)."""
        loc = ''
        for marker, site in self._LOCATION_MARKERS:
            if marker in item:
                loc = site
        return loc

    def stringify_item(self, item):
        """Return ``item`` as a single link string.

        Strings lose leading/trailing CR and LF characters; an srmlist is
        concatenated; any other type yields "".
        """
        if isinstance(item, str):
            # Strip CR and LF together so both LF- and CRLF-terminated lines
            # are cleaned (stripping them one after the other from the
            # unmodified item loses the first strip).
            return item.strip('\r\n')
        if isinstance(item, srmlist):
            return "".join(str(v) for v in item)
        return ""

    def check_obsid(self, item):
        """Record the OBSID of ``item`` and, if enabled, verify it.

        Raises:
            AttributeError: when OBSID checking is enabled and ``item`` has
                no OBSID or one that differs from the recorded OBSID.
        """
        match = self._OBSID_RE.search(self.stringify_item(item))
        tmp_obsid = match.group(0) if match else None
        if not self.obsid:
            self.obsid = tmp_obsid
        if self.checkobsid:
            if tmp_obsid is None:
                raise AttributeError("No OBSID found in link")
            if tmp_obsid != self.obsid:
                raise AttributeError("Different OBSID than previous items")

    def append(self, item):
        """Validate and append a link (or the joined links of an srmlist).

        Empty items and duplicates are ignored.

        Raises:
            AttributeError: on an OBSID mismatch (see check_obsid) or when the
                link's site differs from the site of the previous links.
        """
        if not item:
            return
        self.check_obsid(item)
        tmp_loc = self.check_location(item)
        if not self.lta_location:
            self.lta_location = tmp_loc
        elif self.lta_location != tmp_loc:
            raise AttributeError(
                "Appended srm link not the same location as previous links!")
        # Trim only after lta_location is known: trim_spaces applies
        # site-specific normalisation (poznan '//lofar'), which must also
        # happen for the very first link.
        link = self.trim_spaces(self.stringify_item(item))
        if not link or link in self:
            return  # nothing usable, or a duplicate srm
        super(srmlist, self).append(link)

    def trim_spaces(self, item):
        """Normalise a link and keep a single field of it.

        Sometimes there are two fields in the incoming list. The first field
        containing 'srm://' is returned; if none does, the first non-empty
        field is returned. Doubled slashes before '/pnfs' (and before
        '/lofar' for poznan) are collapsed.
        """
        item = "".join(item).replace('//pnfs', '/pnfs')
        if self.lta_location == 'poznan':
            item = item.replace('//lofar', '/lofar')
        if " " not in item:
            return item
        fields = [field for field in item.split(" ") if field]
        for field in fields:
            if 'srm://' in field:
                return field
        return fields[0] if fields else ""

    def gfal_replace(self, item):
        """
        For each item, it creates a valid link for the gfal staging scripts.

        srm links get the full ':8443/srm/managerv2?SFN=' SURL form (links
        already in that form are returned unchanged); gsiftp links are
        converted with srm_replace. Returns None for any other link.
        """
        if 'srm://' in item:
            if '/srm/managerv2?SFN=' in item:
                return item  # already converted; avoid doubling the SFN part
            return item.replace(':8443', ':8443/srm/managerv2?SFN=')
        if 'gsiftp://' in item:
            return self.srm_replace(item)
        return None

    def _swap_site_prefix(self, item, old, new):
        """Replace this site's ``old`` prefix by its ``new`` prefix.

        Returns None when lta_location is not a known site.
        """
        prefixes = self._SITE_PREFIXES.get(self.lta_location)
        if prefixes is None:
            return None
        return item.replace(prefixes[old], prefixes[new])

    def srm_replace(self, item):
        """Turn this site's gsiftp link into an srm link (None if unknown site)."""
        return self._swap_site_prefix(item, 'gsiftp', 'srm')

    def gsi_replace(self, item):
        """Turn this site's srm link into a gsiftp link (None if unknown site)."""
        return self._swap_site_prefix(item, 'srm', 'gsiftp')

    def http_replace(self, item):
        """Wrap an srm link in the site's HTTP download URL (None if unknown site)."""
        prefixes = self._SITE_PREFIXES.get(self.lta_location)
        if prefixes is None:
            return None
        return item.replace('srm://', prefixes['http'])

    def sbn_dict(self, pref="SB", suff="_"):
        """
        Returns a generator that creates a pair of SBN and link. Can be used
        to create dictionaries.

        ``pref`` and ``suff`` are regex fragments surrounding the subband
        number. Each link is yielded wrapped in its own srmlist.

        Raises:
            AttributeError: when a link contains no pref...suff match.
        """
        pattern = re.compile(pref + '(.+?)' + suff)
        for link in self:
            match = pattern.search(link)
            if match is None:
                sys.stderr.write("Are you using pref='SB' and suff='_' "
                                 "to match ...SB000_... ?\n")
                raise AttributeError(
                    "No subband number matching {0!r}...{1!r} in {2}".format(
                        pref, suff, link))
            surl = srmlist(checkOBSID=self.checkobsid)
            surl.append(link)
            yield match.group(1), surl
def slice_dicts(srmdict, slice_size=10):
    """
    Returns a dict of srmlists that each hold up to ``slice_size`` SBNs
    (10 by default).

    Missing subbands are treated as empty spaces: if you miss SB009, the first
    list holds the 9 items SB000 to SB008, and the next one starts at SB010.
    Chunks are keyed by their first subband number formatted as '%03d'; chunks
    with no subbands at all are left out.

    Args:
        srmdict: mapping (or iterable of pairs, e.g. srmlist.sbn_dict())
            from subband number (int or numeric string) to link/srmlist.
        slice_size: number of consecutive subband numbers per chunk.

    Raises:
        ValueError: if slice_size is smaller than 1.
    """
    if slice_size < 1:
        raise ValueError("slice_size must be a positive integer")
    # Key by integer subband number so ordering is numeric (string sorting
    # breaks for 4+ digit numbers) and lookups do not depend on zero-padding.
    by_number = dict((int(sbn), link) for sbn, link in dict(srmdict).items())
    if not by_number:
        return {}
    first, last = min(by_number), max(by_number)
    sliced = {}
    for chunk_start in range(first, last + 1, slice_size):
        chunk = srmlist()
        for sbn in range(chunk_start, chunk_start + slice_size):
            if sbn in by_number:
                chunk.append(by_number[sbn])
        if chunk:  # skip ranges where every subband is missing
            sliced[format(chunk_start, '03')] = chunk
    return sliced
def make_srmlist_from_gsiftpdir(gsiftpdir):
    """Build an srmlist from the files that uberftp lists in ``gsiftpdir``.

    count_files_uberftp returns 0 instead of a list when the listing fails or
    finds nothing. In that case an empty srmlist is returned instead of raising
    TypeError.
    """
    srml = srmlist()
    for link in count_files_uberftp(gsiftpdir) or []:
        srml.append(link)
    return srml
def count_files_uberftp(directory):
    """List the entries of a GridFTP ``directory`` using ``uberftp -ls``.

    Returns:
        A list of ``directory + "/" + name`` strings, one per listed entry.
        Returns 0 (kept for backward compatibility) when uberftp writes
        anything to stderr or when the listing does not contain 'lofsksp'.
    """
    # Text mode so stdout/stderr are str on both Python 2 and 3; comparing
    # bytes to '' made every call look like a failure on Python 3.
    proc = subprocess.Popen(['uberftp', '-ls', directory],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    out, err = proc.communicate()
    if err:
        logging.warning("uberftp failed to list directory %s: %s",
                        directory, err.strip())
        return 0
    # NOTE(review): 'lofsksp' is presumably the account name expected in the
    # listing's owner column -- confirm before generalising this check.
    if 'lofsksp' not in out:
        logging.warning("no link found in folder!, returning 0 files")
        return 0
    # splitlines() handles both '\r\n' and '\n'; blank lines are skipped
    # so split()[-1] never sees an empty list.
    return [directory + "/" + line.split()[-1]
            for line in out.strip().splitlines() if line.strip()]