时间序列预测法及Spark-TimeSerial实现

时间序列预测法及Spark-Timeserial

时间序列预测法

时间序列预测法(Time Series Forecasting Method)

什么是时间序列预测法?

​ 一种历史资料延伸预测,也称历史引伸预测法。是以时间数列所能反映的社会经济现象的发展过程和规律性,进行引伸外推,预测其发展趋势的方法。

时间序列,也叫时间数列、历史复数或动态数列。它是将某种统计指标的数值,按时间先后顺序排到所形成的数列。时间序列预测法就是通过编制和分析时间序列,根据时间序列所反映出来的发展过程、方向和趋势,进行类推或延伸,借以预测下一段时间或以后若干年内可能达到的水平。其内容包括:收集与整理某种社会现象的历史资料;对这些资料进行检查鉴别,排成数列;分析时间数列,从中寻找该社会现象随时间变化而变化的规律,得出一定的模式;以此模式去预测该社会现象将来的情况。

时间序列预测法的步骤

第一步 收集历史资料,加以整理,编成时间序列,并根据时间序列绘成统计图。时间序列分析通常是把各种可能发生作用的因素进行分类,传统的分类方法是按各种因素的特点或影响效果分为四大类:(1)长期趋势;(2)季节变动;(3)循环变动;(4)不规则变动。

第二步 分析时间序列。时间序列中的每一时期的数值都是由许许多多不同的因素同时发生作用后的综合结果。

第三步 求时间序列的长期趋势(T)季节变动(s)和不规则变动(I)的值,并选定近似的数学模式来代表它们。对于数学模式中的诸未知参数,使用合适的技术方法求出其值。

第四步 利用时间序列资料求出长期趋势、季节变动和不规则变动的数学模型后,就可以利用它来预测未来的长期趋势值T和季节变动值s,在可能的情况下预测不规则变动值I。然后用以下模式计算出未来的时间序列的预测值Y:

加法模式T+S+I=Y

乘法模式T×S×I=Y

如果不规则变动的预测值难以求得,就只求长期趋势和季节变动的预测值,以两者相乘之积或相加之和为时间序列的预测值。如果经济现象本身没有季节变动或不需预测分季分月的资料,则长期趋势的预测值就是时间序列的预测值,即T=Y。但要注意这个预测值只反映现象未来的发展趋势,即使很准确的趋势线在按时间顺序的观察方面所起的作用,本质上也只是一个平均数的作用,实际值将围绕着它上下波动。

时间序列分析基本特征

​ 1.时间序列分析法是根据过去的变化趋势预测未来的发展,它的前提是假定事物的过去延续到未来。

时间序列分析,正是根据客观事物发展的连续规律性,运用过去的历史数据,通过统计分析,进一步推测未来的发展趋势。事物的过去会延续到未来这个假设前提包含两层含义:一是不会发生突然的跳跃变化,是以相对小的步伐前进;二是过去和当前的现象可能表明现在和将来活动的发展变化趋向。这就决定了在一般情况下,时间序列分析法对于短、近期预测比较显著,但如延伸到更远的将来,就会出现很大的局限性,导致预测值偏离实际较大而使决策失误。

2.时间序列数据变动存在着规律性与不规律性

时间序列中的每个观察值大小,是影响变化的各种不同因素在同一时刻发生作用的综合结果。从这些影响因素发生作用的大小和方向变化的时间特性来看,这些因素造成的时间序列数据的变动分为四种类型。

(1)趋势性:某个变量随着时间进展或自变量变化,呈现一种比较缓慢而长期的持续上升、下降、停留的同性质变动趋向,但变动幅度可能不相等。

(2)周期性:某因素由于外部影响随着自然季节的交替出现高峰与低谷的规律。

(3)随机性:个别为随机变动,整体呈统计规律。

(4)综合性:实际变化情况是几种变动的叠加或组合。预测时设法过滤除去不规则变动,突出反映趋势性和周期性变动。

时间序列预测法的分类

时间序列预测法可用于短期预测中期预测长期预测。根据对资料分析方法的不同,又可分为:简单序时平均数法加权序时平均数法移动平均法加权移动平均法趋势预测法指数平滑法季节性趋势预测法市场寿命周期预测法等。

