Module file_log_reader

Copyright © 2015 Serge Aleynikov

Behaviours: gen_server.

Authors: Serge Aleynikov (saleyn@gmail.com).

Description

Periodically read an append-only log file and parse newly added data.

The user sets the interval, in milliseconds, at which the file is checked for modifications. When new data is appended to the file, the reader invokes the user-defined parsing function to delimit messages, and each result is delivered to the consumer by calling the consumer callback function.

The log reader can be started as a gen_server or driven synchronously with the init/3, run/1, and close/1 functions.

Data Types

consumer()

consumer() = 
    fun((Msg ::
             any() |
             {'$end_of_file',
              string(),
              Res ::
                  ok |
                  {error | throw | exit,
                   Reason :: any(),
                   Stacktrace :: list()}},
         Pos :: integer(),
         State :: any()) ->
            NewState :: any())
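
To illustrate the consumer() contract, here is a minimal sketch. The module name consumer_example and the use of a list accumulator as State are assumptions made for this example and are not part of file_log_reader. Because Msg may be any term, the '$end_of_file' clause is placed before the catch-all clause.

```erlang
-module(consumer_example).
-export([consume/3]).

%% Hypothetical consumer: collects {Pos, Msg} pairs in a list.
%% The '$end_of_file' clause must precede the catch-all clause,
%% otherwise the end-of-file tuple would be treated as a message.
consume({'$end_of_file', File, Result}, Pos, Acc) ->
    io:format("~ts: stopped at ~w with ~p~n", [File, Pos, Result]),
    lists:reverse(Acc);
consume(Msg, Pos, Acc) when is_list(Acc) ->
    [{Pos, Msg} | Acc].
```

Such a function could be passed as fun consumer_example:consume/3 wherever a consumer() is expected.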

options()

options() = 
    [{pos, StartPos :: integer()} |
     {end_pos, ReadUntilPos :: integer() | eof} |
     {max_size, MaxReadSize :: integer() | eof} |
     {timeout, MSec :: integer()} |
     {retry_sec, Sec :: integer()} |
     {parser,
      fun((Data :: binary(), ParserState :: any()) ->
              {ok,
               Msg :: any(),
               Tail :: binary(),
               NewParserState :: any()} |
              {incomplete, NewParserState :: any()} |
              {skip, Tail :: binary(), NewParserState :: any()}) |
      {Mod :: atom(), Fun :: atom()}} |
     {pstate,
      fun((File :: string(), consumer(), Options :: list()) ->
              any()) |
      any()} |
     {pstate_update,
      fun((Option :: atom(), Value :: any(), PState :: any()) ->
              {ok, NewPState :: any()} | {error, any()})}]

Details:

pos
Start reading from this position (default: 0)
end_pos
Read until this position and stop. If this option is provided and the file position reaches end_pos, the consumer() callback given to the reader is called as: Consumer({'$end_of_file', Filename::string(), Result}, Pos::integer(), State) where Result is ok or {error|exit|exception, Error::any(), StackTrace} if an error occurred.
max_size
Maximum chunk size to read from file in a single pass (default: 32M).
timeout
Number of milliseconds between successive file scans (default: 1000).
retry_sec
Number of seconds between successive retries upon failure to open the file passed to one of the start*/{3,4} functions (default: 15). A value of 0 means that the file must exist or else the process won't start.
parser
The function called each time the next chunk is read from the file. It must return one of:
{ok, Msg, Tail, State}
invoke Consumer callback passing it the parsed message Msg, and continue parsing the Tail binary
{incomplete, State}
the data contains no complete messages - wait until there's more
{skip, Tail, State}
disregard input and continue parsing Tail without calling Consumer callback
pstate
Initial value of the parser state, or a fun that computes it: fun((File::string(), Consumer::consumer(), Options::options()) -> PState::any())
pstate_update
Update function of the parser state. Called when the user invokes update_pstate/3
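
The parser contract described above can be sketched with a newline-delimited parser. The module name line_parser_example and the use of an integer message counter as the parser state are assumptions made for illustration only.

```erlang
-module(line_parser_example).
-export([parse/2]).

%% Hypothetical parser: one message per "\n"-terminated line.
%% The parser state is assumed to be a count of messages seen so far.
parse(Data, Count) when is_binary(Data), is_integer(Count) ->
    case binary:split(Data, <<"\n">>) of
        [<<>>, Tail] ->
            %% Empty line: drop it without calling the consumer.
            {skip, Tail, Count};
        [Line, Tail] ->
            %% Complete line: hand it to the consumer, keep parsing Tail.
            {ok, Line, Tail, Count + 1};
        [_Partial] ->
            %% No newline yet: wait until more data is appended.
            {incomplete, Count}
    end.
```

With this module on the code path, the option could be given either as {parser, fun line_parser_example:parse/2} or in the {Mod, Fun} form as {parser, {line_parser_example, parse}}, together with {pstate, 0}.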

Function Index

close/1          Close file processor (use this method when not using gen_server).
init/3           When using file processor without gen_server, use this function to initialize the state, and then call run/1.
position/1       Report last processed file position/size.
pstate/1         Return current parser state ({pstate, any()} initialization option).
run/1            Process file from given position Pos to EndPos (or eof).
start/3
start/4          Start the server outside of supervision tree.
start_link/3     Process File by calling Consumer callback on every delimited message.
start_link/4     To be called by the supervisor in order to start the server.
stop/1           Stop the server.
update_pstate/3  Update parser state.

Function Details

close/1

close(State) -> any()

Close file processor (use this function when not using gen_server).

init/3

init(File :: string(),
     Consumer :: consumer(),
     Options :: options()) ->
        {ok,
         #state{consumer = consumer(),
                tref = reference(),
                fd = port(),
                file = string(),
                pos = integer() | eof,
                end_pos = integer() | eof | undefined,
                max_size = integer(),
                timeout = integer(),
                parser =
                    {atom(), atom()} |
                    fun((binary(), any()) ->
                            {any(), binary(), any()}),
                pstate = any(),
                pstate_update = fun((any()) -> any()),
                part_size = integer(),
                done =
                    false | ok |
                    {error | exception | exit,
                     Reason :: any(),
                     Stacktrace :: any()},
                incompl_count = integer()}}

When using file processor without gen_server, use this function to initialize the state, and then call run/1.
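
A minimal sketch of the synchronous init/3, run/1, close/1 sequence follows. It assumes file_log_reader is on the code path and that run/1 returns once the end of the file is reached; the module name sync_reader_example, the helper names, and the chosen option values are hypothetical.

```erlang
-module(sync_reader_example).
-export([options/0, read_once/1]).

%% Hypothetical option list: read from the start of the file to eof,
%% treating each "\n"-terminated line as one message.
options() ->
    [{pos, 0},
     {end_pos, eof},
     {parser, fun split_line/2},
     {pstate, undefined}].

%% Newline parser following the documented {parser, Fun} contract.
split_line(Data, PState) ->
    case binary:split(Data, <<"\n">>) of
        [Line, Tail] -> {ok, Line, Tail, PState};
        [_Partial]   -> {incomplete, PState}
    end.

%% Drive the reader without a gen_server: initialize, process, close.
read_once(File) ->
    Consumer = fun(Msg, Pos, State) ->
                       io:format("~w: ~p~n", [Pos, Msg]),
                       State
               end,
    {ok, S0} = file_log_reader:init(File, Consumer, options()),
    S1 = file_log_reader:run(S0),
    file_log_reader:close(S1).
```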

position/1

position(Pid :: pid() | atom()) -> {ok, Position :: integer()}

Report last processed file position/size.

pstate/1

pstate(Pid :: pid() | atom()) -> {ok, any()}

Return current parser state ({pstate, any()} initialization option).

run/1

run(State ::
        #state{consumer = consumer(),
               tref = reference(),
               fd = port(),
               file = string(),
               pos = integer() | eof,
               end_pos = integer() | eof | undefined,
               max_size = integer(),
               timeout = integer(),
               parser =
                   {atom(), atom()} |
                   fun((binary(), any()) ->
                           {any(), binary(), any()}),
               pstate = any(),
               pstate_update = fun((any()) -> any()),
               part_size = integer(),
               done =
                   false | ok |
                   {error | exception | exit,
                    Reason :: any(),
                    Stacktrace :: any()},
               incompl_count = integer()}) ->
       #state{consumer = consumer(),
              tref = reference(),
              fd = port(),
              file = string(),
              pos = integer() | eof,
              end_pos = integer() | eof | undefined,
              max_size = integer(),
              timeout = integer(),
              parser =
                  {atom(), atom()} |
                  fun((binary(), any()) ->
                          {any(), binary(), any()}),
              pstate = any(),
              pstate_update = fun((any()) -> any()),
              part_size = integer(),
              done =
                  false | ok |
                  {error | exception | exit,
                   Reason :: any(),
                   Stacktrace :: any()},
              incompl_count = integer()}

Process the file from the position pos held in State up to end_pos (or eof).

start/3

start(File :: string(),
      Consumer :: consumer(),
      Options :: options()) ->
         {ok, pid()} | {error, any()}

start/4

start(RegName :: atom(),
      File :: string(),
      Consumer :: consumer(),
      Options :: options()) ->
         {ok, pid()} | {error, any()}

Start the server outside of supervision tree.
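
As a sketch of running the reader outside a supervision tree, the example below starts a registered server, queries it, and stops it. It assumes file_log_reader is on the code path; the module name start_example, the registered name my_log_reader, the whole-chunk parser, and the option values are hypothetical.

```erlang
-module(start_example).
-export([options/0, demo/1]).

%% Hypothetical options for an unsupervised reader.
options() ->
    [{timeout, 500},     % check the file every 500 ms
     {retry_sec, 0},     % the file must exist, do not retry opening it
     {parser, fun whole_chunk/2}].

%% Hypothetical parser: every non-empty chunk is a single message.
whole_chunk(<<>>, PState) -> {incomplete, PState};
whole_chunk(Data, PState) -> {ok, Data, <<>>, PState}.

%% Start the server, read its position and parser state, then stop it.
demo(File) ->
    Consumer = fun(_Msg, _Pos, State) -> State end,
    {ok, Pid} = file_log_reader:start(my_log_reader, File, Consumer, options()),
    {ok, Pos} = file_log_reader:position(Pid),
    {ok, PState} = file_log_reader:pstate(Pid),
    ok = file_log_reader:stop(Pid),
    {Pos, PState}.
```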

start_link/3

start_link(File :: string(),
           Consumer :: consumer(),
           Options :: options()) ->
              {ok, pid()} | ignore | {error, any()}

Process File by calling the Consumer callback on every delimited message. Message delimiting is handled by the {parser, Parser} option. The Consumer function is called repeatedly with the following arguments:

(Msg, Pos::integer(), State)
Msg is what the parser function returned. Pos is the current file position following Msg. State is the current value of the parser state, which is initialized by the {pstate, PState} option given to the start_link/{3,4} function.
({'$end_of_file', Filename::string(), Result}, Pos::integer(), PState)
This call happens when the end-of-file condition is reached (see the definition of the consumer() type).
Consumer can end processing normally without reaching the end of file by throwing {eof, PState} exception.
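
The early-exit convention above can be sketched as a consumer that stops after a fixed number of messages. The module name limit_consumer_example and the map-shaped state with seen and limit keys are assumptions made for this example.

```erlang
-module(limit_consumer_example).
-export([consume/3]).

%% Hypothetical consumer: counts messages and ends processing early
%% by throwing {eof, State} once `limit` messages have been seen.
consume({'$end_of_file', _File, _Result}, _Pos, State) ->
    State;
consume(_Msg, _Pos, #{seen := N, limit := L} = State) when N + 1 >= L ->
    throw({eof, State#{seen := N + 1}});
consume(_Msg, _Pos, #{seen := N} = State) ->
    State#{seen := N + 1}.
```

Under these assumptions the reader would be started with {pstate, #{seen => 0, limit => 100}} so that the state passed to the consumer has the expected shape.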

start_link/4

start_link(RegName :: atom(),
           File :: string(),
           Consumer :: consumer(),
           Options :: options()) ->
              {ok, pid()} | ignore | {error, any()}

To be called by the supervisor in order to start the server. If init/1 fails with Reason, the function returns {error,Reason}. If init/1 returns {stop,Reason} or ignore, the process is terminated and the function returns {error,Reason} or ignore, respectively.

See also: start_link/3.
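
A supervisor could start the reader through start_link/4 roughly as sketched below. The module name log_reader_sup_example, the child id and registered name log_reader, the restart settings, and the whole-chunk parser are all assumptions chosen for illustration.

```erlang
-module(log_reader_sup_example).
-behaviour(supervisor).
-export([start_link/1, init/1, child_spec/1]).

start_link(File) ->
    supervisor:start_link(?MODULE, File).

%% Supervisor callback: one permanent file_log_reader child.
init(File) ->
    SupFlags = #{strategy => one_for_one, intensity => 3, period => 10},
    {ok, {SupFlags, [child_spec(File)]}}.

%% Hypothetical parser: every non-empty chunk is a single message.
whole_chunk(<<>>, PState) -> {incomplete, PState};
whole_chunk(Data, PState) -> {ok, Data, <<>>, PState}.

%% Child spec that calls file_log_reader:start_link/4.
child_spec(File) ->
    Consumer = fun(_Msg, _Pos, State) -> State end,
    Options  = [{timeout, 1000}, {retry_sec, 15},
                {parser, fun whole_chunk/2}],
    #{id       => log_reader,
      start    => {file_log_reader, start_link,
                   [log_reader, File, Consumer, Options]},
      restart  => permanent,
      shutdown => 5000,
      type     => worker,
      modules  => [file_log_reader]}.
```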

stop/1

stop(Pid :: pid() | atom()) -> ok

Stop the server.

update_pstate/3

update_pstate(Pid :: pid(), Option :: atom(), Value :: any()) ->
                 {ok, State :: any()}

Update parser state.
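
As an illustration of the {pstate_update, Fun} option that update_pstate/3 relies on, the sketch below shows one possible update function. The module name pstate_update_example, the delimiter option, and the map-shaped parser state are hypothetical.

```erlang
-module(pstate_update_example).
-export([update/3]).

%% Hypothetical update function matching the documented signature:
%% fun((Option, Value, PState) -> {ok, NewPState} | {error, Reason}).
%% The parser state is assumed to be a map.
update(delimiter, Value, PState) when is_binary(Value), is_map(PState) ->
    {ok, PState#{delimiter => Value}};
update(delimiter, Value, _PState) ->
    {error, {bad_value, Value}};
update(Option, _Value, _PState) ->
    {error, {unknown_option, Option}}.
```

With {pstate_update, fun pstate_update_example:update/3} among the start options, a call such as file_log_reader:update_pstate(Pid, delimiter, <<";">>) would be expected to route through this function.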