Mercurial Hosting > traffic-intelligence
view python/pavement.py @ 462:af2222c0c9c0
TTC tested and updated!
author | Sohail Zangenehpour <sohail.zangenehpour@mail.mcgill.ca> |
---|---|
date | Mon, 03 Feb 2014 15:41:57 -0500 |
parents | b5cc6b001ae6 |
children | a65a14c90834 |
line wrap: on
line source
#! /usr/bin/env python
'''Tools for processing and analyzing pavement marking data'''

from collections import Counter
import datetime

import numpy as np

# Python 2 idiom: makes every class defined in this module a new-style class
__metaclass__ = type


class RTSS:
    '''Class for data related to a RTSS, including aggregating pavement marking measurements

    Only the RTSS identifier is stored by this class'''

    def __init__(self, id):
        self.id = id


class MarkingTest:
    '''Class for a test site for a given product

    data must be indexable by column name and contain a 'jours' column
    (presumably a numpy record array of measurements - confirm with callers)'''

    def __init__(self, siteId, paintingDate, paintingType, color, data):
        self.siteId = siteId
        self.paintingDate = paintingDate
        self.paintingType = paintingType
        self.color = color
        self.data = data
        self.nMeasures = len(data)

    def plot(self, measure, options = 'o', dayRatio = 1., **kwargs):
        '''Plots the column measure as a function of the 'jours' column divided by dayRatio

        options and kwargs are passed as is to matplotlib.pyplot.plot'''
        # imported here so that the module can be used without matplotlib
        from matplotlib.pyplot import plot
        plot(self.data['jours']/float(dayRatio), self.data[measure], options, **kwargs)

    def getMarkingMeasures(self, dataLabel):
        '''Returns the 'jours' and dataLabel columns
        restricted to the rows where dataLabel is not NaN'''
        validRows = self.data[~np.isnan(self.data[dataLabel])]
        return validRows['jours'], validRows[dataLabel]

    def plotMarkingMeasures(self, measure, options = 'o', dayRatio = 1., **kwargs):
        '''Plots the six columns named measure_1 to measure_6 (see plot)'''
        for i in range(1,7):
            self.plot('{}_{}'.format(measure, i), options, dayRatio, **kwargs)


def occ_max(a):
    '''Returns the most frequent element of the sequence a, or "" if a is empty

    Elements must be hashable
    In case of ties, the element counted first is returned on Python versions
    where dictionaries keep insertion order (arbitrary choice otherwise)'''
    if len(a) == 0:
        return ""
    return Counter(a).most_common(1)[0][0]


