Because I cannot use spark-csv, I manually created a DataFrame from a CSV file as follows:

from pyspark.sql import Row

# Read the file and split each line on commas.
raw_data = sc.textFile("data/ALS.csv").cache()
csv_data = raw_data.map(lambda l: l.split(","))

# Drop the header row.
header = csv_data.first()
csv_data = csv_data.filter(lambda line: line != header)

# Give each column a name by mapping every row to a Row.
row_data = csv_data.map(lambda p: Row(
    location_history_id=p[0],
    user_id=p[1],
    latitude=p[2],
    longitude=p[3],
    address=p[4],
    created_at=p[5],
    valid_until=p[6],
    timezone_offset_secs=p[7],
    opening_times_id=p[8],
    timezone_id=p[9]))

location_df = sqlContext.createDataFrame(row_data)
location_df.registerTempTable("locations")

I need only two columns:

lati_longi_df=sqlContext.sql("""SELECT latitude, longitude FROM locations""")

from pyspark.mllib.linalg import Vectors

rdd_lati_longi = lati_longi_df.map(lambda data: Vectors.dense([float(c) for c in data]))
rdd_lati_longi.take(2)

[DenseVector([-6.2416, 106.7949]), DenseVector([-6.2443, 106.7956])]

Now it seems that everything is ready for KMeans training:

    from pyspark.mllib.clustering import KMeans

    clusters = KMeans.train(rdd_lati_longi, 10, maxIterations=30,
                            runs=10, initializationMode="random")

But I get the following error:

IndexError: list index out of range

First three lines of ALS.csv:

location_history_id,user_id,latitude,longitude,address,created_at,valid_until,timezone_offset_secs,opening_times_id,timezone_id

@eliasah I would appreciate your help. - chessosapiens
@zero323 I would appreciate your help. - chessosapiens
You should provide some sample data from ALS.csv (should be easy to copy/paste) so that we can reproduce your code and hopefully your error. - Katya Willard
@KatyaHandler I edited the question; you can now find the first three lines and the header. - chessosapiens
I can't reproduce your problem given those three lines. Out of curiosity, what happens if you run the following: csv_data.map(lambda x: len(x)).distinct().collect()? I wonder if somewhere in your underlying dataset is an empty line or perhaps a truncated row, that left you with an uneven number of columns per row. - Katya Willard
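To make the check suggested in the last comment concrete, here is a minimal local sketch in plain Python (no Spark needed) of counting fields per row and filtering out rows that do not have the expected ten columns. Since Spark evaluates `map` lazily and `take(2)` only touches the first rows, a malformed row further down the file would only surface once `KMeans.train` scans the whole RDD. The helper names (`split_fields`, `is_well_formed`, `field_count_histogram`) and the sample rows are hypothetical; only the two latitude/longitude pairs come from the question.

```python
EXPECTED_FIELDS = 10  # number of columns in the header shown in the question


def split_fields(line):
    """Split one raw CSV line the same way the question does."""
    return line.split(",")


def is_well_formed(fields, expected=EXPECTED_FIELDS):
    """Return True when a split row has exactly the expected number of fields."""
    return len(fields) == expected


def field_count_histogram(lines):
    """Count how many rows have each field count (local stand-in for
    csv_data.map(len).distinct() on the RDD)."""
    counts = {}
    for line in lines:
        n = len(split_fields(line))
        counts[n] = counts.get(n, 0) + 1
    return counts


# Hypothetical sample: one complete row, one empty line, one truncated row.
sample = [
    "1,10,-6.2416,106.7949,addr,2016-01-01,2016-02-01,25200,3,7",
    "",
    "2,11,-6.2443",
]

print(field_count_histogram(sample))

# Keep only rows that can safely be indexed up to p[9].
good = [f for f in map(split_fields, sample) if is_well_formed(f)]
print(len(good))
```

On the RDD from the question, the same predicate could be applied as `csv_data.filter(is_well_formed)` before building the `Row` objects, assuming uneven rows really are the cause.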

1 Answer

Why don't you let Spark parse the CSV instead? You can enable CSV support with something like this:

pyspark --packages com.databricks:spark-csv_2.10:1.4.0
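Once the shell is started with that package, reading the file could look roughly like the sketch below. It assumes Spark 1.4 or later (for `sqlContext.read`) and uses the spark-csv options `header`, `inferSchema`, and `mode`; `load_locations` is a hypothetical helper name, and `DROPMALFORMED` is my choice here so that rows with the wrong number of columns are dropped rather than raising later.

```python
def load_locations(sql_context, path):
    """Read a CSV file with a header row through the spark-csv data source.

    sql_context is expected to be the sqlContext object from the pyspark shell.
    """
    return (sql_context.read
            .format("com.databricks.spark.csv")
            .options(header="true",         # first line holds the column names
                     inferSchema="true",    # let spark-csv guess column types
                     mode="DROPMALFORMED")  # skip rows that fail to parse
            .load(path))
```

In the shell this would be called as `location_df = load_locations(sqlContext, "data/ALS.csv")`, after which `location_df.select("latitude", "longitude")` gives the two columns needed for clustering.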