Mercurial Hosting > traffic-intelligence
view trafficintelligence/pavement.py @ 1213:3f2214125164
work in progress
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Wed, 03 May 2023 14:58:26 -0400 |
parents | 956a66096e91 |
children | 785c86013d2c |
line wrap: on
line source
#! /usr/bin/env python
'''Tools for processing and analyzing pavement marking data'''

import datetime

import numpy as np

from trafficintelligence import utils

paintTypes = {0: "Non-existant",
              1: "Eau",
              2: "Epoxy",
              3: "Alkyde",
              4: "Autre"}

durabilities = {1: 98, #96 to 100
                2: 85, #75 to 96
                3: 62, #50 to 75
                4: 32, #15 to 50
                5: 7 #0 to 15
}

roadFunctionalClasses = {40: "Collectrice",
                         20: "Nationale",
                         30: "Regionale",
                         10: "Autoroute",
                         60: "Acces ressources",
                         51: "Local 1",
                         52: "Local 2",
                         53: "Local 3",
                         15: "Aut (PRN)",
                         25: "Nat (PRN)",
                         70: "Acces isolees",
                         99: "Autres"}


def _matchingIndices(column, value):
    '''Returns the list of indices of the elements of column equal to value'''
    return [i for i, element in enumerate(column) if element == value]


def caracteristiques(rtss, maintenanceLevel, rtssWeatherStation, fmr, paintType):
    '''Computes characteristic data for the RTSS (class rtss)

    The four tables are indexed by column name then by row number.
    They were originally loaded with csv2rec from the following files:
    maintenanceLevel from exigence_circuits.txt (delimiter ';'),
      columns read: rtss_debut, exigence
    rtssWeatherStation from rtssWeatherStation_EC3.txt (delimiter ','),
      columns read: rtss, x_moy, y_moy
    fmr from fmr.txt (delimiter ';'),
      columns read: rtss_debut, age_revtm, des_clasf_fonct, des_type_revtm,
      des_cod_mil, val_djma, val_pourc_camns, val_limt_vitss
    paintType from type_peinture.txt (delimiter ';'),
      columns read: rtss_debut_orig, peinture_rd, peinture_rg, peinture_cl

    Rows are matched on rtss.id. When several rows match, the last one is used
    for the maintenance level and the coordinates, and the most common value
    (utils.mostCommon) is used for the fmr and paint attributes.
    Every attribute for which no row matches is returned as an empty string.

    Returns the tuple
    (exigence, x_moy, y_moy, age_revtm, classe_fonct, type_revtm, milieu,
    djma, djma_camions, vit_max, peinture_lrd, peinture_lrg, peinture_lc)'''
    # winter maintenance level (last matching row wins)
    indices = _matchingIndices(maintenanceLevel['rtss_debut'], rtss.id)
    if indices:
        exigence = maintenanceLevel['exigence'][indices[-1]]
    else:
        exigence = ''

    # x/y coordinates (last matching row wins)
    indices = _matchingIndices(rtssWeatherStation['rtss'], rtss.id)
    if indices:
        x_moy = rtssWeatherStation['x_moy'][indices[-1]]
        y_moy = rtssWeatherStation['y_moy'][indices[-1]]
    else:
        x_moy, y_moy = '', ''

    # fmr information
    indices = _matchingIndices(fmr['rtss_debut'], rtss.id)
    if indices:
        age_revtm = utils.mostCommon([fmr['age_revtm'][i] for i in indices])
        classe_fonct = utils.mostCommon([fmr['des_clasf_fonct'][i] for i in indices])
        type_revtm = utils.mostCommon([fmr['des_type_revtm'][i] for i in indices])
        milieu = utils.mostCommon([fmr['des_cod_mil'][i] for i in indices])
        djma = utils.mostCommon([fmr['val_djma'][i] for i in indices])
        vit_max = utils.mostCommon([fmr['val_limt_vitss'][i] for i in indices])
        if vit_max < 0: # negative speed limits are reported as missing
            vit_max = ''
        pourc_camions = utils.mostCommon([fmr['val_pourc_camns'][i] for i in indices])
        if pourc_camions == "" or pourc_camions < 0:
            djma_camions = ""
        else:
            djma_camions = pourc_camions*djma/100
    else:
        age_revtm, classe_fonct, type_revtm, milieu, djma, djma_camions, vit_max = '', '', '', '', '', '', ''

    # paint type
    indices = _matchingIndices(paintType['rtss_debut_orig'], rtss.id)
    if indices:
        peinture_lrd = utils.mostCommon([paintType['peinture_rd'][i] for i in indices])
        peinture_lrg = utils.mostCommon([paintType['peinture_rg'][i] for i in indices])
        peinture_lc = utils.mostCommon([paintType['peinture_cl'][i] for i in indices])
    else:
        peinture_lrd, peinture_lrg, peinture_lc = '', '', ''

    return (exigence, x_moy, y_moy, age_revtm, classe_fonct, type_revtm, milieu, djma, djma_camions, vit_max, peinture_lrd, peinture_lrg, peinture_lc)