def caracteristiques(rtss, maintenanceLevel, rtssWeatherStation, fmr, paintType):
    r'''Computes characteristic data for the RTSS (class rtss)

    The tables are indexed by column name and then by row, eg loaded by
    maintenanceLevel = pylab.csv2rec('C:\Users\Alexandre\Desktop\Projet_maitrise_recherche\BDD_access\analyse_donnees_deneigement\exigence_circuits.txt', delimiter = ';')
    rtssWeatherStation = pylab.csv2rec('C:\Users\Alexandre\Desktop\Projet_maitrise_recherche\stations_environnement_canada\rtssWeatherStation\juste_pour_rtss_avec_donnees_entretien_hiv\rtssWeatherStation_EC3.txt', delimiter = ',')
    fmr = pylab.csv2rec('C:\Users\Alexandre\Desktop\Projet_maitrise_recherche\BDD_access\analyse_donnees_deneigement\fmr.txt', delimiter = ';')
    paintType = pylab.csv2rec('C:\Users\Alexandre\Desktop\Projet_maitrise_recherche\BDD_access\analyse_donnees_deneigement\type_peinture.txt', delimiter = ';')

    Returns the tuple (exigence, x_moy, y_moy, age_revtm, classe_fonct, type_revtm,
    milieu, djma, djma_camions, peinture_lrd, peinture_lrg, peinture_lc)
    in which every value that cannot be found for rtss.id is the empty string'''
    # winter maintenance requirement: the last matching row is used
    exigence = ''
    for i, rtssId in enumerate(maintenanceLevel['rtss_debut']):
        if rtssId == rtss.id:
            exigence = maintenanceLevel['exigence'][i]

    # x/y coordinates: the last matching row is used
    x_moy, y_moy = '',''
    for i, rtssId in enumerate(rtssWeatherStation['rtss']):
        if rtssId == rtss.id:
            x_moy = rtssWeatherStation['x_moy'][i]
            y_moy = rtssWeatherStation['y_moy'][i]

    # fmr information: most frequent value of each column over the matching rows
    fmrColumns = ['age_revtm', 'des_clasf_fonct', 'des_type_revtm', 'des_cod_mil', 'val_djma', 'val_pourc_camns']
    fmrRows = [i for i, rtssId in enumerate(fmr['rtss_debut']) if rtssId == rtss.id]
    if fmrRows:
        age_revtm, classe_fonct, type_revtm, milieu, djma, pourc_camions = [occ_max([fmr[column][i] for i in fmrRows]) for column in fmrColumns]
        # a missing or negative truck percentage cannot be turned into a truck volume
        if pourc_camions == "" or pourc_camions < 0:
            djma_camions = ""
        else:
            djma_camions = pourc_camions*djma/100
    else:
        age_revtm, classe_fonct, type_revtm, milieu, djma, djma_camions = '','','','','',''

    # paint type: most frequent value of each column over the matching rows
    paintColumns = ['peinture_rd', 'peinture_rg', 'peinture_cl']
    paintRows = [i for i, rtssId in enumerate(paintType['rtss_debut_orig']) if rtssId == rtss.id]
    if paintRows:
        peinture_lrd, peinture_lrg, peinture_lc = [occ_max([paintType[column][i] for i in paintRows]) for column in paintColumns]
    else:
        peinture_lrd, peinture_lrg, peinture_lc = '','',''

    return (exigence, x_moy, y_moy, age_revtm, classe_fonct, type_revtm, milieu, djma, djma_camions, peinture_lrd, peinture_lrg, peinture_lc)


def winterMaintenanceIndicators(data, startDate, endDate, circuitReference, snowThreshold):
    r'''Computes several winter maintenance indicators

    data = entretien_hivernal = pylab.csv2rec('C:\Users\Alexandre\Documents\Cours\Poly\Projet\mesures_entretien_hivernal\mesures_deneigement.txt', delimiter = ',')

    The rows of the circuit circuitReference are summed if their date plus 6 days
    is in the interval ]startDate, endDate]
    (presumably each row is a weekly record dated at the start of its week - confirm)

    Returns the tuple (somme_eau, somme_neige, neigeMTQ_sup_seuil, somme_abrasif, somme_sel,
    somme_lc, somme_lrg, somme_lrd, premiere_neige, autres_abrasifs)
    where neigeMTQ_sup_seuil is 1 if somme_neige >= snowThreshold (0 otherwise)
    and autres_abrasifs is 1 if the sum of autre_abrasif_binaire is at least 1 (0 otherwise)
    All values are the empty string if the circuit is not in data'''
    circuits = data['ref_circuit']
    if circuitReference not in circuits:
        return ('',)*10

    somme_eau, somme_neige, somme_abrasif, somme_sel, somme_lc, somme_lrg, somme_lrd, premiere_neige, compteur_somme_abrasif = 0,0,0,0,0,0,0,0,0
    rowDuration = datetime.timedelta(days = 6)
    for i, circuit in enumerate(circuits):
        if circuit != circuitReference:
            continue
        rowEndDate = data['date'][i] + rowDuration
        if startDate < rowEndDate <= endDate:
            premiere_neige += data['premiere_neige'][i]
            somme_neige += float(data['neige'][i])
            somme_eau += float(data['eau'][i])
            somme_abrasif += float(data['abrasif'][i])
            somme_sel += float(data['sel'][i])
            somme_lc += float(data['lc'][i])
            somme_lrg += float(data['lrg'][i])
            somme_lrd += float(data['lrd'][i])
            compteur_somme_abrasif += float(data['autre_abrasif_binaire'][i])

    if compteur_somme_abrasif >= 1:
        autres_abrasifs = 1
    else:
        autres_abrasifs = 0
    if somme_neige < snowThreshold:
        neigeMTQ_sup_seuil = 0
    else:
        neigeMTQ_sup_seuil = 1

    return (somme_eau, somme_neige, neigeMTQ_sup_seuil, somme_abrasif, somme_sel, somme_lc, somme_lrg, somme_lrd, premiere_neige, autres_abrasifs)


