view python/pavement.py @ 446:a65a14c90834

adding weather info to pavement data
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Tue, 04 Feb 2014 02:13:27 -0500
parents b5cc6b001ae6
children 7ef40014236c
line wrap: on
line source

#! /usr/bin/env python
'''Tools for processing and analyzing pavement marking data'''

import numpy as np

# Python 2 idiom: classes declared in this module without an explicit base
# become new-style classes (the assignment has no effect under Python 3)
__metaclass__ = type

def occ_max(a):
    '''Returns the most frequent element of the sequence a

    a must be a sized sequence of hashable elements
    Returns the empty string "" if a is empty (list, tuple, array...)
    If several elements share the highest count, the one met first in a
    is returned (relies on dict insertion order, ie Python >= 3.7;
    the choice is arbitrary on older interpreters)'''
    from collections import Counter
    if len(a) == 0:
        return ""
    # one pass to count, one pass over the distinct values to pick the largest count
    counts = Counter(a)
    return max(counts, key = counts.get)

def caracteristiques(rtss, maintenanceLevel, rtssWeatherStation, fmr, paintType):
    '''Computes characteristic data for the RTSS (class rtss)

    The four tables are record arrays indexable by field name,
    originally loaded with pylab.csv2rec from
    - maintenanceLevel: exigence_circuits.txt (delimiter ';'),
      fields rtss_debut, exigence
    - rtssWeatherStation: rtssWeatherStation_EC3.txt (delimiter ','),
      fields rtss, x_moy, y_moy
    - fmr: fmr.txt (delimiter ';'),
      fields rtss_debut, age_revtm, des_clasf_fonct, des_type_revtm,
      des_cod_mil, val_djma, val_pourc_camns
    - paintType: type_peinture.txt (delimiter ';'),
      fields rtss_debut_orig, peinture_rd, peinture_rg, peinture_cl

    Only the rows whose key field equals rtss.id are used:
    - the last matching row gives exigence, x_moy and y_moy
    - for fmr and paintType, the most frequent value (occ_max)
      among the matching rows is kept for each field

    Returns the tuple
    (exigence, x_moy, y_moy, age_revtm, classe_fonct, type_revtm, milieu,
    djma, djma_camions, peinture_lrd, peinture_lrg, peinture_lc)
    in which every element without a matching row is the empty string'''
    def matchingRows(table, keyField):
        'Returns the indices of the rows of table whose keyField equals rtss.id'
        return [i for i, key in enumerate(table[keyField]) if key == rtss.id]

    def mostFrequent(table, rows, field):
        'Returns the most frequent value of field among the given rows'
        return occ_max([table[field][i] for i in rows])

    # winter maintenance requirement: the last matching row wins
    rows = matchingRows(maintenanceLevel, 'rtss_debut')
    if rows:
        exigence = maintenanceLevel['exigence'][rows[-1]]
    else:
        exigence = ''

    # x/y coordinates: the last matching row wins
    rows = matchingRows(rtssWeatherStation, 'rtss')
    if rows:
        x_moy = rtssWeatherStation['x_moy'][rows[-1]]
        y_moy = rtssWeatherStation['y_moy'][rows[-1]]
    else:
        x_moy, y_moy = '', ''

    # fmr information: most frequent value of each field
    rows = matchingRows(fmr, 'rtss_debut')
    if rows:
        age_revtm = mostFrequent(fmr, rows, 'age_revtm')
        classe_fonct = mostFrequent(fmr, rows, 'des_clasf_fonct')
        type_revtm = mostFrequent(fmr, rows, 'des_type_revtm')
        milieu = mostFrequent(fmr, rows, 'des_cod_mil')
        djma = mostFrequent(fmr, rows, 'val_djma')
        pourc_camions = mostFrequent(fmr, rows, 'val_pourc_camns')
        if pourc_camions == "" or pourc_camions < 0:
            # missing or negative percentage: no truck volume can be derived
            djma_camions = ""
        else:
            # float divisor so that integer data is not floored under Python 2
            djma_camions = pourc_camions*djma/100.
    else:
        age_revtm, classe_fonct, type_revtm, milieu, djma, djma_camions = '', '', '', '', '', ''

    # paint type: most frequent value of each field
    rows = matchingRows(paintType, 'rtss_debut_orig')
    if rows:
        peinture_lrd = mostFrequent(paintType, rows, 'peinture_rd')
        peinture_lrg = mostFrequent(paintType, rows, 'peinture_rg')
        peinture_lc = mostFrequent(paintType, rows, 'peinture_cl')
    else:
        peinture_lrd, peinture_lrg, peinture_lc = '', '', ''

    return (exigence, x_moy, y_moy, age_revtm, classe_fonct, type_revtm, milieu, djma, djma_camions, peinture_lrd, peinture_lrg, peinture_lc)