简单序时平均数法 也称算术平均法。即把若干历史时期的统计数值作为观察值,求出算术平均数作为下期预测值。这种方法基于下列假设:“过去这样,今后也将这样”,把近期和远期数据等同化和平均化,因此只能适用于事物变化不大的趋势预测。如果事物呈现某种上升或下降的趋势,就不宜采用此法。

加权序时平均数法 就是把各个时期的历史数据按近期和远期影响程度进行加权,求出平均值,作为下期预测值。

简单移动平均法 就是相继移动计算若干时期的算术平均数作为下期预测值。

加权移动平均法 即将简单移动平均数进行加权计算。在确定权数时,近期观察值的权数应该大些,远期观察值的权数应该小些。

上述几种方法虽然简便,能迅速求出预测值,但由于没有考虑整个社会经济发展的新动向和其他因素的影响,所以准确性较差。应根据新的情况,对预测结果作必要的修正。

指数平滑法 即根据历史资料的上期实际数和预测值,用指数加权的办法进行预测。此法实质是由内加权移动平均法演变而来的一种方法,优点是只要有上期实际数和上期预测值,就可计算下期的预测值,这样可以节省很多数据和处理数据的时间,减少数据的存储量,方法简便。是国外广泛使用的一种短期预测方法。

季节趋势预测法 根据经济事物每年重复出现的周期性季节变动指数,预测其季节性变动趋势。推算季节性指数可采用不同的方法,常用的方法有季(月)别平均法和移动平均法两种:a.季(月)别平均法。就是把各年度的数值分季(或月)加以平均,除以各年季(或月)的总平均数,得出各季(月)指数。这种方法可以用来分析生产、销售、原材料储备、预计资金周转需要量等方面的经济事物的季节性变动;b.移动平均法。即应用移动平均数计算比例求典型季节指数。

市场寿命周期预测法 就是对产品市场寿命周期的分析研究。例如对处于成长期的产品预测其销售量,最常用的一种方法就是根据统计资料,按时间序列画成曲线图,再将曲线外延,即得到未来销售发展趋势。最简单的外延方法是直线外延法,适用于对耐用消费品的预测。这种方法简单、直观、易于掌握。

时间序列预测法

1.逐步自回归(StepAR)模型:StepAR模型是有趋势、季节因素数据的模型类。

2.Winters Method—Additive模型:它是将时势和乘法季节因素相结合,考虑序列中有规律节波动。

3.ARlMA模型:它是处理带有趋势、季节因平稳随机项数据的模型类[3]

4.Winters Method—Muhiplicative模型:该方将时同趋势和乘法季节因素相结合,考虑序列规律的季节波动。时间趋势模型可根据该序列律的季节波动对该趋势进行修正。为了能捕捉到季节性,趋势模型包含每个季节的一个季节参季节因子采用乘法季节因子。随机时间序列
《时间序列预测法及Spark-TimeSerial实现》 x_t(t=1,2,\ldots,N,N=n)

整理汇总历史上各类保险的数据得到逐月的数据,Winters Method-Multiplicative模型表示为

xt = (a + bt)s(t) + εt  (1)

其中a和b为趋势参数,s(t)为对应于时刻t的这个季节选择的季节参数,修正方程为。

《时间序列预测法及Spark-TimeSerial实现》 a_t=\omega_1\frac{x_t}{s{t-1}(t)}+(1-\omega_1)(a_{t-1}+b_{t-1})

bt = ω2(atat − 1) + (1 − ω2)bt − 1  (2)

其中:xt,at,bt,分别为序列在时刻t的实测值、平滑值和平滑趋势s{t-1}(t)选择在季节因子被修正之前对应于时刻t的季节因子的过去值。

在该修正系统中,趋势多项式在当前周期中总是被中心化,以便在t以后的时间里预报值的趋势多项式的截距参数总是修正后的截距参数at。向前τ个周期的预报值是。

xt + τ = (at + btτ)st(t + τ)(3)

当季节在数据中改变时季节参数被修正,它使用季节实测值与预报值比率的平均值。

