12
votes

I have data being inserted that uses host names. Annoyingly I'm about to change a domain from .lan to .mydomain.com

Obviously I'd like to be able to search my historical data of a machine as it crosses this change.

Can I update a tag definition from machine.lan to machine.mydomain.com?

4
Annoyingly, InfluxDB doesn't support some basic (to those coming from SQL) operations like updating tags, searching for NULLs, updating fields, dropping fields, or operators like IN. - Dan Dascalescu

4 Answers

12
votes

While @Michael's answer is correct that you can't change tag values with InfluxDB commands, you can write a client script that changes a tag's value by inserting "duplicate" points into the measurement: same timestamp, field set and tag set, except that the desired tag carries the new value.

Point with wrong tag (in Line Protocol format):

cpu,hostname=machine.lan cpu=50 1514970123

After running

INSERT cpu,hostname=machine.mydomain.com cpu=50 1514970123

a SELECT * FROM cpu would include

cpu,hostname=machine.lan cpu=50 1514970123
cpu,hostname=machine.mydomain.com cpu=50 1514970123

After the script runs all the INSERT commands, you'll need to drop the obsolete series of points with the old tag value:

DROP SERIES FROM cpu WHERE hostname='machine.lan'

Of course, this is highly inefficient (note in particular this bug) and if you need to update a tag value to another tag value that other points you don't want to drop already have, you can't just DROP SERIES. So please vote for InfluxDB to implement tag renaming and in particular, changing tag values based on WHERE queries. Or consider an alternative time-series database that lets you use regular SQL, such as Timescale.
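
As a rough illustration of the "client script" idea above, here is a minimal Python sketch that rewrites one Line Protocol line so that a given tag carries the new value. The function name retag_line is hypothetical, and the sketch assumes simple lines with no escaped commas, spaces or equals signs in the measurement name, tag keys or tag values; real data may need a proper Line Protocol parser.

```python
def retag_line(line: str, tag_key: str, old_value: str, new_value: str) -> str:
    """Return `line` with tag_key=old_value replaced by tag_key=new_value.

    Assumption: no escaped commas, spaces or equals signs appear in the
    measurement name, tag keys or tag values.
    """
    # The first space separates "measurement,tag=value,..." from the
    # field set and the timestamp.
    series, sep, rest = line.partition(" ")
    measurement, *tags = series.split(",")
    target = f"{tag_key}={old_value}"
    # Swap only the exact tag=value pair; leave every other tag untouched.
    tags = [f"{tag_key}={new_value}" if t == target else t for t in tags]
    return ",".join([measurement, *tags]) + sep + rest


print(retag_line("cpu,hostname=machine.lan cpu=50 1514970123",
                 "hostname", "machine.lan", "machine.mydomain.com"))
# prints: cpu,hostname=machine.mydomain.com cpu=50 1514970123
```

Each rewritten line would then be written back (for example with INSERT), and the old series dropped only once all writes have succeeded.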

4
votes

Unfortunately, there isn't a way to change tag names for historical data in InfluxDB.

4
votes

There is already an open feature request on GitHub for this. https://github.com/influxdata/influxdb/issues/4157

If you want to go down the dump-everything, modify, re-import path (brutal, but effective), this comment from an InfluxDB developer suggests a possible approach:

https://github.com/influxdata/influxdb/issues/3904#issuecomment-268918613

0
votes

As other people have commented, the process seems to be:

  1. Load all the points you wish to change to local memory.
  2. Change all these points.
  3. Upload them back to influx.
  4. Delete the old values.

I wrote some helper functions in Python today to do this for me, which I thought I could share. The solution is a bit bulky, but I had most of the functions from before. I am sure there are more concise ways, but I could not find a full Python example, so here is mine:

Main function:

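import easygui
import pandas as pd
from influxdb import InfluxDBClient
from tqdm import trange

# HOST_ADDRESS and PORT, used by the helper functions further down, must be defined for your InfluxDB server.

def replace_tag(database_name: str, measurement_name: str, tag: str, old_value: str, new_value: str):
    """Replace a tag value with a new value for all affected records in a measurement, by re-uploading the points and then deleting the old series."""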

    # Get a dataframe of selected data
    q = 'SELECT * FROM "'+ measurement_name + '"' + ' WHERE "' + tag + '" = ' + "'" + old_value + "'"
    df = influx_get_read_query(query=q, database_name=database_name)
    print(df)

    tags_keys = influx_get_tag_keys(database_name=database_name)
    field_keys = influx_get_field_keys(database_name=database_name, measurement_name=measurement_name)

    # Here we collect all the new records to be written to influx
    new_points = []

    # Loop through each row of the returned dataframe
    for i in trange(0, len(df)):
        row = df.iloc[i]
        print('row:', i)
        row_dict = row.to_dict()
        print('old row dict:', row_dict)

        new_tags = {}
        new_fields = {}
        new_time = ''

        for key in row_dict.keys():
            if key in tags_keys:
                new_tags[key] = row_dict[key]

            elif key in field_keys:
                new_fields[key] = row_dict[key]

            elif key == 'time':
                new_time = row_dict[key]

            else:
                easygui.msgbox('WARNING: A KEY WAS NOT FOUND: ' + str(key))

        # Replace the old value with a new value
        new_tags[tag] = new_value

        new_row_dict = {}
        new_row_dict['measurement'] = measurement_name
        new_row_dict['tags'] = new_tags
        new_row_dict['time'] = new_time
        new_row_dict['fields'] = new_fields


        # print('new row dict:', new_row_dict)
        new_points.append(new_row_dict)

    # Write the revised records back to the database
    influx_write_multiple_dicts(data_dicts=new_points, database_name=database_name)
    # When finished, drop the series that still carries the old tag value.
    influx_delete_series(database_name=database_name, measurement_name=measurement_name, tag=tag, tag_value=old_value)
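
The SELECT and DROP SERIES statements in this answer are assembled by plain string concatenation, which breaks if a measurement name or tag value contains a quote. Below is a minimal sketch of a quoting helper, assuming InfluxQL's usual rules (double-quoted identifiers, single-quoted string literals, backslash escapes). The names quote_identifier, quote_string and tag_filter_query are hypothetical and not part of any InfluxDB client library.

```python
def quote_identifier(name: str) -> str:
    """Double-quote an InfluxQL identifier (measurement or tag key)."""
    return '"' + name.replace("\\", "\\\\").replace('"', '\\"') + '"'


def quote_string(value: str) -> str:
    """Single-quote an InfluxQL string literal (for example a tag value)."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def tag_filter_query(statement: str, measurement: str, tag: str, value: str) -> str:
    """Build '<statement> FROM "<measurement>" WHERE "<tag>" = '<value>''."""
    return (f"{statement} FROM {quote_identifier(measurement)} "
            f"WHERE {quote_identifier(tag)} = {quote_string(value)}")


print(tag_filter_query("SELECT *", "cpu", "hostname", "machine.lan"))
print(tag_filter_query("DROP SERIES", "cpu", "hostname", "machine.lan"))
```

The same helper can serve both the read query and the final DROP SERIES, so the two statements cannot drift apart.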

Other helper functions:

def influx_delete_series(database_name, measurement_name, tag, tag_value):

    q = 'DROP SERIES FROM "' + measurement_name + '"' + ' WHERE "' + tag + '" = ' + "'" + tag_value + "'"
    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    client.switch_database(database_name)
    client.query(q, chunked=True, chunk_size=10000000000000000)


def influx_write_multiple_dicts(data_dicts:list, database_name):
    """Write a list of dicts with following structure:
    database_output_influx['measurement'] = 'SENSOR_ELEMENT_SUMMARY_TEST2'
        database_output_influx['tags'] = {'serialNumber':'1234', 'partNumber':'5678'}
        d = datetime.now()
        timestamp = d.isoformat('T')
        database_output_influx['time'] = timestamp
        database_output_influx['fields'] = summary_results_dict
    """
    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    client.switch_database(database_name)
    print("Return code for influx write:", client.write_points(data_dicts))


def influx_get_tag_keys(database_name):

    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    # client.create_database('SIEMENS_ENERGY_TEST')
    client.switch_database(database_name)

    results = client.query("SHOW TAG KEYS ")
    point_list = []
    points = results.get_points()
    for point in points:
        point_list.append(point['tagKey'])

    return point_list


def influx_get_field_keys(measurement_name, database_name):

    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    client.switch_database(database_name)

    # Quote the measurement name so names with spaces or special characters still work
    results = client.query('SHOW FIELD KEYS FROM "' + measurement_name + '"')
    point_list = []
    points = results.get_points()
    for point in points:
        point_list.append(point['fieldKey'])

    return point_list

def influx_get_read_query(query, database_name):
    """Returns a DataFrame of all points matching the query. Note: single quotes for tag values, double quotes for all else, so it is best to wrap the statement in triple quotes. Example:"""
    # q = """SELECT * FROM "SENSOR_ELEMENT_TEST_CYCLE" WHERE "TestStage" = '120'"""

    client = InfluxDBClient(host=HOST_ADDRESS, port=PORT, username="InfluxDB", password="Influx-DB-PASSWORD")
    client.switch_database(database_name)
    # print("Dataframe of all measurments of type:", measurement_name)
    q = query
    df = pd.DataFrame(client.query(q, chunked=True, chunk_size=10000000000000000).get_points())
    # print("DF: ", tabulate(df, headers=df.columns.tolist(), tablefmt="psql"))
    return df
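
One practical note on the write step: the helper above sends every rewritten point in a single write_points call, which may be heavy for a long history. A minimal sketch of splitting the list into batches first follows. The name chunked and the batch size of 5000 are assumptions for illustration; I believe influxdb-python's write_points also accepts a batch_size argument that does something similar, but check the version you have installed.

```python
from typing import Iterator, List


def chunked(points: List[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of `points`, each with at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(points), size):
        yield points[start:start + size]


# Hypothetical usage with the helpers from this answer:
# for batch in chunked(new_points, 5000):
#     influx_write_multiple_dicts(data_dicts=batch, database_name=database_name)
```

Dropping the old series should still happen only after every batch has been written successfully.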