pyspark.streaming module
Module contents
class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)
Bases: object
Spark Streaming功能的主要入口点。 StreamingContext表示与Spark集群的连接,可用于创建DStream各种输入源。 它可以来自现有的SparkContext。 在创建和转换DStream之后,可以分别使用context.start()和context.stop()来启动和停止流计算。 context.awaitTermination()允许当前线程通过stop()或异常等待上下文的终止。
- addStreamingListener(streamingListener)
添加[[org.apache.spark.streaming.scheduler.StreamingListener]]对象来接收与流式传输相关的系统事件。 - awaitTermination(timeout=None)
等待执行停止。 - awaitTerminationOrTimeout(timeout)
等待执行停止。 如果停止或在执行过程中抛出错误,则返回true; 或者如果从方法返回之前超过了等待时间,则返回false。 - binaryRecordsStream(directory, recordLength)
创建一个输入流,用于监视Hadoop兼容的文件系统是否存在新文件,并将其作为具有固定长度记录的平面二进制文件进行读取。 必须通过从同一个文件系统中的另一个位置“移动”文件来将文件写入受监视的目录。 文件名称是.
的将被忽略。 - checkpoint(directory)
设置上下文以定期检查DStream操作是否具有主容错功能。 该图会检查每个批处理间隔。 - classmethod getActive()
返回当前活动的StreamingContext(即,如果存在已启动但未停止的上下文)或None。 - classmethod getActiveOrCreate(checkpointPath, setupFunc)
返回活动的StreamingContext(即,当前已启动但未停止),或者从检查点数据重新创建StreamingContext,或使用提供的setupFunc函数创建新的StreamingContext。 如果checkpointPath为None或不包含有效的检查点数据,则将调用setupFunc来创建新的上下文并设置DStreams。 - classmethod getOrCreate(checkpointPath, setupFunc)
重新从检查点中的数据创建一个StreamingContext或创建一个新的StreamingContext。 如果提供的checkpointPath中存在检查点数据,则将从检查点数据重新创建StreamingContext。 如果数据不存在,则提供的setupFunc将用于创建新的上下文。 - queueStream(rdds, oneAtATime=True, default=None)
从RDD的队列或列表创建一个输入流。 在每个批次中,它将处理队列返回的一个或全部RDD。 - remember(duration)
在此上下文中设置每个DStream,以记住在上一个给定的持续时间内生成的RDD。 DStream只记录在有限的时间内和垃圾回收机制释放它之前的RDD。 此方法允许开发人员指定如何记住RDD(如果开发人员希望查询DStream计算之外的旧数据)。 - socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))
从TCP源hostname:port创建一个输入。 使用TCP套接字接收数据,接收字节被解释为UTF8编码的,\n
分隔的。 - sparkContext
返回与此StreamingContext关联的SparkContext。 - start()
开始执行流。 - stop(stopSparkContext=True, stopGraceFully=False)
停止流的执行,并确保所有接收到的数据已被处理。
Parameters:
- stopSparkContext – 停止关联的SparkContext或不。
- stopGracefully – 等待所有接收到的数据处理完成之后优雅地停止。
- textFileStream(directory)
创建一个输入流,用于监视与Hadoop兼容的文件系统中的新文件,并将其作为文本文件读取。 文件必须通过从同一个文件系统中的另一个位置“移动”来监视目录。 文件名称是.
的将被忽略。 - transform(dstreams, transformFunc)
通过对源 DStream 的每个 RDD 应用 RDD-to-RDD 函数,创建一个新的 DStream. 这个可以在 DStream 中的任何 RDD 操作中使用. - union(*dstreams)
从相同类型和相同的滑动时间的多个DStream中创建一个统一的DStream。
class pyspark.streaming.DStream(jdstream, ssc, jrdd_deserializer)
Bases: object
离散化流(DStream)是Spark Streaming中的基本抽象,它是连续的(相同类型的)RDD序列,用于表示连续的数据流(有关RDD的更多详细信息,请参阅Spark核心文档中的RDD)。
可以使用StreamingContext从实时数据(例如来自TCP套接字,Kafka,Flume等的数据)创建DStream,或者可以通过使用诸如map,window和reduceByKeyAndWindow之类的操作来转换现有DStream来生成DStream。 当Spark Streaming程序正在运行时,每个DStream都会定期生成一个RDD,可以通过实时数据或通过转换父DStream生成的RDD来生成RDD。
DStreams的内部特点是一些基本属性:
- DStream依赖其他DStream列表
- DStream生成RDD的时间间隔
- 在每个时间间隔之后用于生成RDD的函数
- cache()
使用默认存储级别(MEMORY_ONLY)存储此DStream的RDD。 - checkpoint(interval)
启用此DStream的RDD的定期检查点。 - cogroup(other, numPartitions=None)
通过在此DStream的RDD和其他DStream之间应用“cogroup”来返回一个新的DStream。
散列分区用于生成具有numPartitions分区的RDD。 - combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None)
通过将combineByKey应用于每个RDD来返回一个新的DStream。 - context()
返回与此DStream关联的StreamingContext - count()
返回一个新的DStream,其中每个RDD具有通过对该DStream的每个RDD进行计数而生成的单个元素。 - countByValue()
返回一个新的DStream,其中每个RDD包含此DStream的每个RDD中每个不同值的计数。 - countByValueAndWindow(windowDuration, slideDuration, numPartitions=None)
返回一个新的DStream,其中每个RDD包含在该DStream上的滑动窗口中的RDD中的不同元素的计数。
Parameters:
- windowDuration – 窗户的宽度; 必须是DStream批处理间隔的倍数
- slideDuration – 窗口的滑动间隔;必须是DStream批处理间隔的倍数
- numPartitions – 新DStream中每个RDD的分区数量
- countByWindow(windowDuration, slideDuration)
返回一个新的DStream,其中每个RDD都有一个通过计算该DStream中窗口中元素数目而生成的单个元素。 windowDuration和slideDuration是在window()操作中定义的。
这相当于window(windowDuration,slideDuration).count(),但是如果window很大,效率会更高。 - filter(f)
返回仅包含满足f的元素的新DStream。 - flatMap(f, preservesPartitioning=False)
通过对该DStream的所有元素应用一个函数来返回一个新的DStream,然后展平结果。 - flatMapValues(f)
通过将flatmap函数应用于此DStream中每个键值对中的value而不更改key来返回新的DStream。 - foreachRDD(func)
在此DStream中的每个RDD上应用一个函数。 - fullOuterJoin(other, numPartitions=None)
通过在此DStream的RDD和其他DStream之间应用“完全外连接”来返回一个新的DStream。
Hash分区用于生成具有numPartitions分区的RDD。 - glom()
返回一个新的DStream,其中通过将glom()应用于此DStream的RDD来生成RDD。 - groupByKey(numPartitions=None)
通过在每个RDD上应用groupByKey来返回一个新的DStream。 - groupByKeyAndWindow(windowDuration, slideDuration, numPartitions=None)
通过在滑动窗口上应用groupByKey来返回一个新的DStream。 与DStream.groupByKey()类似,但将其应用于滑动窗口。 - join(other, numPartitions=None)
通过应用此DStream的RDD和其他DStream之间的“join”来返回一个新的DStream。
Hash分区用于生成具有numPartitions分区的RDD。 - leftOuterJoin(other, numPartitions=None)
通过应用此DStream的RDD和其他DStream之间的“left outer join”来返回一个新的DStream。
Hash分区用于生成具有numPartitions分区的RDD。 - map(f, preservesPartitioning=False)
通过对DStream的每个元素应用一个函数来返回一个新的DStream。 - mapPartitions(f, preservesPartitioning=False)
返回一个新的DStream,其中通过将mapPartitions()应用于此DStream的每个RDD来生成每个RDD。 - mapPartitionsWithIndex(f, preservesPartitioning=False)
通过将mapPartitionsWithIndex()应用到此DStream的每个RDD,返回一个新的DStream,其中每个RDD都是通过应用mapPartitionsWithIndex()生成的。 - mapValues(f)
通过将映射函数应用于此DStream中每个键值对的值,而不更改键,可以返回一个新的DStream。 - partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
返回使用指定分区程序对每个RDD进行分区的DStream副本。 - persist(storageLevel)
以给定的存储级别存储该DStream的RDD。 - pprint(num=10)
打印此DStream中生成的每个RDD的第一个num元素。 - reduce(func)
Return a new DStream in which each RDD has a single element generated by reducing each RDD of this DStream. - reduceByKey(func, numPartitions=None)
通过将reduceByKey应用于每个RDD来返回一个新的DStream。 - reduceByKeyAndWindow(func, invFunc, windowDuration, slideDuration=None, numPartitions=None, filterFunc=None)
通过在滑动窗口上应用递增的reduceByKey来返回一个新的DStream。 - reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
- repartition(numPartitions)
以增加或减少的并行度返回一个新的DStream。 - rightOuterJoin(other, numPartitions=None)
通过在此DStream的RDD和其他DStream之间应用“右外连接”来返回一个新的DStream。
Hash分区用于生成具有numPartitions分区的RDD。 - saveAsTextFiles(prefix, suffix=None)
将这个DStream中的每个RDD保存为文本文件,使用字符串表示元素。 - slice(begin, end)
返回“begin”到“end”之间的所有RDD(包括两者) - transform(func)
返回一个新的DStream,其中每个RDD是通过在该DStream的每个RDD上应用一个函数而生成的。
func可以有rdd这一个参数,或者有两个参数(time,rdd) - transformWith(func, other, keepSerializer=False)
返回一个新的DStream,其中每个RDD是通过在该DStream和其他DStream的每个RDD上应用一个函数而生成的。
func可以有两个参数(rdd_a,rdd_b)或者有三个参数(time,rdd_a,rdd_b) - union(other)
通过使用此DStream来联合另一个DStream的数据来返回一个新的DStream。 - updateStateByKey(updateFunc, numPartitions=None, initialRDD=None)
返回一个新的“state”DStream,其中通过对键的先前状态和键的新值应用给定函数来更新每个键的状态。 - window(windowDuration, slideDuration=None)
返回一个新的DStream,其中每个RDD包含所有在这个DStream上的滑动时间窗口中看到的元素。
class pyspark.streaming.StreamingListener
Bases: object
class Java
implements = [‘org.apache.spark.streaming.api.java.PythonStreamingListener’]
- StreamingListener.onBatchCompleted(batchCompleted)
当一批作业处理完成时调用。 - StreamingListener.onBatchStarted(batchStarted)
当处理一批作业已经开始时调用。 - StreamingListener.onBatchSubmitted(batchSubmitted)
当一批作业已经提交处理时调用。 - StreamingListener.onOutputOperationCompleted(outputOperationCompleted)
一批作业的处理已经完成时调用。 - StreamingListener.onOutputOperationStarted(outputOperationStarted)
批量作业的处理已经开始时调用。 - StreamingListener.onReceiverError(receiverError)
当接收方报告错误时调用。 - StreamingListener.onReceiverStarted(receiverStarted)
接收者启动时调用。 - StreamingListener.onReceiverStopped(receiverStopped)
接收者停止时调用。
pyspark.streaming.kafka module
class pyspark.streaming.kafka.Broker(host, port)
Bases: object
表示kafka broker的主机和端口信息。
class pyspark.streaming.kafka.KafkaMessageAndMetadata(topic, partition, offset, key, message)
Bases: object
kafka消息和元数据信息。 包括topic, partition, offset and message.
- key
- message
class pyspark.streaming.kafka.KafkaUtils
Bases: object
- static createDirectStream(ssc, topics, kafkaParams, fromOffsets=None, keyDecoder=<function utf8_decoder at 0x7f51e9a60b90>, valueDecoder=<function utf8_decoder at 0x7f51e9a60b90>, messageHandler=None)
- Note: 实验阶段
创建一个直接从Kafka broker和特定偏移量中获取消息的输入流。
这不是一个基于接收者的Kafka输入流,它在每个批次持续时间直接从Kafka拉取消息,并且不经过存储处理。
这不使用Zookeeper来存储偏移量。 消耗的偏移量由流自身跟踪。 为了与依赖于Zookeeper的Kafka监控工具进行互操作,您必须自己从流应用程序更新Kafka / Zookeeper。 您可以从生成的RDD中访问每个批次中使用的偏移量。
要从驱动程序失败中恢复,必须在StreamingContext中启用检查点。 关于消耗偏移量的信息可以从检查点恢复。
Parameters:
- ssc – StreamingContext object.
- topics – list of topic_name to consume.
- kafkaParams – Kafka的附加参数
- fromOffsets – 每个主题/分区Kafka偏移量定义(包括)流的起始点。
- keyDecoder – A function used to decode key (default is utf8_decoder).
- valueDecoder – A function used to decode value (default is utf8_decoder).
- messageHandler – 用于转换KafkaMessageAndMetadata的函数。 你可以使用messageHandler来评估meta(默认是None)。
Returns: A DStream object
- static createRDD(sc, kafkaParams, offsetRanges, leaders=None, keyDecoder=<function utf8_decoder at 0x7f51e9a60b90>, valueDecoder=<function utf8_decoder at 0x7f51e9a60b90>, messageHandler=None)
使用每个主题和分区的偏移量范围,从Kafka创建一个RDD。
Parameters:
- sc – SparkContext object
- kafkaParams – Additional params for Kafka
- offsetRanges – offsetRange列表来指定topic:partition:[start,end)来消费
- leaders – Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, in which case leaders will be looked up on the driver.
- keyDecoder – A function used to decode key (default is utf8_decoder)
- valueDecoder – A function used to decode value (default is utf8_decoder)
- messageHandler – A function used to convert KafkaMessageAndMetadata. You can assess meta using messageHandler (default is None).
Returns: A DStream object
- static createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, storageLevel=StorageLevel(True, True, False, False, 2), keyDecoder=<function utf8_decoder at 0x7f51e9a60b90>, valueDecoder=<function utf8_decoder at 0x7f51e9a60b90>)
创建一个从Kafka Broker中提取消息的输入流。
Parameters:
- ssc – StreamingContext object
- zkQuorum – Zookeeper quorum (hostname:port,hostname:port,..).
- groupId – The group id for this consumer.
- topics – Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.
- kafkaParams – Additional params for Kafka
- storageLevel – RDD storage level.
- keyDecoder – A function used to decode key (default is utf8_decoder)
- valueDecoder – A function used to decode value (default is utf8_decoder)
Returns: A DStream object
class pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset)
Bases: object
表示来自单个kafka TopicAndPartition的一系列偏移量。
class pyspark.streaming.kafka.TopicAndPartition(topic, partition)
Bases: object
代表Kafka的特定主题和分区。
pyspark.streaming.kafka.utf8_decoder(s)
将Unicode解码为UTF-8
pyspark.streaming.kinesis module
class pyspark.streaming.kinesis.KinesisUtils
Bases: object
- static createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream, checkpointInterval, storageLevel=StorageLevel(True, True, False, False, 2), awsAccessKeyId=None, awsSecretKey=None, decoder=<function utf8_decoder at 0x7f51e9541d70>, stsAssumeRoleArn=None, stsSessionName=None, stsExternalId=None)
创建一个从Kinesis流中获取消息的输入流。 这使用Kinesis客户端库(KCL)从Kinesis中获取消息。
Parameters:
- ssc – StreamingContext object
- kinesisAppName – Kinesis application name used by the Kinesis Client Library (KCL) to update DynamoDB
- streamName – Kinesis stream name
- endpointUrl – Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- regionName – Name of region used by the Kinesis Client Library (KCL) to update DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- initialPositionInStream – In the absence of Kinesis checkpoint info, this is the worker’s initial starting position in the stream. The values are either the beginning of the stream per Kinesis’ limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream (InitialPositionInStream.LATEST).
- checkpointInterval – Checkpoint interval for Kinesis checkpointing. See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
- storageLevel – Storage level to use for storing the received objects (default is StorageLevel.MEMORY_AND_DISK_2)
- awsAccessKeyId – AWS AccessKeyId (default is None. If None, will use DefaultAWSCredentialsProviderChain)
- awsSecretKey – AWS SecretKey (default is None. If None, will use DefaultAWSCredentialsProviderChain)
- decoder – A function used to decode value (default is utf8_decoder)
- stsAssumeRoleArn – ARN of IAM role to assume when using STS sessions to read from the Kinesis stream (default is None).
- stsSessionName – Name to uniquely identify STS sessions used to read from Kinesis stream, if STS is being used (default is None).
- stsExternalId – External ID that can be used to validate against the assumed IAM role’s trust policy, if STS is being used (default is None).
Returns: A DStream object
class pyspark.streaming.kinesis.InitialPositionInStream
Bases: object
LATEST = 0
TRIM_HORIZON = 1
pyspark.streaming.kinesis.utf8_decoder(s)
将Unicode解码为UTF-8
pyspark.streaming.flume.module
class pyspark.streaming.flume.FlumeUtils
Bases: object
- static createPollingStream(ssc, addresses, storageLevel=StorageLevel(True, True, False, False, 2), maxBatchSize=1000, parallelism=5, bodyDecoder=<function utf8_decoder at 0x7f51eb4416e0>)
创建要与Flume代理上部署的Spark agent一起使用的输入流。 这个流将轮询接收器的数据,并将提供事件,因为它们是可用的。
Parameters:
- ssc – StreamingContext object
- addresses – 运行Spark Sink的(主机,端口)列表
- storageLevel –用于存储接收对象的存储级别
- maxBatchSize – 在单个RPC调用中从Spark接收器中拉取的最大事件数量
- parallelism – 此流应发送到接收器的并发请求数。 请注意,同时拉取更多请求将导致此流使用更多的线程
- bodyDecoder – A function used to decode body (default is utf8_decoder)
Returns: A DStream object
- static createStream(ssc, hostname, port, storageLevel=StorageLevel(True, True, False, False, 2), enableDecompression=False, bodyDecoder=<function utf8_decoder at 0x7f51eb4416e0>)
创建一个从Flume中提取事件的输入流。
Parameters:
- ssc – StreamingContext object
- hostname – Hostname of the slave machine to which the flume data will be sent
- port – Port of the slave machine to which the flume data will be sent
- storageLevel – Storage level to use for storing the received objects
- enableDecompression – Should netty server decompress input stream
- bodyDecoder – A function used to decode body (default is utf8_decoder)
Returns: A DStream object
pyspark.streaming.flume.utf8_decoder(s)
将Unicode解码为UTF-8