5.GARCH(ARCH)模型

带自相关扰动的回归模型为。

xt = ξtβ + vt

《时间序列预测法及Spark-TimeSerial实现》 v_t=\epsilon_t-\varphi_1v_{t-1}-\ldots-\varphi_n v_{t-n}

,

εt = N(0,σ2)  (4)

其中:xt为因变量;ξt为回归因子构成的列向量;\beta为结构参数构成的列向量;εt为均值是0、方差是一的独立同分布正态随机变量。

服从GARCH过程的序列xt,对于t时刻X的条件分布记为

xt | φt − 1˜N(0,ht)  (5)

其中\phi_{t-1}表示时间t-1前的所有可用信息,条件方差。

《时间序列预测法及Spark-TimeSerial实现》 h_t=\tilde{\omega}+\sum_{i=1}^p a_ix^2_{t-i}+\sum_{j=1}\gamma_j h_{t-j}

(6)。

其中p≥0,q>0,
《时间序列预测法及Spark-TimeSerial实现》 \tilde{\omega}\ge0

,
《时间序列预测法及Spark-TimeSerial实现》 \gamma_j\ge0 。

当p=0时,GARCH(p,q)模型退化为ARCH(p)模型,ARCH参数至少要有一个不为0。

GARCH回归模型可写成

《时间序列预测法及Spark-TimeSerial实现》 x_t=\xi^\prime_t\beta+\epsilon_t,\epsilon_t=\sqrt{h_t e_t}

,

《时间序列预测法及Spark-TimeSerial实现》 h_t=\tilde{\omega}+\sum_{a_i}\epsilon^2_{t-i}+\sum\gamma_j h_{t-j}

et˜ N(0,1)  (7)

也可以考虑服从自回归过程的扰动或带有GARCH误差的模型,即AR(n)-GARCH(p,q)。

《时间序列预测法及Spark-TimeSerial实现》 x_t==\xi^\prime_t\beta+v_t

,

《时间序列预测法及Spark-TimeSerial实现》 v_t=\epsilon_t-\varphi_1v_{t-1}-\ldots-\varphi_n v_{t-n}

,

《时间序列预测法及Spark-TimeSerial实现》 \epsilon_t=\sqrt{h_t e_t}
《时间序列预测法及Spark-TimeSerial实现》 h_t=\xi^\prime_t+\sum_{i=1}^q a_i\epsilon^2_{t-i}+\sum\gamma_j h_{t-j}

  (8)

其中三次平滑指数(HoltWinters)http://www.dataguru.cn/article-3235-1.html

该部分详细介绍见

http://wiki.mbalib.com/wiki/%E6%97%B6%E9%97%B4%E5%BA%8F%E5%88%97%E9%A2%84%E6%B5%8B%E6%B3%95

Spark-TimeSerial

spark里面的库是没有时间序列算法的,但是国外有人已经写好了相应的算法。其github网址是:https://github.com/sryza/spark-timeseries

spark-timeserial介绍:https://yq.aliyun.com/articles/70292

实例:http://blog.csdn.net/qq_30232405/article/details/70622400

实际应用。

数据格式(处理过后的):每5分钟一个值;

《时间序列预测法及Spark-TimeSerial实现》

预测代码:

/**
  * Created by ${lyp} on 2017/6/21.
  */


case class PV(time:String,key:String,ct: Double );

object RunModel {
  val starttime="2017-01-01 00:00:00"
  val endtime= "2017-05-31 23:55:00"
  val predictedN=288
  val outputTableName=""
  val modelName="holtwinters"
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val hiveColumnName=List("time","key","ct")

