pyspark dataframe отбрасывает повторяющиеся значения с более старой меткой времени

У меня есть фреймворк pyspark, в котором есть столбцы starttime и stoptime с дополнительными столбцами, значения которых обновляются

|startime  |stoptime  |hour  |minute  |sec  |sip          |dip            |sport|dport|proto|pkt |byt |
|1504766585|1504801216|16    |20      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504766585|1504801216|16    |20      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504781751|1504801216|16    |20      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504781751|1504801216|16    |20      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504766585|1504801336|16    |22      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504766585|1504801336|16    |22      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504781751|1504801336|16    |22      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504781751|1504801336|16    |22      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |

В этом примере я хочу выбрать все строки с последним временем остановки, все остальные значения столбцов дублируются.


person user2825083    schedule 07.09.2017    source источник


Ответы (2)


arrow_upward
1
arrow_downward

Полагаю, вы хотите вести последние записи на каждый sport. Вы должны использовать оконную функцию для определения последней записи для каждого раздела:

import pyspark.sql.functions as psf
from pyspark.sql import Window
w = Window.partitionBy("sport").orderBy(psf.desc("stoptime"))

df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")

    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+

У вас будет столько записей, сколько различных разделов для sport.

Если вам нужен последний stoptime для всей таблицы без разделения, вы можете удалить partitionBy и использовать вместо него dense_rank (одинаковые значения будут иметь одинаковый ранг):

w = Window.orderBy(psf.desc("stoptime"))

df.withColumn("rn", psf.dense_rank().over(w)).filter("rn = 1").drop("rn").show()

    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
person MaFF    schedule 09.09.2017
comment
Мне нужны строки с одинаковым временем начала, часами, минутами, секундами, sip, dip, sport, dport, proto, но с последним временем остановки. Это, в свою очередь, даст мне max byt и pkt - person user2825083; 11.09.2017
comment
Если вам нужно максимальное значение stoptime для каждого отдельного startime, hour, min, sec, sip, dip, sport, dport, proto. вам просто нужно, чтобы все эти столбцы в partitionBy - person MaFF; 11.09.2017
comment
Выглядит правильно? import pyspark.sql.functions as psf from pyspark.sql import Window w = Window.partitionBy("startime","hour","sip","dip","sport","dport","proto").orderBy(psf.desc("stoptime")) df = dataframe.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn") df.show() - person user2825083; 11.09.2017
comment
Это выглядит правильно, проверьте результат, чтобы убедиться, что он соответствует вашим ожиданиям. - person MaFF; 11.09.2017

arrow_upward
0
arrow_downward

Надеюсь это поможет!

from pyspark.sql.functions import col

df = sc.parallelize([(1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0)]).\
    toDF(["startime","stoptime","hour","min","sec","sip","dip","sport","dport","proto","pkt","byt"])

df1 = df.where(col("stoptime") == df.select("stoptime").rdd.max()[0]).distinct()
df1.show()

Выход

+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
|1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
person 1.618    schedule 07.09.2017
comment
Только что добавлено выделение кода (Python) - ищите ‹! - language-all: lang-python -› в начале кода ... - person desertnaut; 07.09.2017
comment
@ user2825083, вы должны отметить его как правильный ответ, если он ответил на ваш запрос, поскольку это поможет другим в случае, если они столкнутся с аналогичной проблемой в будущем. Спасибо! - person 1.618; 08.09.2017
comment
сбор значений pyspark в объекты python может привести к неожиданным результатам для float, поскольку они не аппроксимируются одинаково. Я бы не рекомендовал это делать. - person MaFF; 09.09.2017
comment
@ Мэри может объяснить поподробнее, если ты не против? Я не могу найти здесь неискровых предметов? - person 1.618; 09.09.2017
comment
df.select("stoptime").rdd.max()[0] - это питон int - person MaFF; 09.09.2017
comment
Вы также конвертируете его в rdd для агрегации, которая менее эффективна, чем агрегация фреймов данных. - person MaFF; 09.09.2017
comment
@Marie Я думаю, что мы всегда должны говорить разумно и быть готовыми защищать любые теоретические предположения, прежде чем что-то публиковать (например, сначала вы говорите о float, затем int, затем rdd vs df). Разве мы не должны сосредоточиться на помощи членам сообщества, а не на такого рода диалогах? (Кстати, это эталонное тестирование может заинтересовать ты). Пожалуйста, не пугайте ОП таким количеством комментариев. Будь спортом !!! - person 1.618; 10.09.2017
comment
о нет, я думаю, сообщения могут вводить в заблуждение ... Я не собирался показаться унизительным, я просто пытался помочь. Я потратил время, пытаясь найти воспроизводимый пример для float, это проблема, которую я должен был решить для моего коллеги, и я не мог ее воспроизвести. Я также столкнулся с этой веткой stackoverflow.com/questions/46122846/ пару дней назад, поэтому я знал, что другие люди столкнулись с этой проблемой преобразования. Я просто советовал не переключаться между питоном и искрой, если этого можно избежать. Прости еще раз - person MaFF; 10.09.2017
comment
Спасибо за предложения. Это тоже сработает? from pyspark.sql.functions import col, max as max_ dataframe.groupBy(dataframe.trhour, dataframe.trminute, dataframe.trsec, dataframe.starttime, dataframe.sip, dataframe.dip, dataframe.sport, dataframe.dport, dataframe.proto, dataframe.ipkt, dataframe.ibyt).agg(max_('stoptime')) - person user2825083; 11.09.2017
comment
@ user2825083 Это должно сработать, но разве это не даст строк с максимальным значением в каждой группе? - person 1.618; 11.09.2017
comment
@Prem Это решение предоставит только строки с максимальным временем остановки. - person user2825083; 11.09.2017
comment
Но в своем исходном посте вы прозвучали иначе. Похоже, случай неправильного сбора требований :) - Я думал, что вы хотите, чтобы строки имели последнее время остановки (независимо от группы !!!). Я бы посоветовал правильно обновить вопрос, указав ожидаемый результат, чтобы другие пользователи не запутались при поиске ответа на свою проблему. Спасибо! - person 1.618; 11.09.2017