def winterMaintenanceIndicators(data, startDate, endDate, circuitReference, snowThreshold):
    '''Computes several winter maintenance indicators for a circuit

    data is a record array indexable by field name and row index
    (originally loaded with pylab.csv2rec from mesures_deneigement.txt,
    delimiter ','), with the fields ref_circuit, date, premiere_neige,
    neige, eau, abrasif, sel, lc, lrg, lrd and autre_abrasif_binaire

    A row is used if its ref_circuit equals circuitReference and
    startDate < date + 6 days <= endDate
    NOTE(review): the reason for the 6 day shift is not visible here
    (presumably the records are dated by the first day of a week), to confirm

    Returns the tuple
    (somme_eau, somme_neige, neigeMTQ_sup_seuil, somme_abrasif, somme_sel,
    somme_lc, somme_lrg, somme_lrd, premiere_neige, autres_abrasifs) where
    - the somme_* elements and premiere_neige are sums over the used rows
    - neigeMTQ_sup_seuil is 1 if somme_neige >= snowThreshold, 0 otherwise
    - autres_abrasifs is 1 if the sum of autre_abrasif_binaire is >= 1, 0 otherwise
    All the elements are empty strings if circuitReference is not in data'''
    import datetime

    if circuitReference not in data['ref_circuit']:
        # unknown circuit: every indicator is reported as missing
        return ('',) * 10

    summedFields = ('eau', 'neige', 'abrasif', 'sel', 'lc', 'lrg', 'lrd', 'autre_abrasif_binaire')
    sums = dict.fromkeys(summedFields, 0)
    premiere_neige = 0
    shift = datetime.timedelta(days = 6)
    for i in range(len(data)):
        if data['ref_circuit'][i] != circuitReference:
            continue
        if not (startDate < data['date'][i] + shift <= endDate):
            continue
        premiere_neige += data['premiere_neige'][i]
        for field in summedFields:
            sums[field] += float(data[field][i])

    if sums['autre_abrasif_binaire'] >= 1:
        autres_abrasifs = 1
    else:
        autres_abrasifs = 0
    if sums['neige'] < snowThreshold:
        neigeMTQ_sup_seuil = 0
    else:
        neigeMTQ_sup_seuil = 1

    return (sums['eau'], sums['neige'], neigeMTQ_sup_seuil, sums['abrasif'], sums['sel'], sums['lc'], sums['lrg'], sums['lrd'], premiere_neige, autres_abrasifs)

def ecWeatherIndicators(data, startDate, endDate, snowThreshold):
    '''Computes the indicators from Environment Canada files
    (loaded as a recarray using csv2rec in data),
    between start and end dates (datetime.datetime objects)

    The fields tmax, tmin, tmoy, pluie_tot and neige_tot of data are read
    as strings that may use a decimal comma; empty or None fields are skipped

    NOTE(review): no date field is read: the function uses the first
    (endDate-startDate).days+1 rows of data, ie row i is taken to be
    the i-th day after startDate; callers should confirm that data
    starts at startDate

    Returns the tuple
    (nbre_jours_T_negatif, nbre_jours_gel_degel, deltas_T,
    nbre_jours_gel_consecutifs, pluie_tot, neige_tot, neigeEC_sup_seuil,
    ecart_type_T) where
    - nbre_jours_T_negatif is the number of days with tmax < 0
    - nbre_jours_gel_degel is the number of days with tmax > 0 and tmin < 0
    - deltas_T is the list of the numbers of days with
      tmax-tmin >= 20, 15, 10 and 5 respectively
    - nbre_jours_gel_consecutifs is the longest run of days with tmax < 0
      (days without tmax neither extend nor interrupt a run)
    - pluie_tot and neige_tot are the sums of the corresponding fields
    - neigeEC_sup_seuil is 1 if neige_tot >= snowThreshold, 0 otherwise
    - ecart_type_T is the standard deviation (np.std) of the tmoy values'''
    def toFloat(field):
        'Converts a decimal comma string to a float, returns None for a missing field'
        if field is None or field == '':
            return None
        return float(field.replace(',', '.'))

    nbre_jours_T_negatif, nbre_jours_gel_degel, pluie_tot, neige_tot = 0, 0, 0, 0
    compteur, nbre_jours_gel_consecutifs = 0, 0
    tmoys = []
    seuils_T = [20, 15, 10, 5]
    deltas_T = [0, 0, 0, 0]
    for i in range(int((endDate - startDate).days)+1):
        tmax = toFloat(data['tmax'][i])
        tmin = toFloat(data['tmin'][i])
        pluie = toFloat(data['pluie_tot'][i])
        neige = toFloat(data['neige_tot'][i])
        tmoy = toFloat(data['tmoy'][i])

        if pluie is not None:
            pluie_tot += pluie
        if neige is not None:
            neige_tot += neige
        if tmoy is not None:
            tmoys.append(tmoy)

        if tmax is None:
            # without tmax, no temperature indicator can be updated for this day
            continue
        if tmax < 0:
            nbre_jours_T_negatif += 1
            compteur += 1
            nbre_jours_gel_consecutifs = max(nbre_jours_gel_consecutifs, compteur)
        else:
            # the run of frozen days is interrupted
            compteur = 0
        if tmin is not None:
            if tmax > 0 and tmin < 0:
                nbre_jours_gel_degel += 1
            for l, seuil in enumerate(seuils_T):
                if tmax - tmin >= seuil:
                    deltas_T[l] += 1

    ecart_type_T = np.std(tmoys)
    if neige_tot < snowThreshold:
        neigeEC_sup_seuil = 0
    else:
        neigeEC_sup_seuil = 1

    return (nbre_jours_T_negatif, nbre_jours_gel_degel, deltas_T, nbre_jours_gel_consecutifs, pluie_tot, neige_tot, neigeEC_sup_seuil, ecart_type_T)