  def main(args: Array[String]): Unit = {


    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf= new SparkConf().setAppName("timeserial").setMaster("local")
    val sc= new SparkContext(conf)
    val sqlContext=new SQLContext(sc)

    import sqlContext.implicits._

    //create dataframe
    val trainData=getData(sc,sqlContext,"src/main/resource/data.csv")
    //val vertifyData=getData(sc,sqlContext,"src/main/resource/data2.csv")
    trainData.show()
    //vertifyData.show()


    //create DateTimeIndex
    val zone = ZoneId.systemDefault()
    var dtIndex:UniformDateTimeIndex=DateTimeIndex.uniformFromInterval(
      ZonedDateTime.of(2017, 1, 1, 0, 0, 0, 0, zone),
      ZonedDateTime.of(2017, 5, 31, 23, 55, 0, 0, zone),
      new MinuteFrequency(5)
    )

    //create TimeSeriesRDD
    val trainTsrdd=TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex,trainData,hiveColumnName(0),hiveColumnName(1), hiveColumnName(2))
    trainTsrdd.foreach(println(_))

    //cache
    trainTsrdd.cache()

    //add absent value "linear", "nearest", "next", or "previous"
    val filledTrainTsrdd=trainTsrdd.fill("linear")

    //create model
    val timeSeriesModel= new TimeSeriesModel(predictedN)

    //train model
    val forcast=modelName match {
      case "arima"=>{
        println("begin train")
        val (forecast,coefficients)=timeSeriesModel.arimaModelTrain(filledTrainTsrdd)
        forecast

      }
      case "holtwinters"=>{
        //季节性参数(12或者4)
        val period=288*30
        //holtWinters选择模型:additive(加法模型)、Multiplicative(乘法模型)
        val holtWintersModelType="Multiplicative"
        val (forecast,sse)=timeSeriesModel.holtWintersModelTrain(filledTrainTsrdd,period,holtWintersModelType)
        forecast
      }
      case _=>throw new UnsupportedOperationException("Currently only supports 'ariam' and 'holtwinters")
    }

    val time=timeSeriesModel.productStartDatePredictDate(predictedN,endtime,endtime)

    forcast.map{
      row=>
        val key=row._1
        val values=row._2.toArray.mkString(",")
        (key,values)
    }.flatMap(row=>row._2.split(","))saveAsTextFile("src/main/resource/30Multiplicative")
  }


  def getTrainData(sqlContext:SQLContext):DataFrame={
    val data=sqlContext.sql(
      s"""
         |select time, 'key' as  key, ct from tmp_music.tmp_lyp_nginx_result_ct2 where time between ${starttime} and ${endtime}
       """.stripMargin)
    data
  }

  def getData(sparkContext: SparkContext,sqlContext:SQLContext,path:String):DataFrame={
    val data=sparkContext.textFile(path).map(line=>line.split(",")).map{
      line=>
        val time =sdf.parse(line(0))
        val timestamp= new Timestamp(time.getTime)
        Row(timestamp,line(1),line(2).toDouble)
    }

    val field=Seq(
      StructField(hiveColumnName(0), TimestampType, true),
      StructField(hiveColumnName(1), StringType, true),
      StructField(hiveColumnName(2), DoubleType, true)
    )
    val schema=StructType(field)
    val zonedDateDataDf=sqlContext.createDataFrame(data,schema)
    zonedDateDataDf
  }
}
/**
  * 时间序列模型
  * Created by Administrator on 2017/4/19.
  */
class TimeSeriesModel extends Serializable{

  //预测后面N个值
  private var predictedN=1
  //存放的表名字
  private var outputTableName="timeseries_output"

  def this(predictedN:Int){
    this()
    this.predictedN=predictedN
  }