def winterMaintenanceIndicators(data, startDate, endDate, circuitReference, snowThreshold):
    '''Computes several winter maintenance indicators

    data is the winter maintenance table (originally loaded with csv2rec
    from mesures_deneigement.txt, delimiter ','), indexed by column name then
    by row number; columns read: ref_circuit, date, premiere_neige, neige, eau,
    abrasif, sel, lc, lrg, lrd, autre_abrasif_binaire

    A row is used if its circuit is circuitReference and if its date shifted
    by 6 days is in the interval ]startDate, endDate]

    Returns the tuple
    (somme_eau, somme_neige, neigeMTQ_sup_seuil, somme_abrasif, somme_sel,
    somme_lc, somme_lrg, somme_lrd, premiere_neige, autres_abrasifs)
    where neigeMTQ_sup_seuil is 0 if the snow sum is below snowThreshold (1 otherwise)
    and premiere_neige and autres_abrasifs are 1 if the respective columns sum to 1 or more
    All elements are empty strings if circuitReference is not in the data'''
    if circuitReference not in data['ref_circuit']:
        return ('', '', '', '', '', '', '', '', '', '')

    somme_eau, somme_neige, somme_abrasif, somme_sel, somme_lc, somme_lrg, somme_lrd, compteur_premiere_neige, compteur_somme_abrasif = 0,0,0,0,0,0,0,0,0
    shift = datetime.timedelta(days = 6)
    for i in range(len(data)):
        if data['ref_circuit'][i] == circuitReference and startDate < data['date'][i] + shift <= endDate:
            compteur_premiere_neige += float(data['premiere_neige'][i])
            somme_neige += float(data['neige'][i])
            somme_eau += float(data['eau'][i])
            somme_abrasif += float(data['abrasif'][i])
            somme_sel += float(data['sel'][i])
            somme_lc += float(data['lc'][i])
            somme_lrg += float(data['lrg'][i])
            somme_lrd += float(data['lrd'][i])
            compteur_somme_abrasif += float(data['autre_abrasif_binaire'][i])

    premiere_neige = 1 if compteur_premiere_neige >= 1 else 0
    autres_abrasifs = 1 if compteur_somme_abrasif >= 1 else 0
    neigeMTQ_sup_seuil = 0 if somme_neige < snowThreshold else 1

    return (somme_eau, somme_neige, neigeMTQ_sup_seuil, somme_abrasif, somme_sel, somme_lc, somme_lrg, somme_lrd, premiere_neige, autres_abrasifs)


