5
votes

Background

I've written a logfile parser using attoparsec. All my smaller parsers succeed, as does the composed final parser. I've confirmed this with tests. But I'm stumbling over performing operations with the parsed stream.

What I've tried

I started by trying to pass the successfully parsed input to a function. But all it seems to get is Done (), which I presume means the logfile has been fully consumed by this point.

prepareStats :: Result Log -> IO ()
prepareStats r =
  case r of
    Fail _ _ _       -> putStrLn "Parsing failed"
    Done _ parsedLog -> putStrLn "Success" -- This now has a [LogEntry] list. Do something with it.

main :: IO ()
main = do
  [f] <- getArgs
  logFile <- B.readFile (f :: FilePath)
  let results = parseOnly parseLog logFile
  putStrLn "TBC"

What I'm trying to do

I want to accumulate some stats from the logfile as I consume the input. For example, I'm parsing response codes and I'd like to count how many 2** responses there were and how many 4/5** ones. I'm parsing the number of bytes each response returned as Ints, and I'd like to efficiently sum these (sounds like a foldl'?). I've defined a data type like this:

data Stats = Stats {
    successfulRequestsPerMinute :: Int
  , failingRequestsPerMinute    :: Int
  , meanResponseTime            :: Int
  , megabytesPerMinute          :: Int
  } deriving Show

And I'd like to update it continually as I parse the input. But performing operations as I consume the input is where I'm stuck. So far print is the only function I've successfully passed output to, and it showed the parsing was succeeding by returning Done before printing the output.
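For the update step itself, something of this shape may work. This is a sketch only: Entry stands in for LogEntry with just the two fields the statistics need, and RunningStats is a simplified stand-in for the Stats record above (plain counts and a byte total rather than per-minute rates):

```haskell
import Data.List (foldl')

-- Stand-in for LogEntry, keeping only what the statistics need.
data Entry = Entry { status :: Int, sizeB :: Int }

-- Simplified stand-in for Stats: strict fields so the accumulator
-- stays small while folding over a large file.
data RunningStats = RunningStats
  { successes  :: !Int   -- 2xx responses
  , failures   :: !Int   -- 4xx/5xx responses
  , totalBytes :: !Int   -- running sum of response sizes
  } deriving Show

updateStats :: RunningStats -> Entry -> RunningStats
updateStats s e
  | isOk      = s { successes = successes s + 1, totalBytes = b }
  | otherwise = s { failures  = failures  s + 1, totalBytes = b }
  where
    isOk = status e >= 200 && status e < 300
    b    = totalBytes s + sizeB e

main :: IO ()
main = print $ foldl' updateStats (RunningStats 0 0 0)
                 [Entry 200 512, Entry 404 100, Entry 500 50]
```

The per-minute and mean figures can then be derived from these raw totals once the timestamps of the first and last entries are known.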

My main parser(s) look like this:

parseLogEntry :: Parser LogEntry
parseLogEntry = do
  ip <- logItem
  _ <- char ' '
  logName <- logItem
  _ <- char ' '
  user <- logItem
  _ <- char ' '
  time <- datetimeLogItem
  _ <- char ' '
  firstLogLine <- quotedLogItem
  _ <- char ' '
  finalRequestStatus <- intLogItem
  _ <- char ' '
  responseSizeB <- intLogItem
  _ <- char ' '
  timeToResponse <- intLogItem
  return $ LogEntry ip logName user time firstLogLine finalRequestStatus responseSizeB timeToResponse

type Log = [LogEntry]

parseLog :: Parser Log
parseLog = many $ parseLogEntry <* endOfLine

Desired outcome

I want to pass each parsed line to a function that will update the above data type. Ideally I want this to be very memory efficient because it'll be operating on large files.

3
Please edit your question and make your code self-contained. In particular, add the required import statements. Also, have you considered using the applicative style for parseLogEntry? It wouldn't affect performance, but it would improve readability. – jub0bs
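As the comment suggests, the do-block can be written applicatively. A small illustration using base's Text.ParserCombinators.ReadP, so it runs without attoparsec; the same <$> / <*> / <* shape carries over directly to attoparsec's Parser, with Line, item, and number standing in for LogEntry and the question's parsers:

```haskell
import Text.ParserCombinators.ReadP
import Data.Char (isAlphaNum, isDigit)

-- Toy two-field log line: a name and a number.
data Line = Line String Int deriving Show

item :: ReadP String
item = munch1 isAlphaNum

number :: ReadP Int
number = read <$> munch1 isDigit

-- Applicative style: <* runs a parser and discards its result,
-- replacing every "_ <- char ' '" line of the do-block.
line :: ReadP Line
line = Line <$> item <* char ' ' <*> number

main :: IO ()
main = print (readP_to_S (line <* eof) "abc 42")
```

In the question's parser this would read, e.g., LogEntry <$> logItem <* char ' ' <*> logItem <* char ' ' <*> ... and so on for the remaining fields.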

3 Answers

2
votes

You have to make your unit of parsing a single log entry rather than a list of log entries.

It's not pretty, but here is an example of how to interleave parsing and processing:

(Depends on bytestring, attoparsec and mtl)

{-# LANGUAGE NoMonomorphismRestriction, FlexibleContexts #-}

import qualified Data.ByteString.Char8 as BS
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Attoparsec.ByteString.Char8 hiding (takeWhile)
import Data.Char
import Control.Monad.State.Strict

aWord :: Parser BS.ByteString
aWord = skipSpace >> A.takeWhile isAlphaNum

getNext :: MonadState [a] m => m (Maybe a)
getNext = do
  xs <- get
  case xs of
    [] -> return Nothing
    (y:ys) -> put ys >> return (Just y)

loop iresult =
  case iresult of
    Fail _ _ msg  -> error $ "parse failed: " ++ msg
    Done x' aword -> do lift $ process aword; loop (parse aWord x')
    Partial _     -> do
      mx <- getNext
      case mx of
        Just y  -> loop (feed iresult y)
        Nothing -> case feed iresult BS.empty of
                     Fail _ _ msg  -> error $ "parse failed: " ++ msg
                     Done x' aword -> do lift $ process aword; return ()
                     Partial _     -> error $ "partial returned"  -- probably can't happen

process :: Show a => a -> IO ()
process w = putStrLn $ "got a word: " ++ show w

theWords = map BS.pack [ "this is a te", "st of the emergency ", "broadcasting sys", "tem"]


main = runStateT (loop (Partial (parse aWord))) theWords

Notes:

  • We parse one aWord at a time and call process after each word is recognized.
  • Use feed to feed the parser more input when it returns a Partial.
  • Feed the parser an empty string when there is no more input left.
  • When Done is returned, we process the recognized word and continue with parse aWord.
  • getNext is just an example of a monadic function which gets the next unit of input. Replace it with your own version - i.e. something that reads the next line from a file.
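Per the last note, getNext can be swapped for something that reads a real file. A hedged sketch in plain IO (getNextLine is a hypothetical name; note that BS.hGetLine drops the trailing newline, so for parsers that span lines you may prefer raw chunks via BS.hGetSome h 32768 instead):

```haskell
import qualified Data.ByteString.Char8 as BS
import System.IO

-- Yield the next line of the handle, or Nothing at end of file,
-- which tells the loop to finish off the parse with empty input.
getNextLine :: Handle -> IO (Maybe BS.ByteString)
getNextLine h = do
  eof <- hIsEOF h
  if eof
    then return Nothing
    else Just <$> BS.hGetLine h

main :: IO ()
main = do
  -- demonstrate on a throwaway file
  BS.writeFile "demo.log" (BS.pack "alpha\nbeta\n")
  h <- openFile "demo.log" ReadMode
  Just l <- getNextLine h
  BS.putStrLn l   -- prints "alpha"
  hClose h
```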

Update

Here is a solution using parseWith, as @dfeuer suggested (it additionally needs import Data.Maybe (fromMaybe)):

noMoreInput = fmap null get

loop2 x = do
  iresult <- parseWith (fmap (fromMaybe BS.empty) getNext) aWord x
  case iresult of
    Fail _ _ msg  -> error $ "parse failed: " ++ msg
    Done x' aword -> do lift $ process aword;
                        if BS.null x'
                           then do b <- noMoreInput
                                   if b then return ()
                                        else loop2 x'
                           else loop2 x'
    Partial _     -> error $ "huh???" -- this really can't happen

main2 = runStateT (loop2 BS.empty) theWords
1
votes

If each log entry is exactly one line, here's a simpler solution:

do loglines <- fmap BS.lines $ BS.readFile "input-file.log"
   print $ foldl' go initialStats loglines
   where
     go stats logline =
        case parseOnly yourParser logline of
          Left e  -> error $ "oops: " ++ e
          Right r -> let stats' = ... combine r with stats ...
                     in stats'

Basically you are just reading the file line-by-line and calling parseOnly on each line and accumulating the results.
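One memory caveat with this approach: foldl' only forces the accumulator to weak head normal form, so if the stats record's fields are lazy, thunks can still pile up inside it across millions of lines. Strict fields (or bang patterns in the step function) keep the fold in constant space. A minimal sketch with hypothetical names:

```haskell
import Data.List (foldl')

-- Strict fields: each addition is forced when the constructor is
-- evaluated, so no thunk chain survives the fold.
data Acc = Acc { ok :: !Int, bytes :: !Int } deriving Show

step :: Acc -> (Int, Int) -> Acc   -- (status code, response size)
step (Acc o b) (status, size)
  | status < 400 = Acc (o + 1) (b + size)
  | otherwise    = Acc o (b + size)

main :: IO ()
main = print $ foldl' step (Acc 0 0) [(200, 512), (404, 100), (200, 300)]
```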

1
votes

This is properly done with a streaming library

main = do
  f:_ <- getArgs
  withFile f ReadMode $ \h -> do
       result <- foldStream $ streamProcess $ streamHandle h
       print result
where
 streamHandle  = undefined
 streamProcess = undefined
 foldStream    = undefined

where the blanks can be filled by any streaming library, e.g.

 import qualified Pipes.Prelude as P
 import Pipes
 import qualified Pipes.ByteString as PB
 import Pipes.Group (folds)
 import qualified Control.Foldl as L
 import Control.Lens (view) -- or import Lens.Simple (view), or whatever

 streamHandle =  PB.fromHandle :: Handle -> Producer ByteString IO ()

in that case we might then divide the labor further thus:

 streamProcess :: Producer ByteString m r -> Producer LogEntry m r
 streamProcess p =  streamLines p >-> lineParser

 streamLines :: Producer ByteString m r -> Producer ByteString m r
 streamLines p = L.purely folds L.list (view (PB.lines p)) >-> P.map B.concat

 lineParser :: Pipe ByteString LogEntry m r
 lineParser = P.map (parseOnly line_parser) >-> P.concat -- concat removes lefts

(This is slightly laborious because pipes is sensibly persnickety about accumulating lines, and about memory generally: we are just trying to get a producer of individual strict bytestring lines, then to convert that into a producer of parsed lines, and then to throw out bad parses, if there are any. With io-streams or conduit, things will be basically the same, and that particular step will be easier.)

We are now in a position to fold over our Producer LogEntry IO (). This can be done explicitly using Pipes.Prelude.fold, which performs a strict left fold. Here we will just copy the structure from user5402's answer:

 foldStream str = P.fold go initial_stats id str
  where
   go stats_till_now new_entry = undefined

If you get used to the use of the foldl library and the application of a fold to a Producer with L.purely fold some_fold, then you can build Control.Foldl.Folds for your LogEntries out of components and slot in different requests as you please.
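The compositionality alluded to here is easy to see if you inline what a Control.Foldl-style Fold is. The data type below mirrors the foldl library's actual shape (a step function, a start value, and a finisher, with the accumulator type hidden); countIf and sumBy are hypothetical helpers written for this sketch, not library functions:

```haskell
{-# LANGUAGE ExistentialQuantification #-}

import Data.List (foldl')

-- Same shape as Control.Foldl.Fold: step, initial accumulator, finisher.
data Fold a b = forall x. Fold (x -> a -> x) x (x -> b)

instance Functor (Fold a) where
  fmap f (Fold step begin done) = Fold step begin (f . done)

-- Applicative pairs the accumulators, so composed folds still make
-- a single pass over the input.
instance Applicative (Fold a) where
  pure b = Fold const () (const b)
  Fold stepL beginL doneL <*> Fold stepR beginR doneR =
    Fold (\(xL, xR) a -> (stepL xL a, stepR xR a))
         (beginL, beginR)
         (\(xL, xR) -> doneL xL (doneR xR))

runFold :: Fold a b -> [a] -> b
runFold (Fold step begin done) = done . foldl' step begin

-- Hypothetical component folds over (status code, response size) pairs:
countIf :: (a -> Bool) -> Fold a Int
countIf p = Fold (\n a -> if p a then n + 1 else n) 0 id

sumBy :: Num n => (a -> n) -> Fold a n
sumBy f = Fold (\s a -> s + f a) 0 id

main :: IO ()
main = print $ runFold ((,) <$> countIf ((>= 400) . fst) <*> sumBy snd)
                       [(200, 512), (404, 100), (500, 50)]
```

With the real library you would import Control.Foldl and run such a composite against a Producer with L.purely P.fold, slotting in or out component folds as your requests change.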

If you use pipes-attoparsec and include the newline bit in your parser, then you can just write

 handleToLogEntries :: Handle -> Producer LogEntry IO ()
 handleToLogEntries h = void $ parsed my_line_parser (fromHandle h) >-> P.concat

and get the Producer LogEntry IO () more directly. (This ultra-simple way of writing it will, however, stop at a bad parse; dividing on lines first will be faster than using attoparsec to recognize newlines.) This is very simple with io-streams too, you would write something like

import qualified System.IO.Streams as Streams

io :: Handle -> IO ()
io h = do  
    bytes <- Streams.handleToInputStream h
    log_entries <- Streams.parserToInputStream my_line_parser bytes
    fold_result <- Streams.fold go initial_stats log_entries
    print fold_result

or to keep with the structure above:

 where 
  streamHandle = Streams.handleToInputStream
  streamProcess io_bytes = 
      io_bytes >>= Streams.parserToInputStream my_line_parser
  foldStream io_logentries =
      io_logentries >>= Streams.fold go initial_stats

Either way, my_line_parser should return a Maybe LogEntry and should recognize the newline.