  case class Coefficient(coefficients: String,p: String,d: String,q:String,logLikelihoodCSS:String,arc:String);
  /**
    * Arima模型:
    * 输出其p,d,q参数
    * 输出其预测的predictedN个值
    * @param trainTsrdd
    */
  def arimaModelTrain(trainTsrdd:TimeSeriesRDD[String]): (RDD[(String,Vector)],RDD[(String,Coefficient)])={
    val predictedN=this.predictedN

    //train model
    val arimaAndVectorRdd=trainTsrdd.map{line=>
      line match {
        case (key,denseVector)=>
          (key,ARIMA.autoFit(denseVector),denseVector)
      }
    }

    /**参数输出:p,d,q的实际值和其系数值、最大似然估计值、aic值**/
    val coefficients=arimaAndVectorRdd.map{line=>
      line match{
        case (key,arimaModel,denseVector)=>{
          val coefficients=arimaModel.coefficients.mkString(",")
          val p=arimaModel.p.toString
          val d=arimaModel.d.toString
          val q=arimaModel.q.toString
          val logLikelihoodCSS=arimaModel.logLikelihoodCSS(denseVector).toString
          val arc=arimaModel.approxAIC(denseVector).toString
          (key,Coefficient(coefficients,p,d,q,logLikelihoodCSS,arc))
        }
      }
    }

    //print coefficients
    coefficients.collect().map{f=>
      val key=f._1
      val coefficients=f._2
      println(key+" coefficients:"+coefficients.coefficients+"=>"+"(p="+coefficients.p+",d="+coefficients.d+",q="+coefficients.q+")"
        +"logLikelihoodCSS:"+coefficients.logLikelihoodCSS+" arc:"+coefficients.arc)
    }

    //predict
    val forecast= arimaAndVectorRdd.map{
      row=>
        val key=row._1
        val model=row._2
        val denseVector=row._3
        (key,model.forecast(denseVector,predictedN))
    }

    //print predict
    val forecastValue=forecast.map{
      _ match{
        case (key,value)=>{
          val partArray=value.toArray.mkString(",").split(",")
          var forecastArrayBuffer=new ArrayBuffer[Double]()
          var i=partArray.length-predictedN
          while(i<partArray.length){
            forecastArrayBuffer+=partArray(i).toDouble
            i=i+1
          }
          (key,Vectors.dense(forecastArrayBuffer.toArray))
        }
      }
    }
    println("Arima forecast of next "+predictedN+" observations:")
    forecastValue.foreach(println)

    //return forecastValue & coefficients
    (forecastValue,coefficients)
  }



  /**
    *实现HoltWinters模型
    * @param trainTsrdd
    */
  def holtWintersModelTrain(trainTsrdd:TimeSeriesRDD[String],period:Int,holtWintersModelType:String): (RDD[(String,Vector)],RDD[(String,Double)]) ={

    //set parms
    val predictedN=this.predictedN

    //create model
    val holtWintersAndVectorRdd=trainTsrdd.map{
      row=>
        val key=row._1
        val denseVector=row._2
        //ts: Vector, period: Int, modelType: String = "additive", method: String = "BOBYQA"
        val model=HoltWinters.fitModel(denseVector,period,holtWintersModelType)
        (key,model,denseVector)
    }

    //create dist vector
    val predictedArrayBuffer=new ArrayBuffer[Double]()
    var i=0
    while(i<predictedN){
      predictedArrayBuffer+=i
      i=i+1
    }
    val predictedVectors=Vectors.dense(predictedArrayBuffer.toArray)

    //predict
    val forecast=holtWintersAndVectorRdd.map{
      row=>
        val key=row._1
        val model=row._2
        val denseVector=row._3
        val forcaset=model.forecast(denseVector,predictedVectors)
        (key,forcaset)
    }

    //print predict
    println("HoltWinters forecast of next "+predictedN+" observations:")
    forecast.foreach(println)

    //sse- to get sum of squared errors
    val sse=holtWintersAndVectorRdd.map{
      row=>
        val key=row._1
        val model=row._2
        val vector=row._3
        (key,model.sse(vector))
    }
    return (forecast,sse)
  }



  /**
    * 批量生成日期(具体到分钟秒),用来保存
    * 格式为:yyyy-MM-dd HH:mm:ss
    * @param predictedN
    * @param startTime
    * @param endTime
    */
  def productStartDatePredictDate(predictedN:Int,startTime:String,endTime:String): ArrayBuffer[String] ={
    //形成开始start到预测predicted的日期
    var dateArrayBuffer=new ArrayBuffer[String]()
    val dateFormat= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    val cal1 = Calendar.getInstance()
    val cal2 = Calendar.getInstance()
    val st=dateFormat.parse(startTime)
    val et=dateFormat.parse(endTime)
    //设置训练数据中开始和结束日期
    cal1.set(st.getYear,st.getMonth,st.getDay,st.getHours,st.getMinutes,st.getSeconds)
    cal2.set(et.getYear,et.getMonth,et.getDay,et.getHours,et.getMinutes,et.getSeconds)
    //间隔差
    val minuteDiff=(cal2.getTimeInMillis-cal1.getTimeInMillis)/ (1000 * 60 * 5)+predictedN

    var iMinuteDiff = 0;
    while(iMinuteDiff<=minuteDiff){
      cal1.add(Calendar.MINUTE,5)
      //保存日期
      dateArrayBuffer+=dateFormat.format(cal1.getTime)
      iMinuteDiff=iMinuteDiff+1;

    }
    dateArrayBuffer
  }
}