def weatherIndicators(data, startDate, endDate, snowThreshold, weatherDatatype, minProportionMeasures = 0.):
    '''Computes the indicators from Environment Canada files
    (loaded as a recarray using csv2rec in data),
    between start and end dates (datetime.datetime objects)

    The rows used are the (endDate-startDate).days+1 consecutive rows
    starting at the first row whose date is startDate
    (columns read: date, tmax, tmin, tmoy, and pluie_tot and neige_tot for 'ec' data)

    weatherDataType is to indicate Environnement Canada data ('ec') or else MTQ
    (rain and snow are accumulated only for 'ec')
    minProportionMeasures is proportion of measures necessary to consider the indicators

    Returns the tuple
    (nbre_jours_T_negatif, nbre_jours_gel_degel, deltas_T, nbre_jours_gel_consecutifs,
    pluie_tot, neige_tot, neigeEC_sup_seuil, ecart_type_T)
    where deltas_T is the list of the numbers of days with tmax-tmin
    at or above 20, 15, 10 and 5 respectively
    If startDate is not found, if the period goes beyond the end of the data,
    or if there are not enough mean temperature measures,
    returns a list of the same structure filled with None'''
    nbre_jours_T_negatif,nbre_jours_gel_degel,pluie_tot,neige_tot = 0,0,0,0
    compteur,nbre_jours_gel_consecutifs=0,0
    tmoys = []
    seuils_T = [20,15,10,5]
    deltas_T = [0,0,0,0]
    # np.nonzero returns a tuple with one array of indices per dimension:
    # the indices of the matching rows are in its first element
    startIndices = np.nonzero(data['date'] == startDate)[0]
    nDays = int((endDate - startDate).days)+1
    if len(startIndices) > 0 and startIndices[0]+nDays <= len(data):
        startIndex = int(startIndices[0])
        for i in range(startIndex, startIndex+nDays):
            if not np.isnan(data['tmax'][i]):
                tmax = data['tmax'][i]
            else:
                tmax = None
            if not np.isnan(data['tmin'][i]):
                tmin = data['tmin'][i]
            else:
                tmin = None
            if weatherDatatype == 'ec':
                if data['pluie_tot'][i] is not None and not np.isnan(data['pluie_tot'][i]):
                    pluie_tot += data['pluie_tot'][i]
                if data['neige_tot'][i] is not None and not np.isnan(data['neige_tot'][i]):
                    neige_tot += data['neige_tot'][i]
            if tmax is not None:
                if tmax < 0:
                    nbre_jours_T_negatif += 1
            if tmax is not None and tmin is not None:
                if tmax > 0 and tmin < 0:
                    nbre_jours_gel_degel += 1
                for l in range(len(seuils_T)):
                    if tmax - tmin >=seuils_T[l]:
                        deltas_T[l] += 1
            if not np.isnan(data['tmoy'][i]):
                tmoys.append(data['tmoy'][i])
            # length of the longest run of days with negative maximum temperature
            # (days without tmax neither extend nor interrupt the current run)
            if tmax is not None:
                if tmax < 0:
                    compteur += 1
                elif tmax >= 0 and compteur >= nbre_jours_gel_consecutifs:
                    nbre_jours_gel_consecutifs = compteur
                    compteur = 0
                else:
                    compteur = 0
            nbre_jours_gel_consecutifs = max(nbre_jours_gel_consecutifs,compteur)
    if len(tmoys) > 0 and float(len(tmoys))/nDays >= minProportionMeasures:
        ecart_type_T = np.std(tmoys)
        if neige_tot < snowThreshold:
            neigeEC_sup_seuil = 0
        else:
            neigeEC_sup_seuil = 1
        return (nbre_jours_T_negatif,nbre_jours_gel_degel, deltas_T, nbre_jours_gel_consecutifs, pluie_tot, neige_tot, neigeEC_sup_seuil, ecart_type_T)
    else:
        return [None]*2+[[None]*len(seuils_T)]+[None]*5


def mtqWeatherIndicators(data, startDate, endDate,tmax,tmin,tmoy):
    '''Deprecated, use weatherIndicators

    Computes temperature indicators over the (endDate-startDate).days+1 consecutive days
    starting at the first row of data whose date is startDate
    tmax, tmin and tmoy are sequences indexed like the rows of data

    Returns the tuple
    (nbre_jours_T_negatif, nbre_jours_gel_degel, deltas_T, nbre_jours_gel_consecutifs, ecart_type_T)
    where ecart_type_T is None if the period is empty
    If startDate is not found, returns a list of the same structure filled with None'''
    print("Deprecated, use weatherIndicators")
    nbre_jours_T_negatif,nbre_jours_gel_degel = 0,0
    compteur,nbre_jours_gel_consecutifs=0,0
    tmoys = []
    seuils_T = [20,15,10,5]
    deltas_T = [0,0,0,0]
    # np.nonzero returns a tuple with one array of indices per dimension:
    # the indices of the matching rows are in its first element
    startIndices = np.nonzero(data['date'] == startDate)[0]
    if len(startIndices) == 0:
        return [None]*2+[[None]*len(seuils_T)]+[None]*2
    startIndex = int(startIndices[0])
    nDays = (endDate - startDate).days+1
    for i in range(startIndex, startIndex+nDays):
        if tmax[i] < 0:
            nbre_jours_T_negatif += 1
        if tmax[i] > 0 and tmin[i] < 0:
            nbre_jours_gel_degel += 1
        for l in range(len(seuils_T)):
            if tmax[i] - tmin[i] >=seuils_T[l]:
                deltas_T[l] += 1
        tmoys.append(tmoy[i])
        if tmax[i] < 0:
            compteur += 1
        elif tmax[i] >= 0 and compteur >= nbre_jours_gel_consecutifs:
            nbre_jours_gel_consecutifs = compteur
            compteur = 0
        else:
            compteur = 0
        nbre_jours_gel_consecutifs = max(nbre_jours_gel_consecutifs,compteur)
    if tmoys != []:
        ecart_type_T = np.std(tmoys)
    else:
        ecart_type_T = None
    return (nbre_jours_T_negatif,nbre_jours_gel_degel, deltas_T, nbre_jours_gel_consecutifs, ecart_type_T)


