This can be considered a follow-up to this thread, but I need more help with moving things along. Hopefully someone can have a look over my attempts below and provide further guidance.
To summarize, I need a cloud function that
- Is triggered by a PubSub message being published in topic A (this can be done in UI).
- reads a messy object change notification message in "push" PubSub topic A.
- "parse" it
- publish a message in PubSub topic B, with the original message ID as data, and other metadata (e.g. file name, size, time) as attributes.
. 1:
Example of a messy object change notification:
\n "kind": "storage#object",\n "id": "bucketcfpubsub/test.txt/1544681756538155",\n "selfLink": "https://www.googleapis.com/storage/v1/b/bucketcfpubsub/o/test.txt",\n "name": "test.txt",\n "bucket": "bucketcfpubsub",\n "generation": "1544681756538155",\n "metageneration": "1",\n "contentType": "text/plain",\n "timeCreated": "2018-12-13T06:15:56.537Z",\n "updated": "2018-12-13T06:15:56.537Z",\n "storageClass": "STANDARD",\n "timeStorageClassUpdated": "2018-12-13T06:15:56.537Z",\n "size": "1938",\n "md5Hash": "sDSXIvkR/PBg4mHyIUIvww==",\n "mediaLink": "https://www.googleapis.com/download/storage/v1/b/bucketcfpubsub/o/test.txt?generation=1544681756538155&alt=media",\n "crc32c": "UDhyzw==",\n "etag": "CKvqjvuTnN8CEAE="\n}\n
To clarify, is this a message with blank "data" field, and all the information above are in attribute pairs (like "attribute name": "attribute data")? Or is it just a long string stuffed into the "data" field, with no "attributes"?
. 2:
In the above thread, a "pull" subscription is used. Is it better than using a "push" subscription? Push sample below:
def create_push_subscription(project_id,
topic_name,
subscription_name,
endpoint):
"""Create a new push subscription on the given topic."""
# [START pubsub_create_push_subscription]
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO endpoint = "https://my-test-project.appspot.com/push"
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
push_config = pubsub_v1.types.PushConfig(
push_endpoint=endpoint)
subscription = subscriber.create_subscription(
subscription_path, topic_path, push_config)
print('Push subscription created: {}'.format(subscription))
print('Endpoint for subscription is: {}'.format(endpoint))
# [END pubsub_create_push_subscription]
Or do I need further code after this to receive messages?
Also, doesn't this create a new subscriber every time the Cloud Function is triggered by a pubsub message being published? Should I add a subscription delete code at the end of the CF, or are there more efficient ways to do this?
. 3:
Next, to parse the code, this sample code doing a few attributes as follows:
def summarize(message):
# [START parse_message]
data = message.data
attributes = message.attributes
event_type = attributes['eventType']
bucket_id = attributes['bucketId']
object_id = attributes['objectId']
Will this work with my above notification in 1:?
. 4:
How do I separate the topic_name? Steps 1 and 2 use topic A, while this step is to publish into topic B. Is is as simple as re-writing the topic_name in the below code example?
# TODO topic_name = "Your Pub/Sub topic name"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# Add two attributes, origin and username, to the message
publisher.publish(
topic_path, data, origin='python-sample', username='gcp')
print('Published messages with custom attributes.')
Source where I got most of the sample code from (besides the above thread):python-docs-samples. Will adapting and stringing the above code samples together produce useful code? Or will I still be missing stuff like "import ****"?