Holtwinters实现

/**
 * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */

package com.cloudera.sparkts.models

import org.apache.commons.math3.analysis.MultivariateFunction
import org.apache.spark.mllib.linalg._
import org.apache.commons.math3.optim.MaxIter
import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction
import org.apache.commons.math3.optim.MaxEval
import org.apache.commons.math3.optim.SimpleBounds
import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer
import org.apache.commons.math3.optim.InitialGuess
import org.apache.commons.math3.optim.nonlinear.scalar.GoalType

/**
 * Triple exponential smoothing takes into account seasonal changes as well as trends.
 * Seasonality is defined to be the tendency of time-series data to exhibit behavior that repeats
 * itself every L periods, much like any harmonic function.
 *
 * The Holt-Winters method is a popular and effective approach to forecasting seasonal time series
 *
 * See https://en.wikipedia.org/wiki/Exponential_smoothing#Triple_exponential_smoothing
 * for more information on Triple Exponential Smoothing
 * See https://www.otexts.org/fpp/7/5 and
 * https://stat.ethz.ch/R-manual/R-devel/library/stats/html/HoltWinters.html
 * for more information on Holt Winter Method.
 */
object HoltWinters {

  /**
   * Fit HoltWinter model to a given time series. Holt Winter Model has three parameters
   * level, trend and season component of time series.
   * We use BOBYQA optimizer which is used to calculate minimum of a function with
   * bounded constraints and without using derivatives.
   * See http://www.damtp.cam.ac.uk/user/na/NA_papers/NA2009_06.pdf for more details.
   *
   * @param ts Time Series for which we want to fit HoltWinter Model
   * @param period Seasonality of data i.e  period of time before behavior begins to repeat itself
   * @param modelType Two variations differ in the nature of the seasonal component.
   *    Additive method is preferred when seasonal variations are roughly constant through the series,
   *    Multiplicative method is preferred when the seasonal variations are changing
   *    proportional to the level of the series.
   * @param method: Currently only BOBYQA is supported.
   */
  def fitModel(ts: Vector, period: Int, modelType: String = "additive", method: String = "BOBYQA")
  : HoltWintersModel = {
    method match {
      case "BOBYQA" => fitModelWithBOBYQA(ts, period, modelType)
      case _ => throw new UnsupportedOperationException("Currently only supports 'BOBYQA'")
    }
  }

  def fitModelWithBOBYQA(ts: Vector, period: Int, modelType:String): HoltWintersModel = {
    val optimizer = new BOBYQAOptimizer(7)
    val objectiveFunction = new ObjectiveFunction(new MultivariateFunction() {
      def value(params: Array[Double]): Double = {
        new HoltWintersModel(modelType, period, params(0), params(1), params(2)).sse(ts)
      }
    })

    // The starting guesses in R's stats:HoltWinters
    val initGuess = new InitialGuess(Array(0.3, 0.1, 0.1))
    val maxIter = new MaxIter(30000)
    val maxEval = new MaxEval(30000)
    val goal = GoalType.MINIMIZE
    val bounds = new SimpleBounds(Array(0.0, 0.0, 0.0), Array(1.0, 1.0, 1.0))
    val optimal = optimizer.optimize(objectiveFunction, goal, bounds,initGuess, maxIter, maxEval)
    val params = optimal.getPoint
    new HoltWintersModel(modelType, period, params(0), params(1), params(2))
  }
}