def _ecValue(column, i):
    '''Returns the ith value of column as a float (decimal comma accepted)
    or None if the value is missing (empty string or None)'''
    value = column[i]
    if value is None or value == '':
        return None
    return float(value.replace(',','.'))


def ecWeatherIndicators(data, startDate, endDate, snowThreshold):
    '''Computes the indicators from Environment Canada files
    (loaded as a recarray using csv2rec in data),
    between start and end dates (datetime.datetime objects)

    The first (endDate - startDate).days+1 rows of data are used, one per day:
    the dates are not checked against the content of data
    (assumes that the first row of data corresponds to startDate - TODO confirm)

    Returns the tuple (nbre_jours_T_negatif, nbre_jours_gel_degel, deltas_T,
    nbre_jours_gel_consecutifs, pluie_tot, neige_tot, neigeEC_sup_seuil, ecart_type_T)
    - nbre_jours_T_negatif: number of days with tmax < 0
    - nbre_jours_gel_degel: number of days with tmax > 0 and tmin < 0
    - deltas_T: number of days with tmax-tmin at least 20, 15, 10 and 5
    - nbre_jours_gel_consecutifs: longest sequence of days with tmax < 0
    (days without tmax neither extend nor interrupt a sequence)
    - pluie_tot, neige_tot: sums of the corresponding columns
    - neigeEC_sup_seuil: 1 if neige_tot >= snowThreshold, 0 otherwise
    - ecart_type_T: standard deviation (numpy.std) of the available tmoy values'''
    nbre_jours_T_negatif,nbre_jours_gel_degel,pluie_tot,neige_tot = 0,0,0,0
    compteur,nbre_jours_gel_consecutifs = 0,0
    tmoys = []
    seuils_T = [20,15,10,5]
    deltas_T = [0,0,0,0]

    for i in range(int((endDate - startDate).days)+1):
        tmax = _ecValue(data['tmax'], i)
        tmin = _ecValue(data['tmin'], i)
        pluie = _ecValue(data['pluie_tot'], i)
        neige = _ecValue(data['neige_tot'], i)
        tmoy = _ecValue(data['tmoy'], i)

        if pluie is not None:
            pluie_tot += pluie
        if neige is not None:
            neige_tot += neige
        if tmoy is not None:
            tmoys.append(tmoy)

        if tmax is None:
            # nothing can be computed for the day, the current frost sequence is left as is
            continue

        if tmax < 0:
            nbre_jours_T_negatif += 1
            compteur += 1
            nbre_jours_gel_consecutifs = max(nbre_jours_gel_consecutifs, compteur)
        else:
            compteur = 0

        if tmin is not None:
            if tmax > 0 and tmin < 0:
                nbre_jours_gel_degel += 1
            for l, seuil in enumerate(seuils_T):
                if tmax - tmin >= seuil:
                    deltas_T[l] += 1

    ecart_type_T = np.std(tmoys)
    if neige_tot < snowThreshold:
        neigeEC_sup_seuil = 0
    else:
        neigeEC_sup_seuil = 1

    return (nbre_jours_T_negatif,nbre_jours_gel_degel, deltas_T, nbre_jours_gel_consecutifs, pluie_tot, neige_tot, neigeEC_sup_seuil, ecart_type_T)