class RTSS:
    '''Data related to a RTSS, identified by its id

    meant to hold the aggregated pavement marking measurements'''

    def __init__(self, id):
        # id is the value compared to the RTSS key fields of the data tables
        self.id = id

class MarkingTest:
    '''class for a test site for a given product

    including the series of measurements over the years

    data must be indexable by field name and by boolean mask
    (eg a numpy record array): the methods read its 'jours' and
    'date_mesure' fields and the measure fields named by the caller'''

    def __init__(self, id, paintingDate, paintingType, color, data):
        self.id = id
        self.paintingDate = paintingDate
        self.paintingType = paintingType
        self.color = color
        self.data = data
        self.nMeasures = len(data)

    def getSite(self):
        'Returns the first two characters of the id converted to an integer'
        return int(self.id[:2])

    def getTestAttributes(self):
        'Returns the list [paintingType, color]'
        return [self.paintingType, self.color]

    def plot(self, measure, options = 'o', dayRatio = 1., **kwargs):
        '''Plots the measure field as a function of the 'jours' field divided by dayRatio

        options and kwargs are passed to matplotlib.pyplot.plot'''
        from matplotlib.pyplot import plot
        plot(self.data['jours']/float(dayRatio), 
             self.data[measure], options, **kwargs)

    def _getValidMeasures(self, dataLabel):
        'Returns the rows of data whose dataLabel field is not NaN'
        return self.data[~np.isnan(self.data[dataLabel])]

    def getMarkingMeasures(self, dataLabel):
        '''Returns the 'jours' and dataLabel fields
        of the rows whose dataLabel field is not NaN'''
        validData = self._getValidMeasures(dataLabel)
        return validData['jours'], validData[dataLabel]

    def plotMarkingMeasures(self, measure, options = 'o', dayRatio = 1., **kwargs):
        '''Plots the six fields named measure_1 to measure_6
        (see plot for the other arguments)'''
        for i in range(1,7):
            self.plot('{}_{}'.format(measure, i), options, dayRatio, **kwargs)

    def computeMarkingMeasureVariations(self, dataLabel, lanePositions, weatherData, snowThreshold):
        '''Computes for each successive measurement
        lanePositions = None
        measure variation, initial measure, time duration, weather indicators

        Only the rows whose dataLabel field is not NaN are used
        Each pair of successive rows yields the list
        [previous measure - measure, previous measure, difference of 'jours',
        number of days with negative temperature, number of thaw-freeze days]
        + the list of temperature difference counts
        + [number of consecutive frozen days, total rain, total snow,
        snow above threshold, temperature standard deviation]
        where the weather indicators are returned by ecWeatherIndicators
        (defined in this module) for weatherData between
        the 'date_mesure' fields of the two rows

        Returns the list of these lists; it is empty if lanePositions
        is not None since that case is not implemented yet
        
        TODO if measurements per lane, add a variable for lane position (position1 to 6)
        lanePositions = list of integers (range(1,7))
        measure variation, initial measure, time duration, lane position1, weather indicators
        measure variation, initial measure, time duration, lane position2, weather indicators
        ...'''
        variationData = []
        if lanePositions is None:
            validData = self._getValidMeasures(dataLabel)
            days = validData['jours']
            dates = validData['date_mesure']
            measures = validData[dataLabel]
            for i in range(1, len(dates)):
                nDaysTNegative, nDaysThawFreeze, deltaTemp, nConsecutiveFrozenDays, totalRain, totalSnow, snowAboveThreshold, stdevTemp = ecWeatherIndicators(weatherData, dates[i-1], dates[i], snowThreshold)
                variationData.append([measures[i-1]-measures[i], measures[i-1], days[i]-days[i-1], nDaysTNegative, nDaysThawFreeze] + deltaTemp + [nConsecutiveFrozenDays, totalRain, totalSnow, snowAboveThreshold, stdevTemp])
        return variationData