class HoltWintersModel(
    val modelType: String,
    val period: Int,
    val alpha: Double,
    val beta: Double,
    val gamma: Double) extends TimeSeriesModel {

  if (!modelType.equalsIgnoreCase("additive") && !modelType.equalsIgnoreCase("multiplicative")) {
    throw new IllegalArgumentException("Invalid model type: " + modelType)
  }
  val additive = modelType.equalsIgnoreCase("additive")

  /**
   * Calculates sum of squared errors, used to estimate the alpha and beta parameters
   *
   * @param ts A time series for which we want to calculate the SSE, given the current parameters
   * @return SSE
   */
  def sse(ts: Vector): Double = {
    val n = ts.size
    val smoothed = new DenseVector(Array.fill(n)(0.0))
    addTimeDependentEffects(ts, smoothed)

    var error = 0.0
    var sqrErrors = 0.0

    // We predict only from period by using the first period - 1 elements.
    for(i <- period to (n - 1)) {
      error = ts(i) - smoothed(i)
      sqrErrors += error * error
    }

    sqrErrors
  }

  /**
   * {@inheritDoc}
   */
  override def removeTimeDependentEffects(ts: Vector, dest: Vector = null): Vector = {
    throw new UnsupportedOperationException("not yet implemented")
  }

  /**
   * {@inheritDoc}
   */
  override def addTimeDependentEffects(ts: Vector, dest: Vector): Vector = {
    val destArr = dest.toArray
    val fitted = getHoltWintersComponents(ts)._1
    for (i <- 0 to (dest.size - 1)) {
      destArr(i) = fitted(i)
    }
    dest
  }

  /**
   * Final prediction Value is sum of level trend and season
   * But in R's stats:HoltWinters additional weight is given for trend
   *
   * @param ts
   * @param dest
   */
  def forecast(ts: Vector, dest: Vector): Vector = {
    val destArr = dest.toArray
    val (_, level, trend, season) = getHoltWintersComponents(ts)
    val n = ts.size

    val finalLevel = level(n - period)
    val finalTrend = trend(n - period)
    val finalSeason = new Array[Double](period)

    for (i <- 0 until period) {
      finalSeason(i) = season(i + n - period)
    }

    for (i <- 0 until dest.size) {
      destArr(i) = if (additive) {
        (finalLevel + (i + 1) * finalTrend) + finalSeason(i % period)
      } else {
        (finalLevel + (i + 1) * finalTrend) * finalSeason(i % period)
      }
    }
    dest
  }

  /**
   * Start from the intial parameters and then iterate to find the final parameters
   * using the equations of HoltWinter Method.
   * See https://www.otexts.org/fpp/7/5 and
   * https://stat.ethz.ch/R-manual/R-devel/library/stats/html/HoltWinters.html
   * for more information on Holt Winter Method equations.
   *
   * @param ts A time series for which we want the HoltWinter parameters level,trend and season.
   * @return (level trend season). Final vectors of level trend and season are returned.
   */
  def getHoltWintersComponents(ts: Vector): (Vector, Vector, Vector, Vector) = {
    val n = ts.size
    require(n >= 2, "Requires length of at least 2")

    val dest = new Array[Double](n)

    val level = new Array[Double](n)
    val trend = new Array[Double](n)
    val season = new Array[Double](n)

    val (initLevel, initTrend, initSeason) = initHoltWinters(ts)
    level(0) = initLevel
    trend(0) = initTrend
    for (i <- 0 until initSeason.size){
      season(i) = initSeason(i)
    }

    for (i <- 0 to (n - period - 1)) {
      dest(i + period) = level(i) + trend(i)

      // Add the seasonal factor for additive and multiply for multiplicative model.
      if (additive) {
        dest(i + period) += season(i)
      } else {
        dest(i + period) *= season(i)
      }

      val levelWeight = if (additive) {
        ts(i + period) - season(i)
      } else {
        ts(i + period) / season(i)
      }

      level(i + 1) = alpha * levelWeight + (1 - alpha) * (level(i) + trend(i))

      trend(i + 1) = beta * (level(i + 1) - level(i)) + (1 - beta) * trend(i)

      val seasonWeight = if (additive) {
        ts(i + period) - level(i + 1)
      } else {
        ts(i + period) / level(i + 1)
      }
      season(i + period) = gamma * seasonWeight + (1 - gamma) * season(i)
    }

    (Vectors.dense(dest), Vectors.dense(level), Vectors.dense(trend), Vectors.dense(season))
  }

  def getKernel(): (Array[Double]) = {
    if (period % 2 == 0){
      val kernel = Array.fill(period + 1)(1.0 / period)
      kernel(0) = 0.5 / period
      kernel(period) = 0.5 / period
      kernel
    } else {
      Array.fill(period)(1.0 / period)
    }
  }

  /**
   * Function to calculate the Weighted moving average/convolution using above kernel/weights
   * for input data.
   * See http://robjhyndman.com/papers/movingaverage.pdf for more information
   * @param inData Series on which you want to do moving average
   * @param kernel Weight vector for weighted moving average
   */
  def convolve(inData: Array[Double], kernel: Array[Double]): (Array[Double]) = {
    val kernelSize = kernel.size
    val dataSize = inData.size

    val outData = new Array[Double](dataSize - kernelSize + 1)

    var end = 0
    while (end <= (dataSize - kernelSize)) {
      var sum = 0.0
      for (i <- 0 until kernelSize) {
        sum += kernel(i) * inData(end + i)
      }

      outData(end) = sum
      end += 1
    }

    outData
  }

  /**
   * Function to get the initial level, trend and season using method suggested in
   * http://robjhyndman.com/hyndsight/hw-initialization/
   * @param ts
   */
  def initHoltWinters(ts: Vector): (Double, Double, Array[Double]) = {
    val arrTs = ts.toArray

    // Decompose a window of time series into level trend and seasonal using convolution
    val kernel = getKernel()
    val kernelSize = kernel.size
    val trend = convolve(arrTs.take(period * 2), kernel)

    // Remove the trend from time series. Subtract for additive and divide for multiplicative
    val n = (kernelSize -1) / 2
    val removeTrend = arrTs.take(period * 2).zip(
      Array.fill(n)(0.0) ++ trend ++ Array.fill(n)(0.0)).map{
      case (a, t) =>
        if (t != 0){
          if (additive) {
            (a - t)
          } else {
            (a / t)
          }
        }  else{
          0
        }
    }

    // seasonal mean is sum of mean of all season values of that period
    val seasonalMean = removeTrend.splitAt(period).zipped.map { case (prevx, x) =>
      if (prevx == 0 || x == 0) (x + prevx) else (x + prevx) / 2
    }

    val meanOfFigures = seasonalMean.sum / period

    // The seasonal mean is then centered and removed to get season.
    // Subtract for additive and divide for multiplicative.
    val initSeason = if (additive) {
      seasonalMean.map(_ - meanOfFigures )
    } else {
      seasonalMean.map(_ / meanOfFigures )
    }

    // Do Simple Linear Regression to find the initial level and trend
    val indices = 1 to trend.size
    val xbar = (indices.sum: Double) / indices.size
    val ybar = trend.sum / trend.size

    val xxbar = indices.map( x => (x - xbar) * (x - xbar) ).sum
    val xybar = indices.zip(trend).map {
      case (x, y) => (x - xbar) * (y - ybar)
    }.sum

    val initTrend = xybar / xxbar
    val initLevel = ybar - (initTrend * xbar)

    (initLevel, initTrend, initSeason)
  }
}

预测结果:

《时间序列预测法及Spark-TimeSerial实现》

折线图:

《时间序列预测法及Spark-TimeSerial实现》

actural:6/1当天的实际值

Hotwind7*MUTI :预测值(周期为一周,乘法)

Hotwind30*MUTI :预测值(周期为一个月,乘法)

其余:预测值与实际值的差值

结论:

按照5分钟为时间间隔。最小的周期性为天。即period=288。这个周期误差较其余两者稍微大一些。选择一周288 * 7 或者一个月288 * 30 ,效果如图。还可以。但其实这个值怎么选。有点存疑,即使根据sum of squared errors来看。

    原文作者:raincoffee
    原文地址: https://www.jianshu.com/p/c14e27ac2918
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