class RTSS(object):
    '''class for data related to a RTSS:
    - agregating pavement marking measurements
    - RTSS characteristics from FMR: pavement type, age, AADT, truck AADT
    - winter maintenance level from V155

    If divided highway, the RTSS ends with G or D and are distinct: there is no ambiguity
    - retroreflectivity types: there are CB, RJ and RB
    If undivided, ending with C
    - durability is fine: ETAT_MARQG_RG ETAT_MARQG_CL ETAT_MARQG_RD (+SG/SD, but recent)
    - retroreflectivity: CJ is center line, RB and SB are left/right if DEBUT-FIN>0 or <0
    '''

    def __init__(self, _id, name, data):
        self.id = _id
        self.name = name
        self.data = data


class MarkingTest(object):
    '''class for a test site for a given product

    including the series of measurements over the years'''

    def __init__(self, _id, paintingDate, paintingType, color, data):
        self.id = _id
        self.paintingDate = paintingDate
        self.paintingType = paintingType
        self.color = color
        self.data = data
        self.nMeasures = len(data)

    def getSite(self):
        '''Returns the site number, ie the first two characters of the id as an integer'''
        return int(self.id[:2])

    def getTestAttributes(self):
        '''Returns the list [painting type, color, painting year]'''
        return [self.paintingType, self.color, self.paintingDate.year]

    def plot(self, measure, options = 'o', dayRatio = 1., **kwargs):
        '''Plots the column measure of the data as a function of the column jours divided by dayRatio

        options and kwargs are passed to matplotlib.pyplot.plot'''
        from matplotlib.pyplot import plot
        plot(self.data['jours']/float(dayRatio), self.data[measure], options, **kwargs)

    def getMarkingMeasures(self, dataLabel):
        '''Returns the columns jours and dataLabel for the rows where dataLabel is not NaN'''
        nonZeroIndices = ~np.isnan(self.data[dataLabel])
        return self.data[nonZeroIndices]['jours'], self.data[nonZeroIndices][dataLabel]

    def plotMarkingMeasures(self, measure, options = 'o', dayRatio = 1., **kwargs):
        '''Plots the six columns measure_1 to measure_6 (see plot)'''
        for i in range(1,7):
            self.plot('{}_{}'.format(measure, i), options, dayRatio, **kwargs)

    def computeMarkingMeasureVariations(self, dataLabel, lanePositions, weatherData, snowThreshold, weatherDataType = 'ec', minProportionMeasures = 0.):
        '''Computes the variations between successive measurements of dataLabel
        (rows where dataLabel is NaN are skipped)

        lanePositions = None
        Each element of the returned list is, for a pair of successive measurements,
        measure variation, initial measure, time duration, initial number of days,
        winter, first winter, followed by the weather indicators
        (see weatherIndicators, computed between the two measurement dates):
        number of days with negative temperature, number of freeze-thaw days,
        the four temperature difference counts, number of consecutive frozen days,
        total rain, total snow, snow above threshold, temperature standard deviation
        winter is 1 if the second measurement is in the calendar year following the first one
        first winter is 1 if, in addition, the first measurement was made at less than 365 days

        TODO if measurements per lane, add a variable for lane position (position1 to 6)
        lanePositions = list of integers (range(1,7))
        measure variation, initial measure, time duration, lane position1, weather indicators
        measure variation, initial measure, time duration, lane position2, weather indicators
        ...
        Currently, an empty list is returned if lanePositions is not None'''
        variationData = []
        if lanePositions is None:
            nonZeroIndices = ~np.isnan(self.data[dataLabel])
            days = self.data[nonZeroIndices]['jours']
            dates = self.data[nonZeroIndices]['date_mesure']
            measures = self.data[nonZeroIndices][dataLabel]
            for i in range(1, len(dates)):
                nDaysTNegative, nDaysThawFreeze, deltaTemp, nConsecutiveFrozenDays, totalRain, totalSnow, snowAboveThreshold, stdevTemp = weatherIndicators(weatherData, dates[i-1], dates[i], snowThreshold, weatherDataType, minProportionMeasures)
                # both flags are assigned at each iteration so that firstWinter
                # is always defined and never carried over from the previous interval
                winter = 1 if dates[i-1].year+1 == dates[i].year else 0
                firstWinter = 1 if winter == 1 and days[i-1]<365 else 0
                variationData.append([measures[i-1]-measures[i], measures[i-1], days[i]-days[i-1], days[i-1], winter, firstWinter, nDaysTNegative, nDaysThawFreeze] + deltaTemp + [nConsecutiveFrozenDays, totalRain, totalSnow, snowAboveThreshold, stdevTemp])
        return variationData