22
votes

I'm working on a bot for a competition that receives its input through sys.stdin and uses Python's print() for output. I have the following:

import sys

def main():
    while True:
        line = sys.stdin.readline()
        parts = line.split()
        if len(parts) > 0:
            pass  # do stuff

The problem is that the input arrives as a stream, and with the code above I am blocked from printing anything back until the stream is closed. What can I do to make this work?

6
Non-blocking on stdin either doesn't work or doesn't work very reliably. Are you allowed to use threading/multiprocessing? Cause that should work - Wayne Werner
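
The threading route suggested in this comment can be sketched roughly as follows. This is only an illustration, assuming Python 3; start_reader, poll_line and run_bot are hypothetical names, not part of the question. A daemon thread does the blocking reads and hands lines to the main loop through a queue, so the main loop is free to print (and flush) between lines.

```python
import queue
import sys
import threading

def start_reader(stream):
    """Read lines from `stream` on a daemon thread and push them onto a queue."""
    lines = queue.Queue()

    def pump():
        for line in stream:
            lines.put(line)
        lines.put(None)  # sentinel: the stream reached EOF

    threading.Thread(target=pump, daemon=True).start()
    return lines

def poll_line(lines, timeout=0.1):
    """Return the next line, None at EOF, or '' if nothing arrived within `timeout`."""
    try:
        return lines.get(timeout=timeout)
    except queue.Empty:
        return ''

def run_bot(stream):
    """Handle lines as they arrive; returns the parsed lines once the stream ends."""
    lines = start_reader(stream)
    handled = []
    while True:
        line = poll_line(lines)
        if line is None:
            break
        if line:
            parts = line.split()
            print('got:', parts, flush=True)  # flush so the reply is not held in a buffer
            handled.append(parts)
        # when line == '', no input is pending and the bot can do other work
    return handled
```

In the bot itself this would be started with run_bot(sys.stdin).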

6 Answers

11
votes

By turning blocking off you can only read a character at a time. So, there is no way to get readline() to work in a non-blocking context. I assume you just want to read key presses to control the robot.
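
If whole lines are needed rather than key presses, one workaround is to collect whatever bytes are currently available and split them on newlines yourself. A minimal sketch, assuming a POSIX system and Python 3.5+ (for os.set_blocking); LineBuffer is a hypothetical helper, not something from the standard library:

```python
import os

class LineBuffer:
    """Collect bytes from a non-blocking file descriptor and hand back whole lines."""

    def __init__(self, fd):
        self.fd = fd
        self.pending = b''           # bytes of a line that is not finished yet
        os.set_blocking(fd, False)   # reads now fail fast instead of waiting

    def read_lines(self):
        """Return the complete lines available right now (possibly an empty list)."""
        try:
            # An empty result from os.read() means EOF; this sketch does not
            # treat EOF differently from "no data yet".
            chunk = os.read(self.fd, 4096)
        except BlockingIOError:
            chunk = b''              # nothing to read at the moment
        self.pending += chunk
        # Everything before the last newline is complete; keep the rest for later.
        *lines, self.pending = self.pending.split(b'\n')
        return [line.decode() for line in lines]
```

For stdin, the descriptor would be sys.stdin.fileno(); the caller polls read_lines() from its main loop.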

I had no luck using select.select() on Linux, so I came up with an approach that tweaks the termios settings instead. It is Linux specific, but it works for me:

import atexit
import os
import sys
import termios
import time

old_settings = None

def init_anykey():
    # Switch the terminal to non-canonical, no-echo mode with non-blocking reads.
    global old_settings
    old_settings = termios.tcgetattr(sys.stdin)
    new_settings = termios.tcgetattr(sys.stdin)
    new_settings[3] = new_settings[3] & ~(termios.ECHO | termios.ICANON)  # lflags
    new_settings[6][termios.VMIN] = 0   # cc: return even when no byte is available
    new_settings[6][termios.VTIME] = 0  # cc: do not wait for input
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)

@atexit.register
def term_anykey():
    # Restore the original terminal settings on exit.
    if old_settings:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

def anykey():
    # Return the byte values of all pending key presses (empty list if none).
    ch_set = []
    ch = os.read(sys.stdin.fileno(), 1)
    while ch:
        ch_set.append(ord(ch))
        ch = os.read(sys.stdin.fileno(), 1)
    return ch_set

init_anykey()
while True:
    key = anykey()
    if key:
        print(key)
    else:
        time.sleep(0.1)

A better Windows or cross-platform answer is here: Python nonblocking console input

7
votes
#-----------------------------------------------------------------------
# Get a character from the keyboard.  If Block is True wait for input,
# else return any available character or throw an exception if none is
# available.  Ctrl+C isn't handled and continues to generate the usual
# SIGINT signal, but special keys like the arrows return the expected 
# escape sequences.
#
# This requires:
#
#    import sys, select
#
# This was tested using python 2.7 on Mac OS X.  It will work on any
# Linux system, but will likely fail on Windows due to select/stdin
# limitations.
#-----------------------------------------------------------------------

import select
import sys

def GetChar(Block=True):
  if Block or select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
    return sys.stdin.read(1)
  raise RuntimeError('NoChar')  # nothing is waiting on stdin

7
votes

You can use the selectors module to handle I/O multiplexing:

https://docs.python.org/3/library/selectors.html

Try this out:

#! /usr/bin/python3

import sys
import fcntl
import os
import selectors

# set sys.stdin non-blocking
orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

# function to be called when enter is pressed
def got_keyboard_data(stdin):
    print('Keyboard input: {}'.format(stdin.read()))

# register event
m_selector = selectors.DefaultSelector()
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

while True:
    sys.stdout.write('Type something and hit enter: ')
    sys.stdout.flush()
    for k, mask in m_selector.select():
        callback = k.data
        callback(k.fileobj)

The code above blocks on the line

for k, mask in m_selector.select():

until a registered event occurs. select() then returns a list of (key, events) tuples: a SelectorKey instance (k) and a mask of the events that are ready.

In the above example we registered only one event (Enter key press):

m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

The register method is declared as follows:

abstractmethod register(fileobj, events, data=None)

Therefore, the register method stores our callback function got_keyboard_data as k.data, and the loop calls it when the Enter key is pressed:

    callback = k.data
    callback(k.fileobj)
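
The same dispatch pattern works for any file object, not only stdin. A minimal sketch, assuming Python 3 and using socket.socketpair() as a stand-in input source; on_readable, dispatch_once and demo are hypothetical names chosen for illustration:

```python
import selectors
import socket

def on_readable(conn):
    """Callback stored as key.data: read what is available and return it."""
    return conn.recv(1024)

def dispatch_once(selector, timeout=1.0):
    """Wait for ready file objects and call the callback registered for each."""
    results = []
    for key, mask in selector.select(timeout):
        callback = key.data  # the object passed as `data` to register()
        results.append(callback(key.fileobj))
    return results

def demo():
    left, right = socket.socketpair()
    with selectors.DefaultSelector() as sel:
        sel.register(right, selectors.EVENT_READ, on_readable)
        idle = dispatch_once(sel, timeout=0)  # nothing has been sent yet
        left.sendall(b'hello\n')
        ready = dispatch_once(sel)            # now `right` is readable
    left.close()
    right.close()
    return idle, ready
```

Passing a timeout to select() is what keeps the loop from waiting forever; with timeout=0 it only reports what is ready at that moment.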

A more complete (and hopefully more useful) example multiplexes stdin data from the user with incoming connections from the network:

import selectors
import socket
import sys
import os
import fcntl

m_selector = selectors.DefaultSelector()

# set sys.stdin non-blocking
def set_input_nonblocking():
    orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
    fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

def create_socket(port, max_conn):
    server_addr = ('localhost', port)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.setblocking(False)
    server.bind(server_addr)
    server.listen(max_conn)
    return server

def read(conn, mask):
    global GO_ON
    client_address = conn.getpeername()
    data = conn.recv(1024)
    print('Got {} from {}'.format(data, client_address))
    if not data:
         GO_ON = False

def accept(sock, mask):
    new_conn, addr = sock.accept()
    new_conn.setblocking(False)
    print('Accepting connection from {}'.format(addr))
    m_selector.register(new_conn, selectors.EVENT_READ, read)

def quit():
    global GO_ON
    print('Exiting...')
    GO_ON = False


def from_keyboard(arg1, arg2):
    line = arg1.read()
    if line == 'quit\n':
        quit()
    else:
        print('User input: {}'.format(line))

GO_ON = True
set_input_nonblocking()

# listen to port 10000, at most 10 connections
server = create_socket(10000, 10)

m_selector.register(server, selectors.EVENT_READ, accept)
m_selector.register(sys.stdin, selectors.EVENT_READ, from_keyboard)

while GO_ON:
    sys.stdout.write('>>> ')
    sys.stdout.flush()
    for k, mask in m_selector.select():
        callback = k.data
        callback(k.fileobj, mask)


# unregister events
m_selector.unregister(sys.stdin)

# close the listening socket; it was never connected, so there is nothing to shut down
server.close()

#  close select
m_selector.close()

You can test this using two terminals. In the first terminal:

$ python3 test.py 
>>> bla

Open another terminal and run:

 $ nc localhost 10000
 hey!

Back in the first terminal:

>>> qwerqwer     

Result (seen on the main terminal):

$ python3 test.py 
>>> bla
User input: bla

>>> Accepting connection from ('127.0.0.1', 39598)
>>> Got b'hey!\n' from ('127.0.0.1', 39598)
>>> qwerqwer     
User input: qwerqwer

>>> 

3
votes

This is a POSIX solution, close to that of swdev.

As he stated, you have to adjust VMIN and VTIME to catch more than one character without requiring the user to press [enter]. Using raw mode alone is a problem, because special keys such as the arrows produce multi-byte escape sequences that can spill into the next key press.

Here we use tty.setcbreak() or tty.setraw() as a shortcut; their implementations are short.

import termios
import tty
import sys
import select

def get_enter_key():
    fd = sys.stdin.fileno()
    orig_fl = termios.tcgetattr(fd)
    try:
        tty.setcbreak(fd)  # use tty.setraw() instead to catch ^C also
        mode = termios.tcgetattr(fd)
        CC = 6
        mode[CC][termios.VMIN] = 0
        mode[CC][termios.VTIME] = 0
        termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
        keypress, _, _ = select.select([fd], [], [])
        if keypress:
            return sys.stdin.read(4095)
    finally:
        termios.tcsetattr(fd, termios.TCSANOW, orig_fl)

try:
    while True:
        print(get_enter_key())
except KeyboardInterrupt:
    print('exiting')
    sys.exit()

Note that there are two potential timeouts you could add here:

-6
votes

Use a generator. Thankfully, sys.stdin already behaves like one: iterating over it yields one line at a time.

A generator lets you work on an infinite stream. Each time you ask it for a value, it returns the next element. To build your own generator you need the yield keyword.

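import sys

for line in sys.stdin:
    print(line, end='')  # each line already ends with a newline

    if a_certain_situation_happens:  # placeholder for your own exit condition
        break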

Remember to break out of the loop once the condition you are waiting for occurs.
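
Since yield is mentioned but not shown, here is a minimal sketch of a generator wrapped around a line stream. The name commands and the 'quit' stop word are hypothetical, chosen only for illustration. Note that iterating still blocks while waiting for the next line, so this only restructures the loop.

```python
import io

def commands(stream, stop_word='quit'):
    """Yield one list of tokens per non-empty line until `stop_word` or EOF."""
    for line in stream:
        parts = line.split()
        if not parts:
            continue          # skip blank lines
        if parts[0] == stop_word:
            return            # ends the generator, like `break` in the loop above
        yield parts

# io.StringIO stands in for sys.stdin in this example
for parts in commands(io.StringIO("move north\n\nquit\nignored\n")):
    print(parts)
```

With real input the loop would read for parts in commands(sys.stdin).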

You can find more information about generators on: