Mercurial Hosting > traffic-intelligence
diff trafficintelligence/pavement.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 15 Jun 2018 11:19:10 -0400 |
parents | python/pavement.py@933670761a57 |
children | c6cf75a2ed08 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/trafficintelligence/pavement.py Fri Jun 15 11:19:10 2018 -0400 @@ -0,0 +1,313 @@ +#! /usr/bin/env python +'''Tools for processing and analyzing pavement marking data''' + +from trafficintelligence import utils + +import numpy as np + + +paintTypes = {0: "Non-existant", + 1: "Eau", + 2: "Epoxy", + 3: "Alkyde", + 4: "Autre"} + +durabilities = {1: 98, #96 to 100 + 2: 85, #75 to 96 + 3: 62, #50 to 75 + 4: 32, #15 to 50 + 5: 7 #0 to 15 + } + +roadFunctionalClasses = {40: "Collectrice", + 20: "Nationale", + 30: "Regionale", + 10: "Autoroute", + 60: "Acces ressources", + 51: "Local 1", + 52: "Local 2", + 53: "Local 3", + 15: "Aut (PRN)", + 25: "Nat (PRN)", + 70: "Acces isolees", + 99: "Autres"} + +def caracteristiques(rtss, maintenanceLevel, rtssWeatherStation, fmr, paintType): + '''Computes characteristic data for the RTSS (class rtss) + maintenanceLevel = pylab.csv2rec('C:\\Users\Alexandre\Desktop\Projet_maitrise_recherche\BDD_access\\analyse_donnees_deneigement\\exigence_circuits.txt', delimiter = ';') + rtssWeatherStation = pylab.csv2rec('C:\\Users\Alexandre\Desktop\Projet_maitrise_recherche\stations_environnement_canada\\rtssWeatherStation\juste_pour_rtss_avec_donnees_entretien_hiv\\rtssWeatherStation_EC3.txt', delimiter = ',') + fmr = pylab.csv2rec('C:\\Users\Alexandre\Desktop\Projet_maitrise_recherche\BDD_access\\analyse_donnees_deneigement\\fmr.txt', delimiter = ';') + paintType = pylab.csv2rec('C:\\Users\Alexandre\Desktop\Projet_maitrise_recherche\BDD_access\\analyse_donnees_deneigement\\type_peinture.txt', delimiter = ';') + ''' + # determination exigence deneigement + if rtss.id in maintenanceLevel['rtss_debut']: + for i in range(len(maintenanceLevel)): + if maintenanceLevel['rtss_debut'][i] == rtss.id: + exigence = maintenanceLevel['exigence'][i] + else: + exigence = '' + + # determination x/y + if rtss.id in rtssWeatherStation['rtss']: + for i in range(len(rtssWeatherStation)): + if rtssWeatherStation['rtss'][i] == rtss.id: + x_moy = rtssWeatherStation['x_moy'][i] + y_moy = rtssWeatherStation['y_moy'][i] + else: + x_moy, y_moy = '','' + + # determination info fmr + age_revtm, classe_fonct, type_revtm, milieu, djma, pourc_camions, vit_max = [], [], [], [], [], [], [] + if rtss.id in fmr['rtss_debut']: + for i in range(len(fmr)): + if fmr['rtss_debut'][i] == rtss.id: + age_revtm.append(fmr['age_revtm'][i]) + classe_fonct.append(fmr['des_clasf_fonct'][i]) + type_revtm.append(fmr['des_type_revtm'][i]) + milieu.append(fmr['des_cod_mil'][i]) + djma.append(fmr['val_djma'][i]) + pourc_camions.append(fmr['val_pourc_camns'][i]) + vit_max.append(fmr['val_limt_vitss'][i]) + age_revtm = utils.mostCommon(age_revtm) + classe_fonct = utils.mostCommon(classe_fonct) + type_revtm = utils.mostCommon(type_revtm) + milieu = utils.mostCommon(milieu) + djma = utils.mostCommon(djma) + vit_max = utils.mostCommon(vit_max) + if vit_max < 0: + vit_max = '' + pourc_camions = utils.mostCommon(pourc_camions) + if pourc_camions == "" or pourc_camions < 0: + djma_camions = "" + else: + djma_camions = pourc_camions*djma/100 + else: + age_revtm, classe_fonct, type_revtm, milieu, djma, djma_camions, vit_max = '','','','','','','' + + # determination type peinture + peinture_rd, peinture_rg, peinture_cl = [], [], [] + peinture_lrd, peinture_lrg, peinture_lc = 0,0,0 + if rtss.id in paintType['rtss_debut_orig']: + for i in range(len(paintType)): + if paintType['rtss_debut_orig'][i] == rtss.id: + peinture_rd.append((paintType['peinture_rd'][i])) + peinture_rg.append((paintType['peinture_rg'][i])) + peinture_cl.append((paintType['peinture_cl'][i])) + peinture_lrd = utils.mostCommon(peinture_rd) + peinture_lrg = utils.mostCommon(peinture_rg) + peinture_lc = utils.mostCommon(peinture_cl) + else: + peinture_lrd, peinture_lrg, peinture_lc = '','','' + + return (exigence, x_moy, y_moy, age_revtm, classe_fonct, type_revtm, milieu, djma, djma_camions, vit_max, peinture_lrd, peinture_lrg, peinture_lc) + +def winterMaintenanceIndicators(data, startDate, endDate, circuitReference, snowThreshold): + '''Computes several winter maintenance indicators + data = entretien_hivernal = pylab.csv2rec('C:\\Users\Alexandre\Documents\Cours\Poly\Projet\mesures_entretien_hivernal\mesures_deneigement.txt', delimiter = ',')''' + import datetime + somme_eau, somme_neige, somme_abrasif, somme_sel, somme_lc, somme_lrg, somme_lrd, compteur_premiere_neige, compteur_somme_abrasif = 0,0,0,0,0,0,0,0,0 + + if circuitReference in data['ref_circuit']: + for i in range(len(data)): + if data['ref_circuit'][i] == circuitReference and (data['date'][i] + datetime.timedelta(days = 6)) <= endDate and (data['date'][i] + datetime.timedelta(days = 6)) > startDate: + compteur_premiere_neige += float(data['premiere_neige'][i]) + somme_neige += float(data['neige'][i]) + somme_eau += float(data['eau'][i]) + somme_abrasif += float(data['abrasif'][i]) + somme_sel += float(data['sel'][i]) + somme_lc += float(data['lc'][i]) + somme_lrg += float(data['lrg'][i]) + somme_lrd += float(data['lrd'][i]) + compteur_somme_abrasif += float(data['autre_abrasif_binaire'][i]) + if compteur_premiere_neige >= 1: + premiere_neige = 1 + else: + premiere_neige = 0 + if compteur_somme_abrasif >= 1: + autres_abrasifs = 1 + else: + autres_abrasifs = 0 + if somme_neige < snowThreshold: + neigeMTQ_sup_seuil = 0 + else: + neigeMTQ_sup_seuil = 1 + else: + somme_eau, somme_neige, somme_abrasif, somme_sel, somme_lc, somme_lrg, somme_lrd, premiere_neige, autres_abrasifs, neigeMTQ_sup_seuil = '','','','','','','','','','' + + return (somme_eau, somme_neige, neigeMTQ_sup_seuil, somme_abrasif, somme_sel, somme_lc, somme_lrg, somme_lrd, premiere_neige, autres_abrasifs) + +def weatherIndicators(data, startDate, endDate, snowThreshold, weatherDatatype, minProportionMeasures = 0.): + '''Computes the indicators from Environment Canada files + (loaded as a recarray using csv2rec in data), + between start and end dates (datetime.datetime objects) + + weatherDataType is to indicate Environnement Canada data ('ec') or else MTQ + minProportionMeasures is proportion of measures necessary to consider the indicators''' + from matplotlib.mlab import find + nbre_jours_T_negatif,nbre_jours_gel_degel,pluie_tot,neige_tot,ecart_type_T = 0,0,0,0,0 + compteur,nbre_jours_gel_consecutifs=0,0 + tmoys = [] + seuils_T = [20,15,10,5] + deltas_T = [0,0,0,0] + startIndex = find(data['date'] == startDate) + nDays = int((endDate - startDate).days)+1 + if len(startIndex) > 0 and startIndex+nDays <= len(data): + startIndex = startIndex[0] + for i in range(startIndex, startIndex+nDays): + if not np.isnan(data['tmax'][i]): + tmax = data['tmax'][i] + else: + tmax = None + if not np.isnan(data['tmin'][i]): + tmin = data['tmin'][i] + else: + tmin = None + if weatherDatatype == 'ec': + if data['pluie_tot'][i] is not None and not np.isnan(data['pluie_tot'][i]): + pluie_tot += data['pluie_tot'][i] + if data['neige_tot'][i] is not None and not np.isnan(data['neige_tot'][i]): + neige_tot += data['neige_tot'][i] + if tmax is not None: + if tmax < 0: + nbre_jours_T_negatif += 1 + if tmax is not None and tmin is not None: + if tmax > 0 and tmin < 0: + nbre_jours_gel_degel += 1 + for l in range(len(seuils_T)): + if tmax - tmin >=seuils_T[l]: + deltas_T[l] += 1 + if not np.isnan(data['tmoy'][i]): + tmoys.append(data['tmoy'][i]) + if tmax is not None: + if tmax < 0: + compteur += 1 + elif tmax >= 0 and compteur >= nbre_jours_gel_consecutifs: + nbre_jours_gel_consecutifs = compteur + compteur = 0 + else: + compteur = 0 + nbre_jours_gel_consecutifs = max(nbre_jours_gel_consecutifs,compteur) + if len(tmoys) > 0 and float(len(tmoys))/nDays >= minProportionMeasures: + if tmoys != []: + ecart_type_T = np.std(tmoys) + else: + ecart_type = None + if neige_tot < snowThreshold: + neigeEC_sup_seuil = 0 + else: + neigeEC_sup_seuil = 1 + return (nbre_jours_T_negatif,nbre_jours_gel_degel, deltas_T, nbre_jours_gel_consecutifs, pluie_tot, neige_tot, neigeEC_sup_seuil, ecart_type_T) + else: + return [None]*2+[[None]*len(seuils_T)]+[None]*5 + +def mtqWeatherIndicators(data, startDate, endDate,tmax,tmin,tmoy): + print("Deprecated, use weatherIndicators") + from matplotlib.mlab import find + nbre_jours_T_negatif,nbre_jours_gel_degel,ecart_type_T = 0,0,0 + compteur,nbre_jours_gel_consecutifs=0,0 + tmoys = [] + seuils_T = [20,15,10,5] + deltas_T = [0,0,0,0] + startIndex = find(data['date'] == startDate) + nDays = (endDate - startDate).days+1 + for i in range(startIndex, startIndex+nDays): + if tmax[i] < 0: + nbre_jours_T_negatif += 1 + if tmax[i] > 0 and tmin[i] < 0: + nbre_jours_gel_degel += 1 + for l in range(len(seuils_T)): + if tmax[i] - tmin[i] >=seuils_T[l]: + deltas_T[l] += 1 + tmoys.append(tmoy[i]) + if tmax[i] < 0: + compteur += 1 + elif tmax[i] >= 0 and compteur >= nbre_jours_gel_consecutifs: + nbre_jours_gel_consecutifs = compteur + compteur = 0 + else: + compteur = 0 + nbre_jours_gel_consecutifs = max(nbre_jours_gel_consecutifs,compteur) + if tmoys != []: + ecart_type_T = np.std(tmoys) + else: + ecart_type = None + + return (nbre_jours_T_negatif,nbre_jours_gel_degel, deltas_T, nbre_jours_gel_consecutifs, ecart_type_T) + +class RTSS(object): + '''class for data related to a RTSS: + - agregating pavement marking measurements + - RTSS characteristics from FMR: pavement type, age, AADT, truck AADT + - winter maintenance level from V155 + + If divided highway, the RTSS ends with G or D and are distinct: there is no ambiguity + - retroreflectivity types: there are CB, RJ and RB + If undivided, ending with C + - durability is fine: ETAT_MARQG_RG ETAT_MARQG_CL ETAT_MARQG_RD (+SG/SD, but recent) + - retroreflectivity: CJ is center line, RB and SB are left/right if DEBUT-FIN>0 or <0 + ''' + + def __init__(self, _id, name, data): + self.id = _id + self.name = name + self.data = data + +class MarkingTest(object): + '''class for a test site for a given product + + including the series of measurements over the years''' + + def __init__(self, _id, paintingDate, paintingType, color, data): + self.id = _id + self.paintingDate = paintingDate + self.paintingType = paintingType + self.color = color + self.data = data + self.nMeasures = len(data) + + def getSite(self): + return int(self.id[:2]) + + def getTestAttributes(self): + return [self.paintingType, self.color, self.paintingDate.year] + + def plot(self, measure, options = 'o', dayRatio = 1., **kwargs): + from matplotlib.pyplot import plot + plot(self.data['jours']/float(dayRatio), + self.data[measure], options, **kwargs) + + def getMarkingMeasures(self, dataLabel): + nonZeroIndices = ~np.isnan(self.data[dataLabel]) + return self.data[nonZeroIndices]['jours'], self.data[nonZeroIndices][dataLabel] + + def plotMarkingMeasures(self, measure, options = 'o', dayRatio = 1., **kwargs): + for i in range(1,7): + self.plot('{}_{}'.format(measure, i), options, dayRatio, **kwargs) + + def computeMarkingMeasureVariations(self, dataLabel, lanePositions, weatherData, snowThreshold, weatherDataType = 'ec', minProportionMeasures = 0.): + '''Computes for each successive measurement + lanePositions = None + measure variation, initial measure, time duration, weather indicators + + TODO if measurements per lane, add a variable for lane position (position1 to 6) + lanePositions = list of integers (range(1,7)) + measure variation, initial measure, time duration, lane position1, weather indicators + measure variation, initial measure, time duration, lane position2, weather indicators + ...''' + variationData = [] + if lanePositions is None: + nonZeroIndices = ~np.isnan(self.data[dataLabel]) + days = self.data[nonZeroIndices]['jours'] + dates = self.data[nonZeroIndices]['date_mesure'] + measures = self.data[nonZeroIndices][dataLabel] + for i in range(1, len(dates)): + nDaysTNegative, nDaysThawFreeze, deltaTemp, nConsecutiveFrozenDays, totalRain, totalSnow, snowAboveThreshold, stdevTemp = weatherIndicators(weatherData, dates[i-1], dates[i], snowThreshold, weatherDataType, minProportionMeasures) + if dates[i-1].year+1 == dates[i].year: + winter = 1 + if days[i-1]<365: + firstWinter = 1 + else: + winter = 0 + firstWinter = 0 + variationData.append([measures[i-1]-measures[i], measures[i-1], days[i]-days[i-1], days[i-1], winter, firstWinter, nDaysTNegative, nDaysThawFreeze] + deltaTemp + [nConsecutiveFrozenDays, totalRain, totalSnow, snowAboveThreshold, stdevTemp]) + return variationData