Source file sum_group.ml
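The implementation of the csv sum-group subcommand: it folds over one or more input csv files, grouping rows by the requested key columns and aggregating value columns (sum, count, list, sum-pos, sum-neg), then writes the grouped results out as a new csv.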
open Core
open Async

(* An aggregation folds the string cells of one column into a final value to
   display.  The existential wrapper [T] hides each aggregation's accumulator
   type, so aggregations over [Bignum.t] and [String.Set.t] can live in the
   same list. *)
module Agg : sig
  type t [@@deriving enumerate]

  val name : t -> string
  val desc : t -> string
  val update : t -> string -> t
  val get_val : t -> string
end = struct
  type 'a u =
    { name : string
    ; desc : string
    ; value : 'a
    ; update : 'a -> string -> 'a
    ; get_val : 'a -> string
    }

  type t = T : 'a u -> t

  let name (T t) = t.name
  let desc (T t) = t.desc
  let update (T t) s = T { t with value = t.update t.value (String.strip s) }
  let get_val (T t) = t.get_val t.value

  let all =
    (* Blank cells are skipped rather than parsed, so they don't poison the
       numeric aggregations. *)
    let unless_empty f init s =
      if String.is_empty s then init else f init (Bignum.of_string s)
    in
    [ T
        { name = "sum"
        ; desc = "Sum of column"
        ; value = Bignum.zero
        ; update = unless_empty Bignum.( + )
        ; get_val = Bignum.to_string_hum
        }
    ; T
        { name = "count"
        ; desc = "Count how many unique strings are in this column"
        ; value = String.Set.empty
        ; update = Set.add
        ; get_val = (fun acc -> Int.to_string (Set.length acc))
        }
    ; T
        { name = "list"
        ; desc = "List of all unique values in this column, separated by semicolons"
        ; value = String.Set.empty
        ; update = Set.add
        ; get_val = (fun acc -> String.concat ~sep:";" (Set.to_list acc))
        }
    ; T
        { name = "sum-pos"
        ; desc = "Sum of all positive values in this column, ignoring negatives"
        ; value = Bignum.zero
        ; update =
            unless_empty (fun acc n ->
              Bignum.( + ) acc Bignum.(if n > zero then n else zero))
        ; get_val = Bignum.to_string_hum
        }
    ; T
        { name = "sum-neg"
        ; desc = "Sum of all negative values in this column, ignoring positives"
        ; value = Bignum.zero
        ; update =
            unless_empty (fun acc n ->
              Bignum.( + ) acc Bignum.(if n < zero then n else zero))
        ; get_val = Bignum.to_string_hum
        }
    ]
  ;;
end

(* Fold over the rows of one csv, updating the aggregation states for the key
   of each row. *)
let process_input_file ~sep ~keys ~aggregations ~init reader =
  let pipe =
    Delimited.Read.pipe_of_reader Delimited.Read.Row.builder ~sep ~header:`Yes reader
  in
  Pipe.fold_without_pushback pipe ~init ~f:(fun init row ->
    let key = List.map keys ~f:(Delimited.Read.Row.get_exn row) in
    Map.update init key ~f:(fun prev ->
      let prev = Option.value ~default:aggregations prev in
      List.map prev ~f:(fun (col, agg) ->
        col, Agg.update agg (Delimited.Read.Row.get_exn row col))))
;;

(* Write the header row, then one output row per key with the final value of
   each aggregation. *)
let write_output ~keys ~aggregations ~sep data =
  let w =
    Delimited.Write.Expert.By_row.of_writer_and_close ~sep (Lazy.force Writer.stdout)
  in
  Pipe.write
    w
    (keys @ List.map aggregations ~f:(fun (col, agg) -> col ^ "_" ^ Agg.name agg))
  >>= fun () ->
  Deferred.Map.iteri ~how:`Sequential data ~f:(fun ~key ~data ->
    if not (Pipe.is_closed w)
    then Pipe.write w (key @ List.map data ~f:(fun (_, agg) -> Agg.get_val agg))
    else Deferred.unit)
  >>| fun () -> Pipe.close w
;;

(* A group-by key is the list of this row's values for the key columns. *)
module Key = struct
  module T = struct
    type t = string list [@@deriving compare, sexp]
  end

  include T
  include Comparable.Make (T)
end

let readme () =
  {|
Fold over a csv file, creating a new csv file containing
key and value fields, where the input is grouped by key
fields, then values are aggregated in one of a few
different ways. E.g.,
$ csv sum-group \
    -key apple \
    -key orange \
    -sum-pos price \
    -sum-neg price \
    -sum price \
    -count supplier \
    - <<EOF | csv pretty
apple,orange,supplier,price
fuji,navel,dole,1.5
red delicious,navel,dole,-1.5
fuji,navel,sysco,0.1
EOF
              orange
              |     price_sum-neg
              |     |   price_sum-pos
              |     |   |    supplier_count
apple         |     |   |    | price_sum
|             |     |   |    | |
fuji          navel 1.6 0    2 1.6
red delicious navel 0   -1.5 1 -1.5
|};;

(* We want to offer the user a separate flag for each aggregation, but
   internally see one big list of aggregations. *)
let aggregation_flags =
  List.map Agg.all ~f:(fun agg ->
    let open Command.Param in
    flag
      ("-" ^ Agg.name agg)
      ~doc:(" " ^ Agg.desc agg)
      (listed (Arg_type.create (fun column_to_aggregate -> column_to_aggregate, agg))))
  |> Command.Param.all
  |> Command.Param.map ~f:List.concat
;;

let command =
  let summary = "sum a csv file, grouping by specified fields, producing a new csv file" in
  Command.async
    ~summary
    ~readme
    (let%map_open.Csv_param sep = sep
     and keys = flag "-key" (listed string) ~doc:" group by these fields"
     and aggregations = aggregation_flags
     and csv, csvs =
       anon (non_empty_sequence_as_pair ("input-csv" %: Filename_unix.arg_type))
     in
     let open Async in
     fun () ->
       (* Fold the grouped aggregation states across all input files; "-" means
          read from stdin. *)
       Deferred.List.fold (csv :: csvs) ~init:Key.Map.empty ~f:(fun init csv ->
         match csv with
         | "-" -> process_input_file ~sep ~keys ~aggregations ~init (Lazy.force Reader.stdin)
         | csv -> Reader.with_file csv ~f:(process_input_file ~sep ~keys ~aggregations ~init))
       >>= write_output ~sep ~keys ~aggregations)
    ~behave_nicely_in_pipeline:false
;;
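Because each aggregation is an ordinary record hidden behind the existential constructor T, extending the tool only means appending one more entry to the list in Agg.all; the command-line flag and the output column name are then derived from it automatically by aggregation_flags and write_output. The sketch below is purely illustrative and not part of sum_group.ml: a hypothetical "max" aggregation, written as the list element it would add before the closing bracket of Agg.all.

    (* Hypothetical example only: a "max" aggregation tracking the largest
       value seen in a column.  Its accumulator type is [Bignum.t option]
       ([None] until the first non-empty cell), which the existential [T]
       hides just like the accumulator types above.  Empty cells are skipped,
       mirroring what [unless_empty] does for the numeric aggregations. *)
    ; T
        { name = "max"
        ; desc = "Maximum value in this column"
        ; value = None
        ; update =
            (fun acc s ->
              if String.is_empty s
              then acc
              else (
                let n = Bignum.of_string s in
                match acc with
                | None -> Some n
                | Some m -> Some (if Bignum.( > ) n m then n else m)))
        ; get_val =
            (function
             | None -> ""
             | Some n -> Bignum.to_string_hum n)
        }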
big list of aggregations. *)letaggregation_flags=List.mapAgg.all~f:(funagg->letopenCommand.Paraminflag("-"^Agg.nameagg)~doc:(" "^Agg.descagg)(listed(Arg_type.create(funcolumn_to_aggregate->column_to_aggregate,agg))))|>Command.Param.all|>Command.Param.map~f:List.concat;;letcommand=letsummary="sum a csv file, grouping by specified fields, producing a new csv file"inCommand.async~summary~readme(let%map_open.Csv_paramsep=sepandkeys=flag"-key"(listedstring)~doc:" group by these fields"andaggregations=aggregation_flagsandcsv,csvs=anon(non_empty_sequence_as_pair("input-csv"%:Filename_unix.arg_type))inletopenAsyncinfun()->Deferred.List.fold(csv::csvs)~init:Key.Map.empty~f:(funinitcsv->matchcsvwith|"-"->process_input_file~sep~keys~aggregations~init(Lazy.forceReader.stdin)|csv->Reader.with_filecsv~f:(process_input_file~sep~keys~aggregations~init))>>=write_output~sep~keys~aggregations)~behave_nicely_in_pipeline:false;;